How does `annotations_per_task: 2.5` work?

Recently, we released task routers, which provide a mechanism to route tasks to annotators.

To use this new mechanism, you can write your own Python function to allocate tasks to users, but you can also use the familiar `prodigy.json` configuration file to set the `annotations_per_task` variable. If you were to configure `annotations_per_task: 3`, then each example would be assigned to 3 annotators, picked consistently at random from your annotation pool.
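If you've not seen that setting before: it's a single entry in your `prodigy.json` configuration file. A minimal sketch, which would assign each task to 3 annotators:

{
  "annotations_per_task": 3
}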

We recently made a video that explains some of the implementation details and how to get this right. In particular, it helps explain why, in general, we recommend setting the PRODIGY_ALLOWED_SESSIONS environment variable.
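As a quick sketch of that environment variable (the session names and recipe arguments here are made up for illustration), you'd declare your annotators up front when starting the server:

PRODIGY_ALLOWED_SESSIONS=alice,bob,carol prodigy ner.manual my_dataset blank:en ./examples.jsonl --label PERSON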

However, you're also able to configure `annotations_per_task: 1.5`, which sends a task to a single annotator 50% of the time and to two annotators the other 50% of the time, so that each task receives 1.5 annotations on average. I received some questions about how this works internally, so I figured I'd write up a longer post on the topic.

Implementation

For the following segment, it might help to first watch the aforementioned YouTube video or read the task router documentation.

To start discussing the implementation, let's first consider the case where `annotations_per_task` is an integer, like `annotations_per_task: 2` or `annotations_per_task: 3`. What follows is a spelled-out implementation.

from typing import Dict, List

from prodigy.core import Controller
from prodigy.util import get_input_hash, get_task_hash


def custom_task_router(ctrl: Controller, session_id: str, item: Dict) -> List[str]:
    # Let's assume we want to make sure that we have 2 annotators.
    average = 2

    # Next, we are going to check how many annotations we already have.
    # There might already be annotations for this particular hash in the
    # database, so we'll try and account for that.

    # First, we'll need to figure out if the user wants to exclude by
    # task hash or by input hash.
    item_hash = (
        get_task_hash(item) if ctrl.exclude_by == "task" else get_input_hash(item)
    )

    # Once we know the hash, we can check the database.
    hash_count = ctrl.db.get_hash_count(
        ctrl.dataset, session_id, hash=item_hash, kind=ctrl.exclude_by
    )

    # Make sure the hash count does not exceed the average we're interested in.
    if hash_count >= average:
        return []

    # If there are already annotations in the db, we should keep that in mind.
    average = average - hash_count

    # Let's now consider all the known sessions and start building up our
    # `annot` selection. Copying the list ensures that popping from the pool
    # doesn't mutate the controller's own session state.
    pool = list(ctrl.session_ids)
    h = item_hash
    annot = []

    # Keep adding whole annotators.
    while len(annot) < int(average // 1):
        if len(pool) == 0:
            return annot
        idx = h % len(pool)
        annot.append(pool.pop(idx))

    # Return the list of annotators.
    return annot

This task router does a fair bit of preparation work, but eventually it gets to the main allocation part, which is this while loop:

pool = list(ctrl.session_ids)
h = item_hash
annot = []

while len(annot) < int(average // 1):
    if len(pool) == 0:
        return annot
    idx = h % len(pool)
    annot.append(pool.pop(idx))

The while loop uses a hashing trick to consistently select annotators from the pool and move them into the `annot` list. Because the available pool shrinks with every pick, we can keep using the modulo operator (%) to make a fresh selection each time, and the same hash will always produce the same sequence of picks.
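To make that concrete, here's a small standalone sketch with a made-up hash and made-up session names; the values are hypothetical, but it shows why the same hash always lands on the same annotators:

# A made-up hash and a pool of made-up session names, purely for illustration.
pool = ["alice", "bob", "carol", "dave"]
h = 1234567
annot = []

while len(annot) < 2:
    idx = h % len(pool)          # first pass: 1234567 % 4 == 3 -> "dave"
    annot.append(pool.pop(idx))  # the pool shrinks, so the next modulo differs

print(annot)  # ['dave', 'bob']  (1234567 % 3 == 1 on the second pass)

Rerunning this with the same hash always yields the same two names, which is exactly the consistency property we're after.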

This works pretty well for annotations_per_task settings where our average variable is an integer, but what about floats?

An extra bit

To account for floating point settings, like average = 1.5, we can extend our loop with a small addition.

# Suppose originally our average setting was:
average = 2.5

# If there are already annotations in the db, we should keep that in mind.
average = average - hash_count
pool = list(ctrl.session_ids)
h = item_hash
annot = []

# Keep adding whole annotators.
while len(annot) < int(average // 1):
    if len(pool) == 0:
        return annot
    idx = h % len(pool)
    annot.append(pool.pop(idx))

# In the case of a fractional average, like 2.5, we need to do
# something probabilistic with the remainder.
if len(annot) < average:
    if len(pool) == 0:
        return annot
    prob_from_hash = h / 1000 % 1
    prob_required = average % 1
    if prob_from_hash < prob_required:
        idx = h % len(pool)
        annot.append(pool.pop(idx))

In this extra bit we perform much the same hashing trick, but now we use it to calculate a sampling probability. When average=2.5, the remaining fraction gives us prob_required=0.5. We then derive a "probability" from the hash value via h / 1000 % 1, which takes the fractional part of the hash divided by 1000. If that value falls below prob_required, we add an extra annotator to the annot list. Because our hashing function gives us uniformly distributed numbers, this should average out nicely in the long run.
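Using the same made-up hash as before, the arithmetic looks like this:

h = 1234567
average = 2.5

prob_from_hash = h / 1000 % 1  # 1234.567 % 1 -> ~0.567
prob_required = average % 1    # 2.5 % 1 -> 0.5

# 0.567 < 0.5 is False, so this particular task gets no extra annotator.
print(prob_from_hash < prob_required)  # False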

It's a bunch of hashing tricks upon hashing tricks, really. And these tricks are awesome because they allow us to consistently map a task to a set pool of annotators while keeping the memory requirements light.
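If you want to convince yourself of the long-run claim, here's a quick simulation sketch. It replaces real task hashes with uniform random integers, which is the property the trick relies on:

import random

average = 2.5
n = 100_000

# Count how often a random "hash" would grant the extra annotator.
extra = sum(
    (random.randint(0, 2**32) / 1000 % 1) < (average % 1)
    for _ in range(n)
)
print(extra / n)  # roughly 0.5, the fractional part of `average`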

Final comment

What I've explained here, in essence, is the base implementation that you can use inside of Prodigy. Prodigy uses the route_average_per_task task router when you configure annotations_per_task; I've only omitted some extra helpers that allow for logging of annotators.

Note that if you intend to use this method of routing, it's better to use the annotations_per_task setting than to use this particular Python code as a custom task router. When you use the configuration file directly, Prodigy is able to make some honest assumptions about how you would like to allocate tasks, which in turn allows us to use a clever session factory to create sessions and their task queues. For custom task routers, we can't do that. Some details of this are explained in the YouTube video here.
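That said, for completeness: if you did want to run the custom router from this post, the task router docs describe attaching it to a recipe. A minimal sketch, assuming a hypothetical my-recipe and the task_router component key from those docs:

import prodigy
from prodigy.components.stream import get_stream

@prodigy.recipe("my-recipe")
def my_recipe(dataset: str, source: str):
    return {
        "dataset": dataset,
        "stream": get_stream(source),
        "view_id": "text",
        # Plug in the custom_task_router defined earlier in this post.
        "task_router": custom_task_router,
    }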
