Understanding the limitations of non-independent/dynamic annotation tasks in Prodigy

We have a use case that requires an annotation screen supporting multi-label hierarchical labelling.

e.g.:

t0 -> Example 1, the user selects A and B;

t1 -> Example 1 with label A, the user selects A1;

t2 -> Example 1 with label B, the user selects B1 and B2;

t3 -> Example 2 ...

To achieve this, we need to create new examples right after t0 dynamically, i.e. create two new variations of Example 1.
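For illustration, the two follow-up tasks we'd want to generate for Example 1 could look roughly like this (using the choice interface; the parent_label meta field is just our own bookkeeping and the option IDs are made up):

# Sketch of the two dynamically created variations of Example 1.
example_1_a = {
    "text": "Example 1",
    "options": [{"id": "A1", "text": "A1"}, {"id": "A2", "text": "A2"}],
    "meta": {"parent_label": "A"},
}
example_1_b = {
    "text": "Example 1",
    "options": [{"id": "B1", "text": "B1"}, {"id": "B2", "text": "B2"}],
    "meta": {"parent_label": "B"},
}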

Prodigy's annotation model seems to assume that every annotation task is independent and that the dataset is completely defined at build time, i.e. there are no dynamically generated examples.

In any case, in prodigy.json we have done the following:

{
  "batch_size": 1,
  "instant_submit": true
}

The annotation task stream:

from typing import Generator
from prodigy.components.loaders import JSONL

def tasks_stream() -> Generator:
    # make_metadata and state are helpers/state defined elsewhere in the recipe.
    stream = JSONL('/prodigy/dataset.jsonl')
    stream = ({**eg, **make_metadata(state)} for eg in stream)
    return stream

Obviously, mutating a stream that's backed by a Python generator is not a good idea, so we've decided to use a LIFO queue and adapt it to respect the iterator contract.

from queue import LifoQueue


class IterableQueue(LifoQueue):
    """A LIFO queue that can also be consumed as an iterator, i.e. as a stream."""

    _sentinel = object()

    def __iter__(self):
        # Keep calling get() until the sentinel pushed by close() comes out.
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)

    def __next__(self):
        # Python 3 iterator protocol, so the queue itself can be advanced directly.
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        return item


queue = IterableQueue()
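(For completeness: before the server starts, the queue is seeded from the original stream, roughly like this. The examples are pushed in reverse because the queue is last-in-first-out and we want Example 1 to come out first.)

# Seed the LIFO queue with the original examples, in reverse order.
for task in reversed(list(tasks_stream())):
    queue.put(task)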

In the update callback (which is called after each example due to the configuration in prodigy.json) we have the following:

def update(examples):
    print(f"\nReceived {len(examples)} annotations!")

    global queue
    global head

    # Keep only the selected labels that have a second level to annotate.
    choices = examples[0]['accept']
    choices = [c for c in choices if c in with_subdomains]

    # Create the follow-up tasks for the example just answered and push them
    # onto the top of the LIFO queue.
    entries = make_entries(head, choices)

    for entry in entries:
        queue.put(entry)

    if not queue.empty():
        head = queue.queue[-1]  # peek at the task that will go out next

def progress(session=0, total=0, loss=0):
    return 0.0

return {
    "dataset": dataset,
    "view_id": view_id,
    "stream": queue,
    "update": update,
    "progress": progress
}

So after each example, we compute the necessary new examples and push them onto the top of the queue.
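Roughly, make_entries builds one follow-up task per selected label, along these lines (simplified; the subdomains lookup stands in for our real label hierarchy):

# Simplified illustration of make_entries.
def make_entries(parent_task, choices):
    entries = []
    for label in choices:
        entries.append({
            "text": parent_task["text"],
            "options": [{"id": sub, "text": sub} for sub in subdomains[label]],
            "meta": {**parent_task.get("meta", {}), "parent_label": label},
        })
    return entries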

This works 80% of the time. For the remaining 20%, what actually happens is:

t0 -> Example 1, the user selects A and B;

t1 -> Example 2, the user selects C and D;

t2 -> Example 1 with label A, the user selects A1;

t3 -> Example 1 with label B, the user selects B1 and B2;

t4 -> Example 2 with label C, the user selects C1;

t5 -> Example 2 with label D, the user selects D2 and D3;

t6 -> Example 3 ...

This happens because the Prodigy backend thread that is consuming the examples sometimes pulls the next example before the newly created ones have been pushed to the queue.

Our question is:

Can you think of any quick fix to control when the backend thread consumes the next example?

Hi! That's not really true – in fact, the main reason Prodigy uses generators for streams is to allow examples to be generated at runtime, using changing outside state (e.g. a model's predictions or existing answers) to decide what to send out.

However, Prodigy doesn't assume that a task has to wait for a specific earlier task to come back before it's sent out – that could easily get very inefficient, because the annotator would have to wait in between questions. So the app always tries to ensure the queue doesn't run out if more examples are available. Blocking would also make things much more difficult once you have multiple annotators working on the same data.

So it's typically better to design your annotation workflows so that you can send out whole batches at once (even if they're small batches), have the annotator work on them at their own pace, and, while they submit the answers, create the next questions in the background. Or, alternatively, do the hierarchical annotation in multiple steps: annotate the first level, then create the next level in a follow-up task. The advantage here is that you can more easily correct mistakes that happen during the first round of annotation without invalidating all your data, and you'll have a much clearer record of the examples that were produced and what they were based on.
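To sketch the multi-step idea: a second recipe's stream could read the first-level answers back from the database and generate the level-two questions, roughly like this (the dataset name and the subdomains lookup are placeholders):

from prodigy.components.db import connect

def second_level_stream(first_level_dataset):
    # Read the accepted first-level annotations back from the database and
    # create one follow-up question per selected top-level label.
    db = connect()
    for eg in db.get_dataset(first_level_dataset):
        if eg.get("answer") != "accept":
            continue
        for label in eg.get("accept", []):
            if label not in subdomains:  # placeholder: labels with a second level
                continue
            yield {
                "text": eg["text"],
                "options": [{"id": s, "text": s} for s in subdomains[label]],
                "meta": {**eg.get("meta", {}), "parent_label": label},
            }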

It's tricky, because if the queue runs out, the app will ask the stream for the next batch – that's kind of fundamental and also something that doesn't really make sense to change. If instant_submit is enabled, the app will submit the answer before refreshing the queue, but of course that doesn't mean you can't end up with a race condition here.

I'm not sure how well it would work, but you could try sending out a "dummy example" (e.g. a duplicate that gets filtered out) to keep the app busy and in a loading state until your stream is ready with the next example, based on the previous one.
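Very roughly (and untested), that could look something like the sketch below: a wrapper around your queue-backed stream that keeps re-sending a copy of the previous task (which should get filtered out as a duplicate) until your update callback signals, via a hypothetical refilled event, that it has pushed the follow-up questions. You'd return "stream": stream_with_placeholders(queue) and call refilled.set() at the end of update().

import copy
import threading

refilled = threading.Event()  # set by update() once the follow-ups are queued
refilled.set()                # nothing to wait for before the first question

def stream_with_placeholders(queue):
    it = iter(queue)
    last_task = None
    while True:
        if last_task is not None and not refilled.wait(timeout=0.5):
            # update() hasn't pushed the follow-ups yet: send out a copy of
            # the previous task, which should be filtered out as a duplicate.
            yield copy.deepcopy(last_task)
            continue
        try:
            task = next(it)
        except StopIteration:
            return
        refilled.clear()  # wait for update() before sending the next real task
        yield task
        last_task = task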