I see this has been asked a lot but I am struggling to get my head around it.
I am currently running some jobs (`mark`, `ner` and `ner.manual`) with a finite stream, and I want to re-populate the stream with un-annotated tasks every time it runs empty.
I understand the purpose of the batch size, and why, if you refresh the page, Prodigy doesn't know whether a task is still pending or lost unless you check for it in the DB.
I used this code to create the stream loop from some API response data:

```python
def get_stream_loop(initialData):
    db = connect()
    while True:
        stream = get_stream(initialData)
        yielded = False
        for eg in stream:
            # Only send out task if its hash isn't in the dataset yet
            if eg["_task_hash"] not in hashes_in_dataset:
                yield eg
                yielded = True
        if not yielded:
            break
```
In this case, once the last batch is delivered the loop breaks, and if some tasks from that final batch are left un-annotated and the page is refreshed, the stream won't re-populate.
This can also be reproduced with a dataset smaller than the batch size.
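To make the behaviour concrete, here is a Prodigy-free simulation of the same loop: plain dicts with a fake `_task_hash` stand in for tasks, and an in-memory set plays the role of the dataset (all names here are illustrative, not Prodigy APIs). In isolation, the generator only breaks after a full pass in which everything is already saved, so the key question is what happens to tasks that go out but never come back.

```python
# Toy stand-in for the loop above; no Prodigy involved.
def get_stream_loop(tasks, saved_hashes):
    while True:
        yielded = False
        for eg in tasks:  # re-build the stream on every pass
            if eg["_task_hash"] not in saved_hashes:
                yield eg
                yielded = True
        if not yielded:
            break  # a full pass produced nothing new

saved_hashes = set()
tasks = [{"_task_hash": h} for h in (1, 2, 3)]
loop = get_stream_loop(tasks, saved_hashes)

first_pass = [next(loop)["_task_hash"] for _ in range(3)]  # [1, 2, 3]
saved_hashes.update([1, 2])        # only two answers come back
repeat = next(loop)["_task_hash"]  # second pass re-yields the unsaved 3
saved_hashes.add(3)                # once 3 is saved as well...
rest = list(loop)                  # ...the next pass is empty and the loop ends
```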
How could I re-build the stream and pass it through this loop again until I can confirm that the whole stream has indeed been saved to the DB? The condition could be:

```python
len(initialData) == len(hashes_in_dataset)
```
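One possible shape for this, sketched below with the same Prodigy-free simulation: re-read the saved hashes at the start of every pass (rather than capturing them once in a closure), and only break once the completeness condition holds. Here `fetch_saved_hashes` is a stand-in for however you would read the dataset, e.g. a call like `db.get_task_hashes(dataset)` from Prodigy's `Database` API; the `>=` check also assumes the dataset only contains hashes from this input. This is a sketch under those assumptions, not a tested recipe.

```python
def get_stream_loop(initial_data, fetch_saved_hashes):
    while True:
        saved = set(fetch_saved_hashes())  # fresh snapshot, not a stale closure
        if len(saved) >= len(initial_data):
            break  # the whole stream is confirmed saved
        for eg in initial_data:  # re-build the stream
            if eg["_task_hash"] not in saved:
                yield eg

# Simulated annotation session: answers trickle into `db_hashes`
db_hashes = set()
tasks = [{"_task_hash": h} for h in (1, 2)]
loop = get_stream_loop(tasks, lambda: db_hashes)

out = next(loop)["_task_hash"]   # task 1 goes out
db_hashes.add(1)                 # its answer is saved
out2 = next(loop)["_task_hash"]  # task 2 goes out (same pass)
db_hashes.add(2)
remaining = list(loop)           # next pass: everything saved, loop ends
```

Because the snapshot is taken per pass, a task whose answer arrives mid-pass may still be re-yielded once, but it will be filtered out on the following pass.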