I am having an issue where Prodigy pulls all the data from my data source before starting, rather than just a small batch. Please help me figure out what's going on.
I have written a custom function that loads data from a SQL Server database and does some custom segmentation:
import pyodbc

import loader_utilities


def db_loader(model_id, server_pa=<SERVER>, database_pa=<DB>, driver=<DRIVER>,
              batch_size=100, db_cursor_size=100, config_file='config.txt', source_id=None):
    segmentation_loaded = None
    # connection to the data management database
    predana_con = loader_utilities.make_sql_connection(server_pa, database_pa, driver, config_file=config_file)
    source = loader_utilities.get_source_data(predana_con, model_id, source_id)
    for ix, source_row in source.iterrows():
        # connection to the server that holds the text for this source
        txt_src_con = loader_utilities.make_sql_connection(source_row.source_server, source_row.source_database,
                                                           driver, config_file=config_file)
        con = pyodbc.connect(predana_con)
        cursor = con.cursor()
        cursor.execute(f'EXECUTE dbo.spcGetModelData {model_id}, {source_row.source_id}, {batch_size}')
        for row in loader_utilities.get_db_chunks(cursor, db_cursor_size):
            # log record as pending
            loader_utilities.log_record_status(predana_con, model_id, source_row.source_id, row[0], 1)
            # load segmentation function (only load if new or changed)
            segmentation_scheme = loader_utilities.get_segmentation_scheme(predana_con, row[0])
            if segmentation_scheme != segmentation_loaded:
                segmentation_function = loader_utilities.load_segmentation_function(segmentation_scheme)
                segmentation_loaded = segmentation_scheme
            # get query and run on text server
            text_qry = loader_utilities.get_text_query(predana_con, row[0])
            text = loader_utilities.get_text_to_label(txt_src_con, text_qry)
            # iterate over text segments
            for i, txt in enumerate(segmentation_function(text)):
                task = {'meta': {'model_id': model_id, 'source_id': source_row.source_id,
                                 'model_data_id': row[0], 'segment_id': i},
                        'text': txt}
                yield task
        con.close()
This works as a generator: I can call next() on it and get one record at a time, and so on. However, no matter how I use it in Prodigy, it runs through the entire stream of available data. I have tried setting batch_size in prodigy.json to small numbers (5, 10), but it always streams all the available data, so for large data sets there is a very long startup time before annotation can begin. I was expecting my code to pull one batch from SQL and for Prodigy to draw from that batch until it ran out, at which point my process would query the next one.
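To be clear, the generator itself is lazy when I use it directly, outside Prodigy. For example (the model_id here is just a placeholder):

from itertools import islice

loader = db_loader(model_id=1)   # placeholder arguments
first = next(loader)             # fetches only enough rows to build one task
print(first['text'][:80])
for task in islice(loader, 3):   # pulls just three more tasks
    print(task['meta'])

Running something like this only reads a handful of rows from SQL Server, so the eager consumption seems to happen somewhere on the Prodigy side.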
I have written a custom recipe that uses this loader:
from typing import List, Optional

import spacy
from prodigy import recipe
from prodigy.components.loaders import get_stream
from prodigy.components.preprocess import add_tokens
from prodigy.recipes.ner import make_tasks  # make_tasks as defined in the built-in ner recipes
from prodigy.util import get_labels, split_string


@recipe(
    'ner.correct_custom',
    dataset=("Dataset to save annotations to", "positional", None, str),
    spacy_model=("Loadable spaCy model with an entity recognizer", "positional", None, str),
    label=("Comma-separated label(s) to annotate or text file with one label per line", "option", "l", get_labels),
    exclude=("Comma-separated dataset(s) whose annotations to exclude", "option", "e", split_string),
    server=("Data management server ip", "option", "s", str),
    database=("Data management database", "option", "db", str),
    driver=("ODBC driver name", "option", "d", str),
    model_id=("ID of model to label records for", "option", "mdl", int),
    sql_batch_size=("Number of records to stage for labeling", "option", "b", int),
    db_cursor_size=("Number of records to pull from source at a time", "option", "cur", int),
    config_file=("Location of config file", "option", "cnf", str),
)
def correct_custom(
    dataset: str,
    spacy_model: str,
    label: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    server: str = <SERVER>,
    database: str = <DATABASE>,
    driver: str = <DRIVER>,
    model_id: int = 1,
    sql_batch_size: int = 10,
    db_cursor_size: int = 5,
    config_file: str = 'config.txt',
):
    print("Loading spaCy model")
    nlp = spacy.load(spacy_model, disable=['parser', 'tagger', 'textcat'])
    print("Finished loading spaCy model")
    loader = db_loader(model_id=model_id, server_pa=server, database_pa=database, driver=driver,
                       batch_size=sql_batch_size, db_cursor_size=db_cursor_size, config_file=config_file)
    stream = get_stream(loader, rehash=True, dedup=False, input_key="text")
    stream = add_tokens(nlp, stream)
    stream = make_tasks(nlp, stream, label)
    return {
        "view_id": "ner_manual",   # Annotation interface to use
        "dataset": dataset,        # Name of dataset to save annotations
        "stream": stream,          # Incoming stream of examples
        "exclude": exclude,        # List of dataset names to exclude
        "config": {                # Additional config settings, mostly for app UI
            "lang": nlp.lang,
            "labels": label,       # Selectable label options
        },
    }
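To try to pin down where the stream gets consumed, I also sketched a pass-through wrapper that logs each task as it is pulled (log_stream is just my own debugging helper, not a Prodigy function). I can insert it after make_tasks with stream = log_stream(stream):

def log_stream(stream):
    # Pass-through generator: yields tasks unchanged but logs each pull,
    # so eager consumption shows up as a burst of output at startup.
    for i, task in enumerate(stream):
        print(f"pulled task {i}: {task['meta']}")
        yield task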
I am calling Prodigy via:
> prodigy ner.correct_custom test_data <MODEL> -l <LABEL> -F <RECIPE FILE>
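and my prodigy.json contains the small batch size mentioned above (I also tried 10):

{
  "batch_size": 5
}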
What am I missing? I have been struggling to figure this out for a few days.