Selective work stealing

I’m coming back with a more specialized task routing / work stealing question than my last post.

I’ve got an annotation task that will include examples with different languages and annotators who can annotate 1-2 of these languages. I’m planning on using custom task routing to ensure that annotators are assigned examples in the appropriate languages. However, I was also hoping to use work stealing for the languages where there are enough annotators to support this. Is there any way to selectively allow work stealing where annotators can steal examples in languages they are proficient in?

Hi @laurejt!

A custom router alone cannot solve this. The router only controls initial assignment. Work stealing is a completely separate code path that bypasses the router.
To support language-aware work stealing, the steal_work method itself would need to be extended — for example, by including a filtering step that checks whether a task is "stealable" by the current session.

But that's a change to Prodigy internals, not something achievable with a custom router alone.
How complex this gets also depends on how the mapping between annotator IDs and their languages is stored.
The core logic for this is in the prodigy/components/session.py file.
The simplest, minimal (and inflexible) solution would be to add a hardcoded mapping to this file, for example:

LANGUAGES = {
    "alice": {"en", "fr"},
    "bob": {"en"},
    "carol": {"fr", "de"},
}

and then inside steal_work, in the loop (line 193):

for timestamp, item in session.iter_open():
    if item.key not in unstealable:
        # assuming the language attr is somehow stored on the task
        lang = item.data.get("lang")
        # skip tasks in a language this annotator is not mapped to
        if lang and self.id in LANGUAGES and lang not in LANGUAGES[self.id]:
            continue
        stealable.append((timestamp, session, item))
        unstealable.add(item.key)
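
To make the condition in that loop easier to check in isolation, here is a minimal standalone sketch of the same test as a plain function. The name can_steal and the "lang" key are hypothetical and only for illustration; nothing here is part of Prodigy itself, and it assumes the session ID matches the keys used in the mapping.

```python
from typing import Dict, Set

# Hypothetical mapping from annotator/session ID to the languages they can annotate.
LANGUAGES: Dict[str, Set[str]] = {
    "alice": {"en", "fr"},
    "bob": {"en"},
    "carol": {"fr", "de"},
}


def can_steal(session_id: str, task: dict, languages: Dict[str, Set[str]] = LANGUAGES) -> bool:
    """Return True if this session may steal the task, mirroring the loop condition above."""
    lang = task.get("lang")  # assumption: the language is stored under "lang"
    if not lang:
        # Tasks without a language tag are not filtered.
        return True
    if session_id not in languages:
        # Annotators missing from the mapping are not filtered either.
        return True
    return lang in languages[session_id]
```

Note that with this condition, tasks without a language and annotators who are not in the mapping both pass the filter. If you would rather block those cases, return False in the two early branches instead.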

Another option would be to pass the router as an optional callback to steal_work and just re-apply it to stealable items:

# get_questions, line 143
if steal_work and len(results) == 0 and other_sessions is not None:
    results.extend(
        self.steal_work(
            results, n, other_sessions,
            exclude=seen_task_hashes,
            task_router=task_router,
        )
    )

# steal_work
def steal_work(
    self,
    batch, n, sessions, *, exclude=set(),
    task_router=None,
):
    ...
    for timestamp, item in session.iter_open():
        if item.key not in unstealable:
            if task_router and self.id not in task_router(item.data):
                continue
            stealable.append((timestamp, session, item))
            unstealable.add(item.key)
    ...

This way you can reuse the language-based filtering logic and its configuration, but you also re-apply all the other routing logic, which may or may not be an issue. The only problem I can think of with this solution is that work redistribution can take longer, or may not be possible at all, if you use annotations_per_task logic.

With annotations_per_task routing, the hash-based assignment narrows the pool of eligible stealers beyond what's necessary. For example, if Alice, Carol, and Dave all speak French and annotations_per_task=2, the router might deterministically assign a French task to Carol and Dave. If both Carol and Dave go inactive and Alice runs out of work, she can't steal the task from their open queues, because the router doesn't include her for that item. The task stays stuck even though Alice is free and qualified. With a dedicated language-only filter, Alice could steal it immediately.
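
The Alice/Carol/Dave scenario can be reproduced with a toy model. This is only a sketch under stated assumptions: toy_router is a made-up stand-in for hash-based annotations_per_task routing (it is not Prodigy's router), and SPEAKERS and the task_hash value are invented for the example.

```python
from typing import Dict, List

# Hypothetical: which annotators can handle which language.
SPEAKERS: Dict[str, List[str]] = {"fr": ["alice", "carol", "dave"]}


def toy_router(task: dict, annotations_per_task: int = 2) -> List[str]:
    """Toy stand-in for hash-based routing: pick a fixed subset of the speakers."""
    pool = sorted(SPEAKERS[task["lang"]])
    start = task["task_hash"] % len(pool)
    return [pool[(start + i) % len(pool)] for i in range(annotations_per_task)]


def can_steal_with_router(session_id: str, task: dict) -> bool:
    # Re-applying the router: only the originally selected annotators qualify.
    return session_id in toy_router(task)


def can_steal_with_language_filter(session_id: str, task: dict) -> bool:
    # Language-only filter: anyone who speaks the language qualifies.
    return session_id in SPEAKERS.get(task["lang"], [])


task = {"text": "Bonjour", "lang": "fr", "task_hash": 1}
```

In this toy setup, the router selects Carol and Dave for the task, so can_steal_with_router("alice", task) is False while can_steal_with_language_filter("alice", task) is True. That is the gap described above.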