Recently, we released task routers, which provide a mechanism to route tasks to annotators. To use this new mechanism, you can write your own Python function to allocate tasks to users, but you can also use the familiar prodigy.json configuration file to set the annotations_per_task setting. If you were to configure annotations_per_task: 3, then each example would be assigned to 3 consistently random annotators from your annotation pool.
We recently made a video that explains some of the implementation details on how to get this right. In particular, it helps explain why, in general, we recommend setting the PRODIGY_ALLOWED_SESSIONS environment variable.
However, you're also able to configure annotations_per_task: 1.5, which would assign a single annotator 50% of the time and two annotators the other 50% of the time. I received some questions about how this works internally, so I figured I'd write up a longer post on the topic.
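For reference, the relevant bit of your prodigy.json could look something like the sketch below. It only shows this one setting (the rest of your configuration is unaffected), and fractional values such as 1.5 are allowed too.

{
    "annotations_per_task": 3
}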
Implementation
For the following segment, it might help to first watch the aforementioned YouTube video or to read the task router documentation.
To start discussing the implementation, let's first consider the case where annotations_per_task is an integer, like annotations_per_task: 2 or annotations_per_task: 3. What follows is an elaborate implementation.
from typing import Dict, List

from prodigy.core import Controller
# The hash attribute constants and hash helpers below ship with Prodigy; the exact
# import path may differ depending on your Prodigy version.
from prodigy.util import INPUT_HASH_ATTR, TASK_HASH_ATTR, get_input_hash, get_task_hash


def custom_task_router(ctrl: Controller, session_id: str, item: Dict) -> List[str]:
    # Let's assume we want to make sure that we have 2 annotators.
    average = 2

    # Next, we are going to check how many annotations already exist.
    # We might already have some annotations for a particular hash, so we'll try
    # and account for that.

    # First, we'll need to figure out if the user wants to use a task hash or an
    # input hash.
    hash_attr = TASK_HASH_ATTR if ctrl.exclude_by == "task" else INPUT_HASH_ATTR
    item_hash = (
        get_task_hash(item) if ctrl.exclude_by == "task" else get_input_hash(item)
    )

    # Once we know the hash, we can check the database.
    hash_count = ctrl.db.get_hash_count(
        ctrl.dataset, session_id, hash=item_hash, kind=ctrl.exclude_by
    )

    # Make sure the hash count does not exceed the average we're interested in.
    if hash_count >= average:
        return []

    # If there are already annotations in the db, we should keep that in mind.
    average = average - hash_count

    # Let's now consider all the known sessions, and start building up our `annot` selection.
    pool = ctrl.session_ids
    h = item_hash
    annot = []

    # Keep adding whole annotators.
    while len(annot) < int(average // 1):
        if len(pool) == 0:
            return annot
        idx = h % len(pool)
        annot.append(pool.pop(idx))

    # Return the list of annotators.
    return annot
This task router does a fair bit of preparation work, but eventually it gets to the main allocation step, which is this while loop:
pool = ctrl.session_ids
h = item_hash
annot = []
while len(annot) < int(average // 1):
    if len(pool) == 0:
        return annot
    idx = h % len(pool)
    annot.append(pool.pop(idx))
The while loop makes use of a hashing trick to consistently allocate annotators into the annot selection. Because the available pool shrinks on every pick, we can keep using the modulo operator % to make selections.
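To make the trick concrete, here is a small standalone sketch with made-up annotator names and a fixed hash value. It's a toy illustration rather than Prodigy code, but running it repeatedly always yields the same selection for the same hash:

# Toy illustration of the selection trick with hypothetical annotator names.
pool = ["alice", "bob", "carol", "dave"]
item_hash = 1234567  # a fixed hash for one task
annot = []
while len(annot) < 2:
    idx = item_hash % len(pool)
    annot.append(pool.pop(idx))
print(annot)  # the same two names on every run for this hash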
This works pretty well for annotations_per_task settings where our average variable is an integer, but what about floats?
An extra bit
To account for floating point settings, like average=1.5, we can just make an addition to our loop.
# Suppose originally our average setting was:
average = 2.5

# If there are already annotations in the db, we should keep that in mind.
average = average - hash_count

pool = ctrl.session_ids
h = item_hash
annot = []

# Keep adding whole annotators.
while len(annot) < int(average // 1):
    if len(pool) == 0:
        return annot
    idx = h % len(pool)
    annot.append(pool.pop(idx))

# In case of a fractional average (like 1.5 or 2.5) we need to do something probabilistic.
if len(annot) < average:
    if len(pool) == 0:
        return annot
    prob_from_hash = h / 1000 % 1
    prob_required = average % 1
    if prob_from_hash < prob_required:
        idx = h % len(pool)
        annot.append(pool.pop(idx))
In this extra bit we perform much the same hashing trick, but now we use it to calculate a sampling probability. When average=2.5 we calculate prob_required=0.5. Then we derive a "probability" from the hash value via h / 1000 % 1. If this value falls below prob_required, we add an extra annotator to the selection. Because our hashing function gives us uniformly distributed numbers, this should average out nicely in the long run.
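If you want to convince yourself of that, here's a small simulation sketch (again, not Prodigy code) that assumes roughly uniform hash values and checks that the number of annotators per task averages out to about 1.5:

import random

average = 1.5
prob_required = average % 1  # 0.5 in this example
counts = []
for _ in range(100_000):
    h = random.getrandbits(32)  # stand-in for a task hash
    extra = 1 if (h / 1000 % 1) < prob_required else 0
    counts.append(int(average // 1) + extra)

print(sum(counts) / len(counts))  # should hover around 1.5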
It's a bunch of hashing tricks upon hashing tricks, really. And these tricks are awesome because they allow us to consistently map a task to a set pool of annotators while keeping the memory requirements light.
Final comment
What I've explained here, in essence, is the base implementation that you can use inside of Prodigy. Prodigy uses the route_average_per_task task router when you configure annotations_per_task. I've only omitted some extra helpers that allow for logging of annotators.
Note that if you intend to use this method of routing, it's better to use the annotations_per_task setting than to use this particular Python code as a custom task router. When you use the configuration file directly, Prodigy is able to make some honest assumptions about how you would like to allocate tasks, which in turn allows us to use a clever session factory to create sessions and their task queues. For custom task routers, we can't do that. Some details of this are explained in the YouTube video here.