Very slow database operations due to lack of batching

:bug: Bug description:
Commands like db-in are very slow, at least on Postgres, because Prodigy and Peewee do not batch inserts.

Inserts are not batched, so every single insert statement makes a full round trip to the database. With a remote database, every individual insert therefore has to go over the network independently, and even with low network latency this adds up very quickly. For instance, inserting 1000 records yields about 2000 insert statements (due to linking); at a very good network latency of 10 ms per request, that is already 20 seconds of pure network wait time. In reality, both my datasets and my network latency are unfortunately significantly larger, which means that in practice I'm often waiting (tens of) minutes for db-in to complete.
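
A quick back-of-the-envelope calculation (the numbers are illustrative, not measured) shows how fast this adds up:

# Rough estimate of the pure network wait time for unbatched inserts.
records = 1000
statements_per_record = 2      # one insert for the example, one for the dataset link
round_trip_seconds = 0.010     # an optimistic 10 ms per round trip

total_wait = records * statements_per_record * round_trip_seconds
print(f"{total_wait:.0f} seconds of pure network wait")  # -> 20 seconds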

Additionally, Peewee configures Postgres in autocommit mode. This too is rather inefficient, as it means each individual statement is effectively wrapped in its own transaction, and every transaction incurs significant overhead on the database. Here too, using a single transaction instead of one per statement would be more efficient. That said, the network delay is by far the largest contributor to the performance issues I'm encountering.
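
As a minimal sketch of the single-transaction approach with plain psycopg2 (the connection parameters and rows below are placeholders, not Prodigy's actual values): leave autocommit at its default of False and commit once, so all inserts share one transaction.

import psycopg2

# Placeholder connection details for illustration only.
conn = psycopg2.connect(
    host="db.example.com", dbname="prodigy", user="user", password="secret"
)
rows = [("ih1", "th1", b"{}"), ("ih2", "th2", b"{}")]  # illustrative parameter tuples

# autocommit stays at its default (False), so every insert below runs in the
# same transaction; leaving the `with conn:` block issues a single COMMIT.
with conn:
    with conn.cursor() as cur:
        for params in rows:
            cur.execute(
                'INSERT INTO "example" ("input_hash", "task_hash", "content") '
                "VALUES (%s, %s, %s)",
                params,
            )
conn.close()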

I suspect this behaviour also occurs for remote MySQL databases, but as I don't have access to a remote MySQL database I cannot be certain.

Here is a rough little script that mimics db-in's behaviour with respect to database inserts. It showcases both the inefficient behaviour and a more efficient solution. For a sample of 1700 records, the inefficient method takes 92 seconds against a remote RDS database; the batched solution takes only 4 seconds, a 23x improvement.

import argparse
import datetime
import json
import os
import pathlib
import random
from typing import Any

import psycopg2
import psycopg2.extensions
import psycopg2.extras

# create the dataset
CREATE_DATASET_SQL = (
    'INSERT INTO "dataset" ("name", "created", "meta", "session") '
    'VALUES (%s, %s, %s, %s) RETURNING "dataset"."id"'
)
# create examples
CREATE_EXAMPLE_SQL = (
    'INSERT INTO "example" ("input_hash", "task_hash", "content") '
    'VALUES (%s, %s, %s) RETURNING "example"."id"'
)
CREATE_EXAMPLE_SQL_FAST = (
    'INSERT INTO "example" ("input_hash", "task_hash", "content") '
    'VALUES %s RETURNING "example"."id"'
)
# link examples to datasets
CREATE_LINK_SQL = (
    'INSERT INTO "link" ("example_id", "dataset_id")'
    ' VALUES (%s, %s) RETURNING "link"."id"'
)
CREATE_LINK_SQL_FAST = (
    'INSERT INTO "link" ("example_id", "dataset_id") VALUES %s RETURNING "link"."id"'
)


def create_params_for_example(
    example: dict[str, Any]
) -> tuple[str, str, psycopg2.extensions.Binary]:
    content = psycopg2.extensions.Binary(json.dumps(example).encode("utf-8"))
    input_hash: str = example["_input_hash"]
    task_hash: str = example["_task_hash"]

    return input_hash, task_hash, content


def create_dataset(connection: Any, name: str) -> int:
    created = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
    # just some random data to test performance.
    meta = psycopg2.extensions.Binary(
        bytes([random.randint(0, 255) for _ in range(1024)])
    )
    session = True
    print("Creating dataset in db")
    with connection.cursor() as cursor:
        cursor.execute(CREATE_DATASET_SQL, (name, created, meta, session))
        row_id = cursor.fetchone()[0]
    return row_id


def create_examples_inefficiently(
    connection: Any, file_path: pathlib.Path
) -> list[int]:
    """Create examples inefficiently

    Basically call execute in a loop.

    This is what prodigy and peewee end up doing
    """
    print("Creating examples inefficiently")
    row_ids = []
    for i, line in enumerate(file_path.read_text().splitlines()):
        example = json.loads(line)
        params = create_params_for_example(example)
        with connection.cursor() as cursor:
            cursor.execute(CREATE_EXAMPLE_SQL, params)
            row_id = cursor.fetchone()[0]
        row_ids.append(row_id)

        if i % 100 == 0:
            print(f"Processed {i} records")
    return row_ids


def create_examples_efficiently(connection: Any, file_path: pathlib.Path) -> list[int]:
    print("Creating examples efficiently")
    params_list = []
    for line in file_path.read_text().splitlines():
        example = json.loads(line)
        params = create_params_for_example(example)
        params_list.append(params)

    with connection.cursor() as cursor:
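        # execute_values groups the parameter tuples into multi-row VALUES
        # statements (here 100 rows per statement), cutting the number of round
        # trips drastically; fetch=True collects the ids from the RETURNING clause.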
        results = psycopg2.extras.execute_values(
            cursor, CREATE_EXAMPLE_SQL_FAST, params_list, page_size=100, fetch=True
        )

    return [item[0] for item in results]


def link_examples_inefficiently(
    connection: Any, dataset_id: int, example_ids: list[int]
) -> None:
    print("Linking examples inefficiently")
    for i, example_id in enumerate(example_ids):
        params = (example_id, dataset_id)
        with connection.cursor() as cursor:
            cursor.execute(CREATE_LINK_SQL, params)
        if i % 100 == 0:
            print(f"Processed {i} records")


def link_examples_efficiently(
    connection: Any, dataset_id: int, example_ids: list[int]
) -> None:
    print("Linking examples efficiently")
    param_list = []
    for example_id in example_ids:
        params = (example_id, dataset_id)
        param_list.append(params)

    with connection.cursor() as cursor:
        psycopg2.extras.execute_values(
            cursor, CREATE_LINK_SQL_FAST, param_list, page_size=100, fetch=True
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("dataset")
    parser.add_argument("file", type=pathlib.Path)
    parser.add_argument("--fast", action="store_true")

    args = parser.parse_args()

    print(f"Fast mode is: {args.fast}")

    cfg_file = os.environ.get("PRODIGY_CONFIG")
    cfg_content = json.loads(pathlib.Path(cfg_file).read_text())
    pg_cfg = cfg_content["db_settings"]["postgresql"]
    pg_connection = psycopg2.connect(
        user=pg_cfg["user"],
        password=pg_cfg["password"],
        host=pg_cfg["host"],
        port=pg_cfg["port"],
        dbname=pg_cfg["dbname"],
    )
    if not args.fast:
        pg_connection.autocommit = True  # this is what peewee does!

    dataset_id = create_dataset(pg_connection, args.dataset)

    if not args.fast:
        example_ids = create_examples_inefficiently(pg_connection, args.file)
        link_examples_inefficiently(pg_connection, dataset_id, example_ids)
    else:
        example_ids = create_examples_efficiently(pg_connection, args.file)
        link_examples_efficiently(pg_connection, dataset_id, example_ids)
        pg_connection.commit()
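
For what it's worth, Peewee itself can batch inserts, so the same improvement looks achievable on the Prodigy side. A hedged sketch of the pattern (the Example model, its fields and the connection details below are made up for illustration and are not Prodigy's actual models):

from peewee import BlobField, Model, PostgresqlDatabase, TextField, chunked

# Placeholder connection details for illustration only.
db = PostgresqlDatabase(
    "prodigy", host="db.example.com", user="user", password="secret"
)


class Example(Model):
    # Illustrative model; assumes the table already exists.
    input_hash = TextField()
    task_hash = TextField()
    content = BlobField()

    class Meta:
        database = db


def insert_examples(rows: list[dict]) -> None:
    # One transaction for everything instead of one per statement.
    with db.atomic():
        for batch in chunked(rows, 100):
            # insert_many issues a single multi-row INSERT per batch.
            Example.insert_many(batch).execute()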

:man_walking:t4: Reproduction steps:
How can we recreate the bug?

  1. Set up a remote Postgres database. I used an AWS RDS instance. Importantly, the database should not be running on localhost: connections to localhost never touch the physical network and so have effectively no latency (microseconds at most). You want a connection with some realistic network latency; a quick way to measure the round trip is sketched after these steps.
  2. Run prodigy db-in
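
To confirm the environment really has meaningful latency, a quick round-trip check such as the following can help (connection parameters are placeholders; point it at the same remote database that db-in uses):

import time

import psycopg2

# Placeholder connection details for illustration only.
conn = psycopg2.connect(
    host="db.example.com", dbname="prodigy", user="user", password="secret"
)
with conn.cursor() as cur:
    n = 50
    start = time.perf_counter()
    for _ in range(n):
        cur.execute("SELECT 1")  # each execute is one network round trip
        cur.fetchone()
    elapsed = time.perf_counter() - start
print(f"Average round trip: {elapsed / n * 1000:.1f} ms")
conn.close()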

:desktop_computer: Environment variables:
Please provide prodigy stats or Python version/OS/Prodigy version:

Version          1.11.11                       
Location         /Users/sander/Library/Caches/pypoetry/virtualenvs/annotation-UeuNpqgA-py3.9/lib/python3.9/site-packages/prodigy
Prodigy Home     /Users/sander/.prodigy        
Platform         macOS-13.2-arm64-arm-64bit    
Python Version   3.9.16                        
Database Name    PostgreSQL                    
Database Id      postgresql                    
Total Datasets   9                             
Total Sessions   29      

Welcome Sander! This was a known issue and was fixed in Prodigy v1.12, with a small additional fix (mainly affecting MySQL) added in v1.12.7. If you upgrade, db-in does batched inserts and should be quite a bit faster.

Oh, how silly of me! Updating to the latest version (1.13.1) did the trick for me :-).