Problems with task stealing and sessions

Hi Prodigy team,

I'm running into trouble with work stealing, both with pre-determined allowed sessions and with dynamic session creation. Basically, both approaches are likely to leave me with unannotated data and/or tasks stuck in one recipe for a long time, which stops them from moving along in our pipeline of several recipes (e.g. segmentation tasks and custom review tasks).

With the PRODIGY_ALLOWED_SESSIONS environment variable, the issue is that when Prodigy is launched, the initial tasks are routed to all sessions, and tasks cannot be stolen from a session the annotator has not yet opened in the browser. Therefore, if one annotator does not complete the tasks in a specific recipe, those tasks are never annotated. Is there a way to allow tasks to be stolen before the annotator has opened their session in the browser (I think this corresponds to the tasks being added to session._open_tasks in the source code)? I'm open to hacking something in the source code if it doesn't break anything else.

I also tried working without the pre-determined sessions, which in many ways would be better for us (we could add annotators dynamically without having to restart all >10 prodigy instances every time, etc.). Here the issue is that new tasks appearing in recipes are only routed once to existing (or active?) sessions, so the probability of some tasks never being annotated is quite high and new sessions may never get anything to annotate. Would it be somehow possible to trigger task routing more frequently, so new sessions get tasks as well and not only the ones existing in the beginning? I noticed that even if annotations_per_task is over 1 and there is only one existing session at the beginning of the task, the tasks are only routed to that session and not to new ones to fill the overlap requirement.

I would be thankful for suggestions for either of these approaches, although as I said dynamic sessions would be better for us.

Thanks a lot!

Hi @helmiina,

I understand your concern: this is definitely a challenging use case that pushes against the current design. Prodigy currently makes "local" routing decisions based on the annotator pool at that moment and doesn't revisit them. This keeps annotations_per_task as close to the target as possible given the uncertainty about the future annotator pool, but it is also why you're seeing these issues.

To allow for more precision, we introduced PRODIGY_ALLOWED_SESSIONS, which makes the total number of annotators known upfront, but it's true that it assumes all these sessions will eventually connect to the server.
It's not entirely obvious when a session should be considered deprecated.

I agree that the best solution for your use case would be dynamic sessions (i.e. not using PRODIGY_ALLOWED_SESSIONS). What you need, I think, is a way to periodically revisit routing globally.
Perhaps you could try implementing a standalone script that periodically checks the output dataset for input hashes with fewer annotations than expected and re-adds those examples to the input dataset so the router considers them again. It should be a bit easier if you are already using infinite, dynamic streams as inputs.
I think that would be simpler than modifying Prodigy's streaming behavior. Let me know if you need help implementing it!

Hi @magdaaniol ,

Thanks for the informative reply.

I could try implementing something that triggers task routing more frequently, but it's not immediately clear to me how I should go about that.

  • By a standalone script, do you mean something like a cron job, or could it be included in the prodigy setup somehow?
  • How would re-adding unannotated tasks to the input work? Do you mean duplicates of them should be added to the input dataset? Does this result in duplicate tasks being generated to the annotators? What if the input stream is not from another dataset, but a jsonl-file (this is the case in our first recipe, the rest read input from the database)?

I'm not sure if this is possible, but I feel like the most optimal option would be to somehow trigger task-routing when an annotator connects to the server/opens their session. That way prodigy would have the freshest pool of available sessions and the annotator would get tasks right away, if any are available.

Any help with this would be much appreciated!

Hi @helmiina!

Just to clarify the standalone script option:

Yes, I meant something like a cron job that would check the status of the Prodigy DB and re-add the tasks that need more annotations. Technically, you'd be adding duplicates to the input, but the router's logic would prevent sending them to annotators who have already annotated a given example; it should only send them to annotators from the pool who have not seen them yet. The objective here is to call the router function again on these under-annotated tasks.
It's true though that if the input is a file on disk, it makes it a bit more difficult.
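A minimal sketch of such a check, assuming Prodigy-style example dicts with `_input_hash` set. The dataset names and target count in the commented wiring are placeholders, and the DB part is only sketched, not tested:

```python
from collections import Counter

def find_under_annotated(output_examples, input_examples, target):
    """Return input examples that have fewer than `target` annotations saved."""
    counts = Counter(ex["_input_hash"] for ex in output_examples)
    return [ex for ex in input_examples if counts[ex["_input_hash"]] < target]

# Wiring it up against the Prodigy DB might look roughly like this
# (untested sketch; dataset names are placeholders):
#
#   from prodigy.components.db import connect
#   db = connect()
#   todo = find_under_annotated(
#       db.get_dataset_examples("my_annotations"),
#       db.get_dataset_examples("my_input_tasks"),
#       target=3,
#   )
#   if todo:
#       db.add_examples(todo, ["my_input_tasks"])
```

Run from cron (or any scheduler), this would periodically re-feed under-annotated inputs so the router gets another chance to distribute them.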

I'm not sure if this is possible, but I feel like the most optimal option would be to somehow trigger task-routing when an annotator connects to the server/opens their session. That way prodigy would have the freshest pool of available sessions and the annotator would get tasks right away, if any are available.

This is actually what is happening if you don't use PRODIGY_ALLOWED_SESSIONS. A new session will not have a prepopulated queue and whenever it accesses the server it will call the get_questions endpoint which triggers the router.
The thing is that each call to get_questions triggers the router, which distributes tasks to all registered sessions, meaning all sessions that have ever registered with this server. I understand it would be helpful if the router dispatched tasks only to active sessions, i.e. sessions that have accessed the server within a given time threshold. The session object has a timestamp attribute that records the last time the session was accessed, so you could use that to make sure the router only sends tasks to active sessions.
Here's a version of the built-in router that only routes to active sessions, where "active" means accessed within the last 60 seconds (probably too short in practice; it's just for testing):

from timeit import default_timer as timer
from typing import Dict, List

# Assumes the same names Prodigy's built-in routers use internally:
# Controller, TaskRouterProtocol, TASK_HASH_ATTR, INPUT_HASH_ATTR,
# SESSION_ID_ATTR, get_task_hash, get_input_hash, log_router

def route_average_per_task(average: float) -> TaskRouterProtocol:
    """Uses the task/input hash to ensure consistency across annotators."""
    if average < 1:
        raise ValueError(f"Number of annotators must be at least 1. Got {average}")

    def _task_router(ctrl: "Controller", session_id: str, item: Dict) -> List[str]:
        # If the task hash appears enough times already, don't add annotators
        nonlocal average
        hash_attr = TASK_HASH_ATTR if ctrl.exclude_by == "task" else INPUT_HASH_ATTR
        item_hash = (
            get_task_hash(item) if ctrl.exclude_by == "task" else get_input_hash(item)
        )
        assert ctrl.dataset
        hash_count = ctrl.db.get_hash_count(ctrl.dataset, hash=item_hash, kind=ctrl.exclude_by)  # type: ignore
        if hash_count >= average:
            return []

        # If there is already an annotation in the DB, we should keep that in mind
        annots_needed = average - hash_count
        pool = ctrl.session_ids
        # Filter out stale sessions
        stale_threshold = 60
        current_time = timer()
        active_pool = []
        for s_id in pool:
            session = ctrl._get_session(s_id)
            if current_time - session.timestamp <= stale_threshold:
                # Session exists and is active
                active_pool.append(s_id)
            else:
                print(f"ROUTER: session {s_id} is stale and excluded from routing")
        h = item_hash
        annot = []

        # Ensure that annotators that have already annotated don't re-appear in the annot list!
        # Since this is a DB query, for speed, only run when we have to.
        if hash_count > 0:
            annot_examples = ctrl.db.get_dataset_examples_by_hash(
                ctrl.dataset, hash=item_hash, kind=ctrl.exclude_by
            )
            already_annotated = [ex[SESSION_ID_ATTR] for ex in annot_examples]
            active_pool = [u for u in active_pool if u not in already_annotated]

        # Keep adding whole annotators
        while len(annot) < int(annots_needed // 1):
            if len(active_pool) == 0:
                log_router(hash_attr, item_hash, annot)
                return annot
            idx = h % len(active_pool)
            annot.append(active_pool.pop(idx))

        # In case of average=1.5 we need to do something probabilistic.
        if len(annot) < annots_needed:
            if len(active_pool) == 0:
                log_router(hash_attr, item_hash, annot)
                return annot
            prob_from_hash = h / 1000 % 1
            prob_required = annots_needed % 1
            if prob_from_hash < prob_required:
                idx = h % len(active_pool)
                annot.append(active_pool.pop(idx))
        log_router(hash_attr, item_hash, annot)
        return annot

    return _task_router

So that would make sure tasks are sent only to active sessions.
However, the problem of Prodigy not revisiting routing decisions remains: new sessions only start iterating the Stream from the point it has reached when they first access the server. To make sure under-annotated tasks are revisited, you'd still need a global "rebalancing" script as discussed before, or use PRODIGY_ALLOWED_SESSIONS and make sure the annotators access the server at least once.

Hi @magdaaniol ,

This does not seem to be happening. When I start a new session in the browser (without pre-determined sessions), a request is made to get_session_questions but there is no sign of the router being triggered. Even though there are tasks to be annotated, they are not routed to new sessions. The only time a router gets called is when the very first session is started.

I also tried re-adding tasks that are not yet annotated ANNOTATIONS_PER_TASK times and doing that does not trigger routing either. The verbose output from get_session_questions correctly shows that the input database has n more examples, but they are not routed to anyone, so no one gets anything to annotate.

Here's the verbose logs (I censored some task data) of the following process:

  • prodigy is launched without pre-determined sessions and ANNOTATIONS_PER_TASK = 3
  • kari starts a session in the browser and all 3 input tasks are routed to him
  • kari completes all 3 tasks
  • seppo starts his session in the browser and no tasks are routed to him
  • I run a script in the background where all 3 tasks are re-added to the input database
  • seppo refreshes his browser but still no tasks are available
✨  Starting the web server at http://localhost:8082 ...
Open the app in your browser and start annotating!

INFO:     Started server process [1702155]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8082 (Press CTRL+C to quit)
INFO:     127.0.0.1:45396 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:45396 - "GET /bundle.js HTTP/1.1" 200 OK
09:20:59: /.prodigy/prodigy.json
09:20:59: GET: /project
09:20:59: {'dataset': 'review_outlines', 'recipe_name': 'review_outlines', 'batch_size': 10, 'show_flag': True, 'db': 'sqlite', 'host': 'localhost', 'port': 8082, 'instant_submit': False, 'feed_overlap': False, 'view_id': 'classification', 'version': '1.18.0'}
INFO:     127.0.0.1:45396 - "GET /project HTTP/1.1" 200 OK
INFO:     127.0.0.1:45406 - "GET /fonts/lato-regular.woff2 HTTP/1.1" 200 OK
09:20:59: /.prodigy/prodigy.json
09:20:59: POST: /get_session_questions
09:20:59: CONTROLLER: Getting batch of questions for session: None
⚠ The running recipe is configured for multiple annotators using named
sessions with feed_overlap=True, or via a task router setting, but a client is
requesting questions using the default session. For this recipe, open the app
with ?session=name added to the URL or set feed_overlap to False in your
configuration.
INFO:     127.0.0.1:45428 - "POST /get_session_questions HTTP/1.1" 400 Bad Request
INFO:     127.0.0.1:45428 - "GET /?session=kari HTTP/1.1" 200 OK
INFO:     127.0.0.1:45428 - "GET /bundle.js HTTP/1.1" 200 OK
09:21:04: /.prodigy/prodigy.json
INFO:     127.0.0.1:45428 - "GET /project/kari HTTP/1.1" 200 OK
INFO:     127.0.0.1:45428 - "GET /favicon.ico HTTP/1.1" 200 OK
09:21:04: /.prodigy/prodigy.json
09:21:04: POST: /get_session_questions
09:21:04: CONTROLLER: Getting batch of questions for session: review_outlines-kari
09:21:04: STREAM: Created queue for review_outlines-kari.
09:21:04: ROUTER: Routing item with _task_hash=-1984892024 -> ['review_outlines-kari']
09:21:04: ROUTER: Routing item with _task_hash=711207117 -> ['review_outlines-kari']
09:21:04: ROUTER: Routing item with _task_hash=817772586 -> ['review_outlines-kari']
09:21:04: DB: Loading dataset 'outline_images'
09:21:04: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:04: DB: Loading dataset 'outline_images'
09:21:04: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:04: DB: Loading dataset 'outline_images'
09:21:04: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:04: DB: Loading dataset 'outline_images'
09:21:04: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:04: DB: Loading dataset 'outline_images'
09:21:04: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:05: DB: Loading dataset 'outline_images'
09:21:05: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:05: DB: Loading dataset 'outline_images'
09:21:05: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:05: RESPONSE: /get_session_questions (3 examples)
09:21:05: {'tasks': [{task1}, {task2}, {task3}]}
INFO:     127.0.0.1:45430 - "POST /get_session_questions HTTP/1.1" 200 OK
09:21:16: /.prodigy/prodigy.json
09:21:16: POST: /get_session_questions
09:21:16: CONTROLLER: Getting batch of questions for session: review_outlines-kari
09:21:16: DB: Loading dataset 'outline_images'
09:21:16: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: DB: Loading dataset 'outline_images'
09:21:17: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:17: RESPONSE: /get_session_questions (0 examples)
09:21:17: {'tasks': [], 'total': 0, 'progress': None, 'session_id': 'review_outlines-kari'}
INFO:     127.0.0.1:53346 - "POST /get_session_questions HTTP/1.1" 200 OK
09:21:18: /.prodigy/prodigy.json
09:21:18: POST: /get_session_questions
09:21:18: CONTROLLER: Getting batch of questions for session: review_outlines-kari
09:21:18: DB: Loading dataset 'outline_images'
09:21:18: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:18: DB: Loading dataset 'outline_images'
09:21:18: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:18: DB: Loading dataset 'outline_images'
09:21:18: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:18: DB: Loading dataset 'outline_images'
09:21:18: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:18: DB: Loading dataset 'outline_images'
09:21:18: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:18: DB: Loading dataset 'outline_images'
09:21:18: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:19: DB: Loading dataset 'outline_images'
09:21:19: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:19: DB: Loading dataset 'outline_images'
09:21:19: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:19: DB: Loading dataset 'outline_images'
09:21:19: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:19: DB: Loading dataset 'outline_images'
09:21:19: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:19: RESPONSE: /get_session_questions (0 examples)
09:21:19: {'tasks': [], 'total': 0, 'progress': None, 'session_id': 'review_outlines-kari'}
INFO:     127.0.0.1:53346 - "POST /get_session_questions HTTP/1.1" 200 OK
09:21:20: /.prodigy/prodigy.json
09:21:20: POST: /get_session_questions
09:21:20: CONTROLLER: Getting batch of questions for session: review_outlines-kari
09:21:20: DB: Loading dataset 'outline_images'
09:21:20: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:20: DB: Loading dataset 'outline_images'
09:21:20: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:20: DB: Loading dataset 'outline_images'
09:21:20: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:20: DB: Loading dataset 'outline_images'
09:21:20: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:20: DB: Loading dataset 'outline_images'
09:21:20: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:21: DB: Loading dataset 'outline_images'
09:21:21: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:21: DB: Loading dataset 'outline_images'
09:21:21: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:21: DB: Loading dataset 'outline_images'
09:21:21: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:21: DB: Loading dataset 'outline_images'
09:21:21: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:21: DB: Loading dataset 'outline_images'
09:21:21: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:21: RESPONSE: /get_session_questions (0 examples)
09:21:21: {'tasks': [], 'total': 0, 'progress': None, 'session_id': 'review_outlines-kari'}
INFO:     127.0.0.1:53346 - "POST /get_session_questions HTTP/1.1" 200 OK
09:21:22: /.prodigy/prodigy.json
09:21:22: POST: /give_answers (received 3, session ID 'review_outlines-kari')
09:21:22: [{task1}, {task2}, {task3}]
09:21:22: CONTROLLER: Receiving 3 answers
09:21:22: Controller: received answers for session review_outlines-kari: 3
09:21:22: DB: Creating unstructured dataset 'review_outlines-kari'
09:21:22: {'created': datetime.datetime(2025, 5, 7, 9, 20, 57)}
09:21:22: DB: Added 3 examples to 2 datasets
09:21:22: CONTROLLER: Added 3 answers to dataset 'review_outlines' in database SQLite
09:21:22: RESPONSE: /give_answers
09:21:22: {'progress': None}
INFO:     127.0.0.1:53346 - "POST /give_answers HTTP/1.1" 200 OK
INFO:     127.0.0.1:33988 - "GET /?session=seppo HTTP/1.1" 200 OK
INFO:     127.0.0.1:33988 - "GET /bundle.js HTTP/1.1" 200 OK
09:21:28: /.prodigy/prodigy.json
INFO:     127.0.0.1:33988 - "GET /project/seppo HTTP/1.1" 200 OK
INFO:     127.0.0.1:33988 - "GET /favicon.ico HTTP/1.1" 200 OK
09:21:28: /.prodigy/prodigy.json
09:21:28: POST: /get_session_questions
09:21:28: CONTROLLER: Getting batch of questions for session: review_outlines-seppo
09:21:28: STREAM: Created queue for review_outlines-seppo.
09:21:28: DB: Loading dataset 'outline_images'
09:21:28: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:28: DB: Loading dataset 'outline_images'
09:21:28: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: DB: Loading dataset 'outline_images'
09:21:29: DB: Finished Loading dataset 'outline_images' (3 examples)
09:21:29: RESPONSE: /get_session_questions (0 examples)
09:21:29: {'tasks': [], 'total': 3, 'progress': None, 'session_id': 'review_outlines-seppo'}
INFO:     127.0.0.1:33988 - "POST /get_session_questions HTTP/1.1" 200 OK
INFO:     127.0.0.1:41508 - "GET /?session=seppo HTTP/1.1" 200 OK
INFO:     127.0.0.1:41508 - "GET /bundle.js HTTP/1.1" 200 OK
09:26:01: /.prodigy/prodigy.json
INFO:     127.0.0.1:41508 - "GET /project/seppo HTTP/1.1" 200 OK
INFO:     127.0.0.1:49450 - "GET /favicon.ico HTTP/1.1" 200 OK
09:26:01: /.prodigy/prodigy.json
09:26:01: POST: /get_session_questions
09:26:01: CONTROLLER: Getting batch of questions for session: review_outlines-seppo
09:26:01: DB: Loading dataset 'outline_images'
09:26:01: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:01: DB: Loading dataset 'outline_images'
09:26:01: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:01: DB: Loading dataset 'outline_images'
09:26:01: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:01: DB: Loading dataset 'outline_images'
09:26:01: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:01: DB: Loading dataset 'outline_images'
09:26:01: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:01: DB: Loading dataset 'outline_images'
09:26:01: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:02: DB: Loading dataset 'outline_images'
09:26:02: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:02: DB: Loading dataset 'outline_images'
09:26:02: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:02: DB: Loading dataset 'outline_images'
09:26:02: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:02: DB: Loading dataset 'outline_images'
09:26:02: DB: Finished Loading dataset 'outline_images' (6 examples)
09:26:02: RESPONSE: /get_session_questions (0 examples)
09:26:02: {'tasks': [], 'total': 3, 'progress': None, 'session_id': 'review_outlines-seppo'}
INFO:     127.0.0.1:49450 - "POST /get_session_questions HTTP/1.1" 200 OK

As you can see, the only time the router is triggered is when the first session is started.

Maybe I'm misunderstanding something and trying the wrong things?

Hi @helmiina,

Thanks for all the details!

This does not seem to be happening. When I start a new session in the browser (without pre-determined sessions), a request is made to get_session_questions but there is no sign of the router being triggered. Even though there are tasks to be annotated, they are not routed to new sessions. The only time a router gets called is when the very first session is started.

Are you 100% sure it is not being triggered? Remember we put this "early exit" condition at the beginning of the router to prevent sessions being spammed with the "control" task:

# inside the task router function
def _task_router(ctrl: "Controller", session_id: str, item: Dict) -> List[str]:
    if item.get("meta", {}).get("is_control"):
        return [session_id]
    (...)

Maybe you can add a logging statement at the very beginning of the router function to double check?

What I think is happening in the flow you described is that, by the time seppo joins, kari has consumed the entire stream and the only tasks available are the "control" tasks, which trigger the early exit from the router for seppo and thus an empty queue. It will stay that way until new examples are added.

I also tried re-adding tasks that are not yet annotated ANNOTATIONS_PER_TASK times and doing that does not trigger routing either. The verbose output from get_session_questions correctly shows that the input database has n more examples, but they are not routed to anyone, so no one gets anything to annotate.

I believe that the streaming function keeps track of already streamed task hashes to prevent a session from getting locked in an infinite loop:

import random
import time

from prodigy import set_hashes
from prodigy.components.db import connect

DB = connect()
SEEN_BEFORE = set()

# Stream tasks from the current database state (loops forever)
def stream_tasks_from_db(db, input_ds_name: str):
    """Load annotated tasks from another interface"""
    while True:
        examples = db.get_dataset_examples(input_ds_name)
        task_hashes = set(db.get_task_hashes(input_ds_name))
        if task_hashes.difference(SEEN_BEFORE):
            for item in examples:
                item = set_hashes(item)
                SEEN_BEFORE.add(item["_task_hash"])
                yield item
        else:
            # Yield a special control task
            control_task = {
                "text": "No new tasks available. Keep hitting ignore until new tasks show up.",
                "meta": {"is_control": True, "action": "refresh"},
                "_input_hash": random.randint(1, 100000),
            }
            yield set_hashes(control_task)
            # Sleep briefly to avoid tight loops
            time.sleep(1)

For this reason, the script that re-adds the new inputs also needs to convolute the task hashes to make sure the newly added tasks do not get filtered out at this point. This is why you're observing that the newly added tasks are not routed to anyone.
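One crude way to perturb the hash when re-adding a task, sketched under the assumption that exclusion relies on `_input_hash` (as with `"exclude_by": "input"`), so randomising `_task_hash` is safe:

```python
import random

def with_fresh_task_hash(task, rng=random):
    """Copy a task and give it a new random _task_hash so the stream's
    seen-before filter doesn't drop it. The _input_hash is left intact,
    so input-based exclusion still recognises the example."""
    task = dict(task)
    task["_task_hash"] = rng.randint(-2**31, 2**31 - 1)
    return task
```

Re-adding `with_fresh_task_hash(ex)` instead of `ex` itself should be enough for the `task_hashes.difference(SEEN_BEFORE)` check to see something new.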

Finally, since we are convoluting task hashes, you should set "exclude_by": "input" in the config (either .prodigy.json or the dictionary the recipe returns) to make sure the same tasks are not routed to annotators who have already seen them.
While testing it, I also realized that we need to check the contents of the session's open questions for duplicates. Normally this is not necessary because duplicates are eliminated at the stream-loading level. However, with the infinite stream and intentional duplicates, the seppo session might end up with all 6 questions in its queue, because the router only checks against the already annotated questions saved in the DB.
To illustrate:

  1. kari consumes the entire stream of 3
  2. seppo joins but there are no new questions yet
  3. DB is being populated with duplicates
  4. kari hits refresh (get_questions), which routes all 6 questions to seppo. Since we exclude by input, kari doesn't see any of them because all the _input_hashes are already recorded in the DB for his session. However, since seppo did not have any input hashes saved in the DB, he gets all 6 questions, i.e. duplicates.

To prevent that, you could add another filtering step in the Session's get_questions function (after filtering out the control questions):

# session.py
def get_questions(
    (...)
    open_items = [item for key, (ts, item) in self._open_tasks.items()]
    open_items_input_hashes = {get_input_hash(item.data) for item in open_items}
    (...)
    results = [item for item in results if not item.data.get("meta", {}).get("is_control")]
    results = [item for item in results if get_input_hash(item.data) not in open_items_input_hashes]
    (...)

Hi @magdaaniol ,

Thank you for looking into this so thoroughly.

Trying to make dynamic sessions work started to get so complicated that we decided to stick with using PRODIGY_ALLOWED_SESSIONS and making the routed tasks stealable with a script that calls /get_session_questions for each annotator when the interface is launched. That way tasks do not get stuck in anyone's first batch.
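For anyone landing here later, a rough sketch of such a warm-up script. The request body is an assumption based on what the web app sends to /get_session_questions, and the URL, dataset, and annotator names are placeholders; the HTTP call is injected so the loop itself can be exercised without a running server:

```python
def warm_up_sessions(base_url, dataset, annotators, post):
    """Request one batch of questions for every allowed session so each
    session's queue is initialised and its tasks become stealable.
    `post(url, payload)` performs the actual HTTP POST."""
    session_ids = [f"{dataset}-{name}" for name in annotators]
    for session_id in session_ids:
        post(f"{base_url}/get_session_questions", {"session_id": session_id})
    return session_ids

# With `requests` it could be wired up roughly like this:
#   import requests
#   warm_up_sessions(
#       "http://localhost:8082", "review_outlines", ["kari", "seppo"],
#       lambda url, payload: requests.post(url, json=payload).raise_for_status(),
#   )
```

Running this right after each instance launches means no session is left with an untouched, unstealable first batch.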

Hi @helmiina,

You're very welcome and I can definitely see how the dynamic sessions in this context have become too hacky. I'm glad you found an alternative and thanks for sharing it.
Our conversation here made me rethink parts of the current routing system, so we can hopefully make it more flexible in the future. Thank you for sharing your insights!
