Work Stealing not working

Hi all :wave:

I'm trying to figure out why work stealing does not work in my setup.
I'm using Prodigy 1.16.0 inside a container and have my batch size set to 1. I've set my annotators via the allowed-sessions environment variable.
But whenever one annotator's queue is sitting idle, the others are not able to pick up its tasks. I'm working with pretty small datasets (around 10-15 examples), so the impact is noticeable. Sometimes an annotator will not get any examples while others are getting 3 or 4.

As far as I understand it, these are the important parts: the task router creates a queue for each annotator, and then everybody annotates the examples assigned to them. Here is the log output for that:

CONTROLLER: Using route_average_per_task router with annotations_per_task=2
17:17:52: STREAM: Created queue for output_dataset_2025-02-24-17-annotator1
17:17:52: STREAM: Created queue for output_dataset_2025-02-24-17-annotator2
17:17:52: STREAM: Created queue for output_dataset_2025-02-24-17-annotator3
17:17:52: STREAM: Created queue for output_dataset_2025-02-24-17-annotator4
17:17:52: STREAM: Created queue for output_dataset_2025-02-24-17-annotator5
17:17:52: STREAM: Created queue for output_dataset_2025-02-24-17-annotator6

But how come that even with "allow_work_stealing": true in my prodigy.json there are no new examples once somebody has finished their queue? Would it actually be better in this case not to use the allowed-sessions environment variable?
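In case it helps, this is roughly how the relevant settings can be checked inside the container (a minimal sketch, assuming prodigy.json lives in the default ~/.prodigy home directory, or wherever PRODIGY_HOME points):

import json
import os
from pathlib import Path

# Default Prodigy home is ~/.prodigy unless PRODIGY_HOME points elsewhere
home = Path(os.environ.get("PRODIGY_HOME", Path.home() / ".prodigy"))
config = json.loads((home / "prodigy.json").read_text())

for key in ("batch_size", "feed_overlap", "annotations_per_task", "allow_work_stealing"):
    print(key, "=", config.get(key))

# The named annotator sessions are passed in via the environment
print("PRODIGY_ALLOWED_SESSIONS =", os.environ.get("PRODIGY_ALLOWED_SESSIONS"))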

Huge thanks in advance for the feedback, I'm just trying to understand where the error could be.

Welcome to the forum @HarisAdrovic :wave:

In the context of the annotations_per_task setting, work stealing will apply as long as the expected number of annotations can still be fulfilled. In other words, the annotations_per_task router won't route tasks in a way that would violate the required estimate.
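To make the routing part of this concrete, here's a toy sketch (explicitly not Prodigy's actual implementation) of what a target of annotations_per_task=2 with six known sessions means: every incoming task is put on two of the session queues, which is the same pattern you can see in the ROUTER lines in the trace further down.

from itertools import cycle

# Toy model only, not Prodigy's router code: send every task to
# annotations_per_task of the known sessions, round-robin. The real
# route_average_per_task router works differently internally (and the
# stealing logic additionally respects the constraint described above),
# but the shape of the result is the same: each _input_hash lands on
# exactly two session queues.
sessions = [f"test-an{i}" for i in range(1, 7)]
annotations_per_task = 2
picker = cycle(sessions)

queues = {name: [] for name in sessions}
for input_hash in range(10):  # stand-in for 10 real _input_hash values
    targets = [next(picker) for _ in range(annotations_per_task)]
    for name in targets:
        queues[name].append(input_hash)
    print(f"Routing item {input_hash} -> {targets}")

print({name: len(q) for name, q in queues.items()})  # 3-4 items per queue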

If, on the other hand, you set feed_overlap to false, you could definitely observe one session consuming all the questions.

In any case, I think you should observe some work stealing in your test case, especially if you initialize all the sessions and then consume the stream with one session only. It might be, though, that with this small input stream and multiple sessions consuming the examples, annotations_per_task prevents stealing from happening due to the constraint explained above.
Still, if just one session were consuming, you'd definitely see some cases of stealing.
To recreate such a scenario step by step:

Assuming we have 10 input examples and 6 sessions created upfront:

STREAM: Created queue for test-an1.
STREAM: Created queue for test-an2.
STREAM: Created queue for test-an3.
STREAM: Created queue for test-an4.
STREAM: Created queue for test-an5.
STREAM: Created queue for test-an6.
# test-an1 requests questions
ROUTER: Routing item with _input_hash=1886699658 -> ['test-an1', 'test-an5']
Front-End:
'test-an1' -> _input_hash': 1886699658
# test-an2 requests questions
ROUTER: Routing item with _input_hash=1487477437 -> ['test-an2', 'test-an4']
Front-End:
'test-an2' -> _input_hash': 1487477437
# test-an3 requests questions
ROUTER: Routing item with _input_hash=1842734674 -> ['test-an5', 'test-an6']
ROUTER: Routing item with _input_hash=-487516519 -> ['test-an6', 'test-an2']
ROUTER: Routing item with _input_hash=10406310 -> ['test-an1', 'test-an2']
ROUTER: Routing item with _input_hash=1394627333 -> ['test-an6', 'test-an4']
ROUTER: Routing item with _input_hash=1569967905 -> ['test-an4', 'test-an1']
ROUTER: Routing item with _input_hash=606673118 -> ['test-an3', 'test-an5']
Front-End:
'test-an3' -> _input_hash': 606673118
# test-an4 requests questions -> no new routing, just consuming the session queue
Front-End:
'test-an4' -> _input_hash': 1487477437
# test-an5 requests questions -> no new routing, just consuming the session queue
Front-End:
'test-an5' -> _input_hash': 1886699658
# test-an6 requests questions -> no new routing, just consuming the session queue
Front-End:
'test-an6' -> _input_hash': 1842734674
# At this point we leave sessions 2-6 idle and keep consuming the stream with session an1

# test-an1 requests questions -> no new routing, just consuming the session queue
Front-End:
'test-an1' -> _input_hash': 10406310
# test-an1 requests questions -> no new routing, just consuming the session queue
Front-End:
'test-an1' -> _input_hash': 1569967905
# test-an1 requests questions
ROUTER: Routing item with _input_hash=339726228 -> ['test-an1', 'test-an5']
Front-End:
'test-an1' -> _input_hash': 339726228
# test-an1 requests questions -> no valid items from the router, an1 starts to steal
ROUTER: Routing item with _input_hash=-584314991 -> ['test-an2', 'test-an6']
SESSION: test-an1 has stolen item with hash -1298236362 from test-an2 (_input_hash: 1487477437)
Front-End:
'test-an1' -> _input_hash': 1487477437
# test-an1 requests questions
SESSION: test-an1 has stolen item with hash 851092169 from test-an3
Front-End:
'test-an1' -> _input_hash': 606673118
# test-an1 requests questions
SESSION: test-an1 has stolen item with hash 636683182 from test-an6
Front-End:
'test-an1' -> _input_hash': 1842734674

In total, 3 questions get stolen and 3 get "reserved" by annotations_per_task to fulfill the estimate given the total number of sessions.
If you don't set the sessions upfront, the estimate (and the routing) will be based only on the sessions known at that point; and while you might be able to consume more examples with the greedy session in this toy example, the overall average number of annotations per task will likely be less accurate.
If you're not observing any stealing, it might be that with more sessions consuming and such a short input stream, more questions get "reserved". It's hard to say without doing a similar step-by-step analysis.
The point is that annotations_per_task tries to approximate the target as well as possible under the given conditions. So to answer your question, there's not really an error here. The annotations_per_task router works with an estimate that carries a certain amount of uncertainty (because sessions might be unknown, or known but idle), and it works best under the assumption that you have a pool of annotators that are all active to more or less the same degree.
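To put some rough numbers on that estimate, using the toy example from above (10 examples, annotations_per_task=2, 6 sessions created upfront):

# Back-of-the-envelope view of the target the router tries to approximate:
n_tasks = 10
annotations_per_task = 2
n_sessions = 6

total_annotations = n_tasks * annotations_per_task  # 20 annotations overall
per_session = total_annotations / n_sessions        # ~3.33 per session

print(total_annotations, round(per_session, 2))
# Even a greedy session that steals aggressively won't get all 20 questions:
# some of the work stays "reserved" so the remaining sessions can still
# contribute towards the 2-annotations-per-task target.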
To make sure that all questions are annotated under annotations_per_task, all annotators should, at a minimum, be instructed to keep annotating until they reach the "No more tasks available" screen.
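And if you want to verify afterwards how the annotations ended up being distributed, you can count them per task and per session in the output dataset. A minimal sketch using Prodigy's database API (the dataset name is taken from your log, adjust as needed; on older versions db.get_dataset can be used instead of db.get_dataset_examples):

from collections import Counter
from prodigy.components.db import connect

db = connect()  # uses the database settings from your prodigy.json
examples = db.get_dataset_examples("output_dataset_2025-02-24-17")

per_task = Counter(eg["_input_hash"] for eg in examples)
per_session = Counter(eg.get("_session_id") for eg in examples)

print("annotations per task:", dict(per_task))        # ideally ~2 each
print("annotations per session:", dict(per_session))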