Bug description:
Commands like db-in
are very slow on at least Postgres due to lack of batching inserts in Prodigy and Peewee.
No batching is done for inserts. This means every single insert statement does a full roundtrip to the database. When using a remote database this means that every individual insert has to go over the network independently. Even with low network latency, this adds up very quickly. For instance, inserting 1000 records yields about 2000 insert statements (due to linking). At a very good network latency of 10ms per request, this means 20 seconds of pure network wait time. In reality, both my datasets and network latency are unfortunately significantly larger. This means that in practice I'm often waiting for (tens of) minutes for db-in
to complete.
Additionally, Peewee configures Postgres in autocommit mode. This too is rather inefficient, as it means each individual statement is effectively wrapped in its own transaction, and a transaction incurs significant overhead on the database. Here too, using a single transaction instead of one per statement is more efficient. That said, the network delay is by far the largest contributor to the performance issues I'm encountering.
I suspect this behaviour also occurs for remote MySQL databases, but as I don't have access to a remote MySQL database I cannot be certain.
Here is a rough little script that mimics db-in
's behaviour with respect to database inserts. It showcases both the inefficient behaviour and a more efficient solution. For an example of 1700 records, the inefficient method takes up 92 seconds with a remote RDS database. The efficient solution that batches only takes 4. In other words, a 23x improvement.
import argparse
import datetime
import json
import os
import pathlib
import random
from typing import Any
import psycopg2
import psycopg2.extensions
import psycopg2.extras
# SQL templates used by both the slow (row-at-a-time) and fast (batched) paths.
# The "%s"-per-column statements are executed once per row; the "VALUES %s"
# variants are expanded into multi-row inserts by psycopg2.extras.execute_values.

# create the dataset
CREATE_DATASET_SQL = (
    'INSERT INTO "dataset" ("name", "created", "meta", "session") '
    'VALUES (%s, %s, %s, %s) RETURNING "dataset"."id"'
)
# create examples
# NOTE(review): "SDL" looks like a typo for "SQL"; kept as-is because the name
# is referenced by create_examples_inefficiently below.
CREATE_EXAMPLE_SDL = (
    'INSERT INTO "example" ("input_hash", "task_hash", "content") '
    'VALUES (%s, %s, %s) RETURNING "example"."id"'
)
# Batched variant: execute_values substitutes a page of row tuples for %s.
CREATE_EXAMPLE_SQL_FAST = (
    'INSERT INTO "example" ("input_hash", "task_hash", "content") '
    'VALUES %s RETURNING "example"."id"'
)
# link examples to datasets
CREATE_LINK_SQL = (
    'INSERT INTO "link" ("example_id", "dataset_id")'
    ' VALUES (%s, %s) RETURNING "link"."id"'
)
CREATE_LINK_SQL_FAST = (
    'INSERT INTO "link" ("example_id", "dataset_id") VALUES %s RETURNING "link"."id"'
)
def create_params_for_example(
    example: dict[str, Any]
) -> tuple[str, str, psycopg2.extensions.Binary]:
    """Build the (input_hash, task_hash, content) parameter tuple for one example.

    The example dict is serialized to UTF-8 JSON and wrapped in
    psycopg2's Binary adapter for insertion into a bytea column.
    """
    serialized = json.dumps(example).encode("utf-8")
    return (
        example["_input_hash"],
        example["_task_hash"],
        psycopg2.extensions.Binary(serialized),
    )
def create_dataset(connection: Any, name: str) -> int:
    """Insert a new dataset row and return its generated id."""
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    created = int(now.timestamp())
    # 1 KiB of random bytes stands in for real metadata for this benchmark.
    meta = psycopg2.extensions.Binary(
        bytes(random.randint(0, 255) for _ in range(1024))
    )
    print("Creating dataset in db")
    with connection.cursor() as cursor:
        cursor.execute(CREATE_DATASET_SQL, (name, created, meta, True))
        row = cursor.fetchone()
    return row[0]
def create_examples_inefficiently(
    connection: Any, file_path: pathlib.Path
) -> list[int]:
    """Insert examples one statement at a time (one network roundtrip each).

    This mirrors what Prodigy and Peewee end up doing today.
    """
    print("Creating examples inefficiently")
    row_ids: list[int] = []
    for i, line in enumerate(file_path.read_text().splitlines()):
        params = create_params_for_example(json.loads(line))
        with connection.cursor() as cursor:
            cursor.execute(CREATE_EXAMPLE_SDL, params)
            row_ids.append(cursor.fetchone()[0])
        if i % 100 == 0:
            print(f"Processed {i} records")
    return row_ids
def create_examples_efficiently(connection: Any, file_path: pathlib.Path) -> list[int]:
    """Insert all examples with batched multi-row INSERTs via execute_values."""
    print("Creating examples efficiently")
    params_list = [
        create_params_for_example(json.loads(line))
        for line in file_path.read_text().splitlines()
    ]
    with connection.cursor() as cursor:
        rows = psycopg2.extras.execute_values(
            cursor, CREATE_EXAMPLE_SQL_FAST, params_list, page_size=100, fetch=True
        )
    # execute_values with fetch=True returns the RETURNING rows; keep the ids.
    return [row[0] for row in rows]
def link_examples_inefficiently(
    connection: Any, dataset_id: int, example_ids: list[int]
) -> None:
    """Link each example to the dataset with an individual INSERT (roundtrip each)."""
    print("Linking examples inefficiently")
    for i, example_id in enumerate(example_ids):
        with connection.cursor() as cursor:
            cursor.execute(CREATE_LINK_SQL, (example_id, dataset_id))
        if i % 100 == 0:
            print(f"Processed {i} records")
def link_examples_efficiently(
    connection: Any, dataset_id: int, example_ids: list[int]
) -> None:
    """Link all examples to the dataset using batched multi-row INSERTs."""
    print("Linking examples efficiently")
    param_list = [(example_id, dataset_id) for example_id in example_ids]
    with connection.cursor() as cursor:
        psycopg2.extras.execute_values(
            cursor, CREATE_LINK_SQL_FAST, param_list, page_size=100, fetch=True
        )
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("dataset")
    parser.add_argument("file", type=pathlib.Path)
    parser.add_argument("--fast", action="store_true")
    args = parser.parse_args()
    print(f"Fast mode is: {args.fast}")

    # Fail fast with a clear message: os.environ.get returns None when the
    # variable is unset, and pathlib.Path(None) raises a cryptic TypeError.
    cfg_file = os.environ.get("PRODIGY_CONFIG")
    if not cfg_file:
        raise SystemExit("PRODIGY_CONFIG environment variable is not set")
    cfg_content = json.loads(pathlib.Path(cfg_file).read_text())
    pg_cfg = cfg_content["db_settings"]["postgresql"]
    pg_connection = psycopg2.connect(
        user=pg_cfg["user"],
        password=pg_cfg["password"],
        host=pg_cfg["host"],
        port=pg_cfg["port"],
        dbname=pg_cfg["dbname"],
    )
    try:
        if not args.fast:
            pg_connection.autocommit = True  # this is what peewee does!
        dataset_id = create_dataset(pg_connection, args.dataset)
        if not args.fast:
            example_ids = create_examples_inefficiently(pg_connection, args.file)
            link_examples_inefficiently(pg_connection, dataset_id, example_ids)
        else:
            example_ids = create_examples_efficiently(pg_connection, args.file)
            link_examples_efficiently(pg_connection, dataset_id, example_ids)
        # No-op in autocommit (slow) mode; commits the single batched
        # transaction in fast mode.
        pg_connection.commit()
    finally:
        # Close the connection even if an insert fails mid-run.
        pg_connection.close()
Reproduction steps:
How can we recreate the bug?
- Set up a remote postgres database. I used an AWS RDS instance. Importantly, the database should not be running on localhost. Connections to localhost don't touch the physical network, and thus have no network latency (or measured in microseconds). You want a connection with some realistic network latency.
- Run
prodigy db-in
Environment variables:
Please provide prodigy stats
or Python version/OS/Prodigy version:
Version 1.11.11
Location /Users/sander/Library/Caches/pypoetry/virtualenvs/annotation-UeuNpqgA-py3.9/lib/python3.9/site-packages/prodigy
Prodigy Home /Users/sander/.prodigy
Platform macOS-13.2-arm64-arm-64bit
Python Version 3.9.16
Database Name PostgreSQL
Database Id postgresql
Total Datasets 9
Total Sessions 29