Very slow database operations due to lack of batching

:bug: Bug description:
Commands like db-in are very slow, at least on Postgres, because neither Prodigy nor Peewee batches inserts.

No batching is done for inserts. This means every single insert statement does a full roundtrip to the database. When using a remote database this means that every individual insert has to go over the network independently. Even with low network latency, this adds up very quickly. For instance, inserting 1000 records yields about 2000 insert statements (due to linking). At a very good network latency of 10ms per request, this means 20 seconds of pure network wait time. In reality, both my datasets and network latency are unfortunately significantly larger. This means that in practice I'm often waiting for (tens of) minutes for db-in to complete.
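The arithmetic above can be sketched as a quick back-of-envelope calculation (the numbers are the illustrative ones from this paragraph, not measurements):

```python
# Rough estimate of pure network wait for unbatched inserts,
# using the illustrative numbers from the paragraph above.
records = 1000
statements_per_record = 2   # one "example" insert plus one "link" insert
round_trip_s = 0.010        # optimistic 10 ms network latency per statement

wait_s = records * statements_per_record * round_trip_s
print(f"{wait_s:.0f} seconds of network wait alone")  # 20 seconds
```

With larger datasets or higher latency, this scales linearly, which is why real runs stretch into minutes.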

Additionally, Peewee configures Postgres in autocommit mode. This too is rather inefficient, as it means each individual statement is effectively wrapped in its own transaction, and a transaction incurs significant overhead on the db. Here too, using a single transaction instead of one per statement is more efficient. That said, the network delay is by far the largest contributor to the performance issues I'm encountering.
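To illustrate the transaction point in isolation, here is a minimal sketch. It is not Prodigy's actual code: stdlib sqlite3 stands in for Postgres, and the table is made up, but the principle is the same — wrap the whole batch in one explicit transaction instead of committing each statement separately.

```python
import sqlite3

# Sketch only: sqlite3 stands in for Postgres, and the "example" table
# is illustrative. In autocommit mode every INSERT is its own
# transaction; using the connection as a context manager wraps the
# whole batch in a single transaction that commits once on success.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example (x INTEGER)")

with conn:  # one transaction for the whole batch
    conn.executemany(
        "INSERT INTO example (x) VALUES (?)",
        [(i,) for i in range(1000)],
    )

count = conn.execute("SELECT COUNT(*) FROM example").fetchone()[0]
print(count)  # 1000
```

The same pattern applies to psycopg2 by leaving autocommit off and committing once at the end.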

I suspect this behaviour also occurs for remote MySQL databases, but as I don't have access to a remote MySQL database I cannot be certain.

Here is a rough little script that mimics db-in's behaviour with respect to database inserts. It showcases both the inefficient behaviour and a more efficient solution. For an example dataset of 1700 records, the inefficient method takes 92 seconds against a remote RDS database; the batched solution takes only 4. In other words, a 23x improvement.

import argparse
import datetime
import json
import os
import pathlib
import random
from typing import Any

import psycopg2
import psycopg2.extensions
import psycopg2.extras

# create the dataset
CREATE_DATASET_SQL = (
    'INSERT INTO "dataset" ("name", "created", "meta", "session") '
    'VALUES (%s, %s, %s, %s) RETURNING "dataset"."id"'
)
# create examples
CREATE_EXAMPLE_SQL = (
    'INSERT INTO "example" ("input_hash", "task_hash", "content") '
    'VALUES (%s, %s, %s) RETURNING "example"."id"'
)
CREATE_EXAMPLE_SQL_FAST = (
    'INSERT INTO "example" ("input_hash", "task_hash", "content") '
    'VALUES %s RETURNING "example"."id"'
)
# link examples to datasets
CREATE_LINK_SQL = (
    'INSERT INTO "link" ("example_id", "dataset_id")'
    ' VALUES (%s, %s) RETURNING "link"."id"'
)
CREATE_LINK_SQL_FAST = (
    'INSERT INTO "link" ("example_id", "dataset_id") VALUES %s RETURNING "link"."id"'
)

def create_params_for_example(
    example: dict[str, Any]
) -> tuple[str, str, psycopg2.extensions.Binary]:
    content = psycopg2.extensions.Binary(json.dumps(example).encode("utf-8"))
    input_hash: str = example["_input_hash"]
    task_hash: str = example["_task_hash"]

    return input_hash, task_hash, content

def create_dataset(connection: Any, name: str) -> int:
    created = int(datetime.datetime.now().timestamp())
    # just some random data to test performance.
    meta = psycopg2.extensions.Binary(
        bytes([random.randint(0, 255) for _ in range(1024)])
    )
    session = True
    print("Creating dataset in db")
    with connection.cursor() as cursor:
        cursor.execute(CREATE_DATASET_SQL, (name, created, meta, session))
        row_id = cursor.fetchone()[0]
    return row_id

def create_examples_inefficiently(
    connection: Any, file_path: pathlib.Path
) -> list[int]:
    """Create examples inefficiently

    Basically call execute in a loop.

    This is what prodigy and peewee end up doing.
    """
    print("Creating examples inefficiently")
    row_ids = []
    for i, line in enumerate(file_path.read_text().splitlines()):
        example = json.loads(line)
        params = create_params_for_example(example)
        with connection.cursor() as cursor:
            cursor.execute(CREATE_EXAMPLE_SQL, params)
            row_id = cursor.fetchone()[0]
        row_ids.append(row_id)

        if i % 100 == 0:
            print(f"Processed {i} records")
    return row_ids

def create_examples_efficiently(connection: Any, file_path: pathlib.Path) -> list[int]:
    print("Creating examples efficiently")
    params_list = []
    for line in file_path.read_text().splitlines():
        example = json.loads(line)
        params = create_params_for_example(example)
        params_list.append(params)

    with connection.cursor() as cursor:
        results = psycopg2.extras.execute_values(
            cursor, CREATE_EXAMPLE_SQL_FAST, params_list, page_size=100, fetch=True
        )

    return [item[0] for item in results]

def link_examples_inefficiently(
    connection: Any, dataset_id: int, example_ids: list[int]
) -> None:
    print("Linking examples inefficiently")
    for i, example_id in enumerate(example_ids):
        params = (example_id, dataset_id)
        with connection.cursor() as cursor:
            cursor.execute(CREATE_LINK_SQL, params)
        if i % 100 == 0:
            print(f"Processed {i} records")

def link_examples_efficiently(
    connection: Any, dataset_id: int, example_ids: list[int]
) -> None:
    print("Linking examples efficiently")
    param_list = []
    for example_id in example_ids:
        params = (example_id, dataset_id)
        param_list.append(params)

    with connection.cursor() as cursor:
        psycopg2.extras.execute_values(
            cursor, CREATE_LINK_SQL_FAST, param_list, page_size=100, fetch=True
        )

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("file", type=pathlib.Path)
    parser.add_argument("--dataset", default="batching-test")
    parser.add_argument("--fast", action="store_true")

    args = parser.parse_args()

    print(f"Fast mode is: {args.fast}")

    cfg_file = os.environ.get("PRODIGY_CONFIG")
    cfg_content = json.loads(pathlib.Path(cfg_file).read_text())
    pg_cfg = cfg_content["db_settings"]["postgresql"]
    pg_connection = psycopg2.connect(**pg_cfg)
    if not args.fast:
        pg_connection.autocommit = True  # this is what peewee does!

    dataset_id = create_dataset(pg_connection, args.dataset)

    if not args.fast:
        example_ids = create_examples_inefficiently(pg_connection, args.file)
        link_examples_inefficiently(pg_connection, dataset_id, example_ids)
    else:
        example_ids = create_examples_efficiently(pg_connection, args.file)
        link_examples_efficiently(pg_connection, dataset_id, example_ids)

    pg_connection.commit()

:man_walking:t4:Reproduction steps:
How can we recreate the bug?

  1. Set up a remote postgres database. I used an AWS RDS instance. Importantly, the database should not be running on localhost: connections to localhost never touch the physical network, so their latency is negligible (measured in microseconds). You want a connection with some realistic network latency.
  2. Run prodigy db-in

:desktop_computer: Environment variables:
Please provide prodigy stats or Python version/OS/Prodigy version:

Version          1.11.11                       
Location         /Users/sander/Library/Caches/pypoetry/virtualenvs/annotation-UeuNpqgA-py3.9/lib/python3.9/site-packages/prodigy
Prodigy Home     /Users/sander/.prodigy        
Platform         macOS-13.2-arm64-arm-64bit    
Python Version   3.9.16                        
Database Name    PostgreSQL                    
Database Id      postgresql                    
Total Datasets   9                             
Total Sessions   29      

Welcome Sander! This was a known issue: it was fixed in Prodigy v1.12, with a small additional fix (mainly affecting MySQL) added in v1.12.7. If you upgrade, db-in does batched inserts and should be quite a bit faster.

Oh how silly of me! Updating to the latest version (1.13.1) did the trick for me :-).