Creating a custom review recipe for image annotation

Hi!

I have a few questions and concerns regarding creating custom review recipes.

Short background: our annotators are going to annotate images with bounding boxes, with an interface similar to image.manual. We would like to ensure the quality of the annotations by forwarding the annotated images to a review recipe, where other annotators need to verify if the bounding boxes are drawn correctly. If the reviewers decide that a task is not annotated correctly (e.g. two out of three reviewers agree that the annotation is incorrect), it should be sent back to the original interface to be annotated again. Additionally, we need to make sure that an annotator cannot review a task they annotated themselves.

I hope I managed to explain myself somewhat clearly! Now for my questions:

Q1.
So far I have managed to create a custom review recipe where the review tasks are created from annotated examples from the database. So each annotated task from the image.manual interface is forwarded to the review interface separately, and given options "incorrect" and "correct", or something similar. How can I now route the incorrect examples back to the original recipe (image.manual) to be annotated again?

Q2.
I also managed to create a custom router based on route_average_per_task, where the functionality is the same but the router additionally checks that a review task is not routed to the original annotator by checking that _session_id of the original task does not include the name of any of the annotators the task is routed to (we are planning on using a pre-determined set of session IDs that we will give to our annotators, and this name is extractable from the prodigy-generated _session_id).
Then I realized that I cannot use the annotations_per_task setting at all with a custom router... I understand setting this limitation to ensure that prodigy works correctly, but our use case really depends on the custom router as well as the option to set the number of annotations per task. How could I achieve this?

Thanks a lot!

1 Like

Welcome to the forum @helmiina :waving_hand: !

Thanks for a detailed description of your use case - it's definitely helpful to have this context and it's pretty clear what you're trying to achieve.

Re Q1:
Assuming your review recipe is saving the annotations in a dedicated dataset, your original image.manual recipe should have a custom loader that would pull examples from this dataset in addition to the original source dataset.
You could apply the majority vote logic in the loader. You should also make sure that the _task_hash attribute of the "reviewed" task is different from the original task (this can be done by defining a custom hashing function in the loader that takes into account a relevant attribute of the task e.g you might want save the reviewed examples with a "reviewed" bool set to True under the meta key of the task dictionary.

By the way, you might just use a binary classification for your review where the reviewers would only accept or reject the annotations. Their decisions can be retrieved by looking at the "answer" key in the saved annotation.

Re Q2
Just to give some more context on the restriction: the reason we do not allow the annotations_per_task setting with other custom routing logic is because the resulting behavior is not at all predictable from he point of view of the library, so we can't really guarantee it works. However, since your custom router is replicating the functionality of the original route_average_per_task with this additional constraint, you should be all set. Just remove the annotations_per_task setting from the prodigy.json file because you want to use your custom routing function rather the built-in one. The functionality should be there, though.

Hope that helps! Let us know if you need help with the implementation.
_

1 Like

Thank you so much @magdaaniol, I made great progress today with your tips!

I was worried that the feed_overlap setting would conflict with my task router without using the annotations_per_task option, but you were right that my custom router just :sparkles: works :sparkles: without it.

I think I was able to include the reviewed and rejected tasks back into the original task stream. I've got another question regarding custom streams though:

Since my recipes require that data is continuously polled from the database (or another source, such as a file in an s3 bucket), I've created a custom stream that is an infinite loop that yields the tasks from the source as it finds them and prodigy automatically filters out those that are already annotated. That works well, but when all available tasks are annotated, the UI shows Loading... indefinitely, instead of "No tasks available". I assume this is because the stream never returns an empty iterator, because the database always has data, and the filtering happens afterwards. In cases where new data comes in quite frequently this is completely fine, but it is expected that an annotator will run out of tasks at some point, so it would be great to show something more fitting than Loading.... And then when new, unannotated tasks are found, those would be shown to the annotator again.

Is there anything I could do to achieve that? Comparing the incoming task hashes to existing ones every time sounds quite wasteful, and I think that might mess up the routing when we want multiple annotations per task.

Thanks again for the great help!

Hi @helmiina,

Glad to hear you've been making progress! :flexed_biceps:
As for the infinite loop UI - would just customizing this "Loading..." message help?
As of Prodigy 1.18 this can be done easily via prodigy.json config file - please check the docs here for the details.

Thank you @magdaaniol , I think customizing the loading-message would be sufficient!

However, I ran into another complication with the infinite stream. I realized that if the stream yields fewer than batch_size new, unannotated examples, the request to /get_session_questions times out and causes an error in the UI. It took me a while to get to the bottom of this actually, since 504 error is not handled in the backend at all, so the issue was not present even in the verbose logs, but instead the browser console showed an obscure JSON parse error, since prodigy was trying to parse the error page HTML for Gateway timed out instead of an incoming stream of examples... :slight_smile:

Anyway, is this always the case with a custom infinite loop stream? I've seen quite many people on the support forum using such streams, but haven't encountered anyone with the same issue as me. I'm afraid that with this issue our last tasks will be left unannotated because they don't form a full batch.

EDIT: the /get_session_questions endpoint gives the same error and crashes the UI sometimes even if there are more than batch_size tasks available. There's no errors in the verbose logs, and router is correctly assigning tasks to sessions, looking like this:

12:59:59: ROUTER: Routing item with _task_hash=1144754256 -> ['reviewed_image_outlines-p', 'reviewed_image_outlines-k', 'reviewed_image_outlines-s']
12:59:59: ROUTER: Routing item with _task_hash=1295951870 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-p', 'reviewed_image_outlines-k']
12:59:59: ROUTER: Routing item with _task_hash=-851110125 -> ['reviewed_image_outlines-p', 'reviewed_image_outlines-s', 'reviewed_image_outlines-k']
12:59:59: ROUTER: Routing item with _task_hash=-1077817048 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-p', 'reviewed_image_outlines-k']
12:59:59: ROUTER: Routing item with _task_hash=-1499942548 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-p', 'reviewed_image_outlines-k']
12:59:59: ROUTER: Routing item with _task_hash=-1216781656 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-p', 'reviewed_image_outlines-k']
12:59:59: ROUTER: Routing item with _task_hash=-1059353487 -> ['reviewed_image_outlines-p', 'reviewed_image_outlines-s', 'reviewed_image_outlines-k']
12:59:59: ROUTER: Routing item with _task_hash=-935516641 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-k', 'reviewed_image_outlines-p']
12:59:59: ROUTER: Routing item with _task_hash=-189692315 -> ['reviewed_image_outlines-k', 'reviewed_image_outlines-s', 'reviewed_image_outlines-p']
12:59:59: ROUTER: Routing item with _task_hash=-1424152339 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-k', 'reviewed_image_outlines-p']
12:59:59: ROUTER: Routing item with _task_hash=1149975882 -> ['reviewed_image_outlines-p', 'reviewed_image_outlines-k', 'reviewed_image_outlines-s']
12:59:59: ROUTER: Routing item with _task_hash=-345789481 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-k', 'reviewed_image_outlines-p']
12:59:59: ROUTER: Routing item with _task_hash=-1118549940 -> ['reviewed_image_outlines-p', 'reviewed_image_outlines-k', 'reviewed_image_outlines-s']
12:59:59: ROUTER: Routing item with _task_hash=-602475106 -> ['reviewed_image_outlines-s', 'reviewed_image_outlines-p', 'reviewed_image_outlines-k']

and so on. Input is also fine, tasks from the database of the previous task interface.

Input loader looks like this:

def stream_tasks_from_db(input_ds_name: str) -> StreamType:
    """Load annotated tasks from another interface"""
    while True:
        sleep(5)
        db = connect()
        examples = db.get_dataset_examples(input_ds_name)
        for item in examples:
            yield set_hashes(item)

Really not sure what to do, any help or ideas?

Hi @helmiina,

Thanks for all the debugging information. We'll definitely consider handling the 504 on the backend. That's a very useful feedback!
From what you're describing it looks like the get_session_questions endpoint exceeds a configured timeout somewhere in your infrastructure stack - likely a gateway server such as Nginx or load balancer?

I can see that you have a 5-second sleep in a data loading function. Depending on your gateway server setting that could potentially lead to timeout? Why is it needed in the first place? Also, is it necessary to do the DB connection inside the loop? Currently, you are creating a DB instance each time you're requesting new questions - that should not be necessary.

Other likely bottlenecks could be:

  • big batch size and/or heavy payloads - are the images very big?
  • slow database queries - does your custom router implement any extra DB queries?
  • a high number of annotators (in combination with heavy payloads) causing contention issues inside Controller

You could consider adding additional logging for timing the router function to see if it's performing slow for specific items.
You could also detect lock contention on the state lock are in the controller.get_questions function:

log(f"START: Acquiring state_lock")
lock_start = time.time()
with self.state_lock:
    lock_duration = time.time() - lock_start
    log(f"END: Acquired state_lock in {lock_duration:.2f}s")
    
    # ... rest of the code inside the lock ...

You can access the Controller code in core.py file in your Prodigy installation path.

Before anything though, I'd try optimizing the loader by removing the sleep and moving db = connect() outside the loop.

Hi @magdaaniol ,

Thanks for the reply!

I tried optimizing the stream function with no luck. I don't think the timeout is due to slow code, but the whole /get_session_questions hangs indefinitely in some situations. The Gateway timeout is probably built in the OpenShift environment I'm deploying prodigy, but on my laptop the behavior in the same situations is that the call gets stuck indefinitely even though there are tasks routed, and the annotators see an infinite loading screen.

I'll try to explain what I've observed. So I have three annotators annotating the tasks and annotations_per_task is set to 3:

  • The db stream can fetch the tasks just fine
  • If there is only one annotator at a time, the annotating goes fine
  • If there are multiple annotators at the same time, the behavior gets weird:
    • Everyone gets tasks routed as expected, I can see that from the logs (also in my previous message)
    • Everyone can keep annotating just fine until someone runs out of tasks. After that, everyone's get_session_questions call gets stuck, even though they have tasks to annotate and routed to them, and they see an infinite loading screen on my laptop and in "production" the Gateway issue happens. While the call is stuck, the stream loop is called over and over again causing my laptop to overheat and eventually crash basically, that's why I added the sleep(5).
    • The annotators' interfaces are stuck until the source db (annotations from a different prodigy recipe) gets more data (someone presses save in the other interface). Then, everyone gets the next task from their batch as normal. Then everything goes well again until one annotator runs out of tasks. Note that the getting stuck is not due to lack of new tasks, because once the call finally goes through the annotators also see tasks other than the new ones just submitted to the db.
    • I suspected that this would be due to some weird cookie/cache issue because I was using the same browser for all three sessions, but the same happens with different browsers/isolated tabs.

I tried using the default router but the behavior is the same. My custom router is as I described in my first message, just the default router with an added check that the task is not routed to the original annotator.

I know this is super confusing. If it's of any help, I would be happy to have a Zoom call about this and show what's happening, we really need to get this sorted soon to start our annotation.

Hi @helmiina,

You're right in that it's not slow code that's a problem. The problem is that all sessions share the the same Stream class that controls the iterator over the source. This is good for measuring progress in a tractable way but, in your case, it means that when one session gets to the end of its session queue it starts iterating over the source until new examples show up in the database, effectively blocking other sessions to use the iterator.
Normally, the access to the iterator would be blocked until new questions show up or there's a StopIteration exception. Here, however, the underlying generator is infinite so it will only get unblocked when the first conditions holds i.e. there are new examples.

I don't have a workaround for it yet, just wanted to share the cause for now. I'll update the thread as soon as I have something.

Hi @magdaaniol ,

Thank you so much for the update, I'm glad to hear that you managed to identify the issue!

What you described matches the behavior completely, I have managed to replicate this issue in every one of our tasks that uses an infinite stream. With a stream that is looped through only once, there have been no issues.

Looking forward to any workarounds you can come up with!

Hi @helmiina,

I've been trying out different workarounds and the best solution I could come up with is to "trick" the stream that there's a constant flow of examples by issuing a "dummy" task whenever no new tasks are available for annotation.

This definitely eliminates the problem of the iterator getting blocked and is safe with respect to any custom routing you should choose to implement.
The disadvantage of this solution is that the annotator that gets into the dummy stream will have to click through minimally a batch of the dummy examples or more depending on how long the wait for the new data is.

Here's how to implement what I have described:
In the recipe, we would modify the data loader to issue the dummy task:

DB = connect()
SEEN_BEFORE = set()
# Create a regular, finite stream from the current database state
def stream_tasks_from_db(db, input_ds_name: str):
    """Load annotated tasks from another interface"""
    while True:
        examples = db.get_dataset_examples(input_ds_name)
        task_hashes = set(db.get_task_hashes(input_ds_name))
        if task_hashes.difference(SEEN_BEFORE):
           for item in examples:
               set_hashes(item)
               SEEN_BEFORE.add(item.get("_task_hash"))
               yield set_hashes(item)
        else:
            # Yield a special control task
            control_task = {
                "text": "No new tasks available. Keep hitting ignore until new tasks show up.",
                "meta": {"is_control": True, "action": "refresh"},
                "_input_hash": random.randint(1, 100000) # you might want to adjust it to make sure there's always a unique _input_hash to make sure the queue is never empty
                }
                set_hashes(control_task)
                yield control_task
                # Sleep briefly to avoid tight loops
                time.sleep(1)

You can also add before_db callback to remove the dummy examples from the dataset altogether:

def before_db(examples):
    filtered_examples = []
    for eg in examples:
        if eg.get("meta").get("is_control"):
            continue
        else:
            filtered_examples.append(eg)
     return filtered_examples

This callback should be return from the recipe as:

"before_db": before_db

Then, importantly, in the task router you should make sure the dummy task is only routed to the session that is currently asking for questions to avoid filling the queues of other sessions with the dummy tasks unncessarily. So at the beginning of your custom router you could add

 if item.get("meta").get("is_control"):
     return [session_id]

You could maybe make the UX a bit better by using an html view id for the dummy tasks (the view_id atrribute can be overwritten at the task level) to visually differentiate dummy tasks from the other tasks. You could also add a new button in this html view and link it to a custom js action where you could call prodigy.window.answer("ignore") that would be equivalent to clicking through the multiple dummy questions in UI but without this repetitive impression. Please check out the docs on custom html/js here and let me know if you need any more help with this.

Hi @magdaaniol !

Thank you for the workaround, it seems to work well to prevent crashing.

However, it seems like work stealing does not work with this workaround. I assume it's because no one is technically ever running out of tasks, because their batches are populated with the dummy tasks.

This is another deal breaker to us, since we have a pre-defined set of sessions for our annotators, and the tasks are routed to everyone in the beginning and if someone doesn't do their tasks they never get done.

Any ideas how to deal with this?

Thank you again for being super helpful with everything :folded_hands:

Hi @helmiina,

You're right about work stealing not applying because the batch is never empty for anyone.
This is the specific condition that determines the application of the work stealing:

 if steal_work and len(results) == 0 and other_sessions is not None:
            results.extend(
                self.steal_work(results, n, other_sessions, exclude=seen_task_hashes)
            )

results is the Stream iteration result which, in this case, will never be 0. One workaround we could do is to actually filter out the dummy task from the results as we don't need them anymore at this point (we only need them to unblock the iterator).
So, assuming the dummy task structure from my prev answer, if you say:

results = [item for item in results if not item.data["meta"].get("is_control")]

in line 142 of the session.py (which is inside prodigy/components) you should be able to observe the stealing again.
In fact, this makes for a better UX too because the annotator will eventually get to the "No tasks, please refresh to check if there are any new ones" screen. And if there's more examples in the DB they will appear after the refresh. I actually should have thought of it before. I realize it's a tall order to ask you to edit the source code but it is, indeed, an edge case for us. We'll definitely think how to support better dynamic streams in multiuser scenarios.

1 Like

Hi @magdaaniol !

Once again what you suggested fixed my problems, thank you so much. I do realize that our use case is on the complex side. Also super happy that this change got rid of the dummy tasks in the UI!

Just for documentation if someone ends up in this thread looking for similar answers: I edited session.py locally and copy that over in our Dockerfile directly to the prodigy source files in the image.

1 Like