custom task routing

So we are trying out the latest prodigy==1.12a3 and we are using it for task routing. We have many annotators, so we want to use a task router to route a specific image to a specific annotator using the stream id and session_id. How can we add a task router to our current custom recipe? Also, we don't know where to get the session_id, controller and item from.


import logging
from tools.config import LAYOUT_ANALYSIS_BUCKET, layout_analysis_s3_client
import random
import jsonlines
from prodigy.core import Controller
import prodigy




s3_client = layout_analysis_s3_client
bucket_name = LAYOUT_ANALYSIS_BUCKET

# log config 
logging.basicConfig(
    filename="/usr/local/prodigy/logs/line_to_text.log",
    format="%(levelname)s: %(message)s",
    level=logging.INFO,
    )

# Prodigy has a logger named "prodigy" according to 
# https://support.prodi.gy/t/how-to-write-log-data-to-file/1427/10
prodigy_logger = logging.getLogger('prodigy')
prodigy_logger.setLevel(logging.INFO)

def custom_task_router(ctrl, session_id, item):
    pass


@prodigy.recipe("line-to-text-recipe")
def line_to_text_recipe(dataset, jsonl_file):
    logging.info(f"dataset:{dataset}, jsonl_file_path:{jsonl_file}")
    blocks = [ 
        {"view_id": "image"},
        {"view_id": "text_input"}
    ]

    return {
        "dataset": dataset,
        "stream": stream_from_jsonl(jsonl_file),
        "view_id": "blocks",
        "task_router": custom_task_router(),
        "config": {
            "blocks": blocks,
            "editable": True
        }
    }


def stream_from_jsonl(jsonl_file):
    with jsonlines.open(jsonl_file) as reader:
        for line in reader:
            image_id = line["id"]
            obj_key = line["image_url"]
            text = line["user_input"]
            image_url = get_new_url(obj_key)
            yield {"id": image_id, "image": image_url, "user_input": text}

def get_new_url(image_url):
    new_image_url = s3_client.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": bucket_name, "Key": image_url},
        ExpiresIn=31536000
    )
    return new_image_url

Happy to hear you're trying out the new alpha!

Just to confirm, have you seen the new docs on the task router? I mention it because this URL is new.

Specifically, the section where custom task routers are configured? This code block (with pseudocode) in particular might be relevant:

import prodigy

def custom_task_router(ctrl, session_id, item):
    ...

@prodigy.recipe(
    "my-custom-recipe",
    dataset=("Dataset to save answers to", "positional", None, str),
    view_id=("Annotation interface", "option", "v", str)
)
def my_custom_recipe(dataset, view_id="text"):
    # Load your own streams from anywhere you want
    stream = load_my_custom_stream()

    return {
        "dataset": dataset,
        "view_id": view_id,
        "stream": stream,
        "task_router": custom_task_router
    }

Notice how custom_task_router is a Python function that we're not calling? We're just passing the function, such that Prodigy can use it later. The task router is a "callback function" that will get the controller, session_id and item at runtime. It is called each time an item needs to be routed. Prodigy needs the function at the start, but won't call it until items actually need to be assigned.
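To make the difference concrete, here's a minimal sketch reusing the names from the example above:

# Correct: hand Prodigy the function itself. Prodigy will call it later
# as custom_task_router(controller, session_id, item), once per item.
components = {"task_router": custom_task_router}

# Incorrect: this calls the function once, right now, with no arguments,
# and stores its return value instead of the function itself.
# components = {"task_router": custom_task_router()}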

Does this help?

Extra information

If it helps, you can also find more information on what you can do with the Controller in the docs.
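For instance, here's a toy router (using only the all_session_ids attribute, which also comes up later in this thread) that just logs what the controller knows before routing the item to the requesting session:

import logging

def logging_task_router(ctrl, session_id, item):
    # Peek at the sessions the controller has seen so far
    logging.info(f"known sessions: {ctrl.all_session_ids}")
    logging.info(f"current session: {session_id}, item id: {item.get('id')}")
    return [session_id]  # route to whoever is asking, unchanged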

So this is what I tried after your advice, and then I tried to run Prodigy:


import logging
from tools.config import LAYOUT_ANALYSIS_BUCKET, layout_analysis_s3_client
import random
import prodigy
import jsonlines
from pathlib import Path




s3_client = layout_analysis_s3_client
bucket_name = LAYOUT_ANALYSIS_BUCKET

#example of item["id"]: "1-1-2b_line_9874_5"
#item["id"]: "1-1-3a_line_9874_0"


# log config 
logging.basicConfig(
    filename="./line_to_text.log",
    format="%(levelname)s: %(message)s",
    level=logging.INFO,
    )

session_dict = {
        "1-1-3a": "line_to_text-tenzin",
        "1-1-2b": "line_to_text-kaldan",
        "1-1-3b": "line_to_text-tatse"
    }

def custom_task_router(ctrl, session_id, item):
    logging.info("custom_task_router started")
    curr_session_ids = ctrl.all_session_ids
    # "1-1-2b_line_9874_5" -> "1-1-2b", the key format used in session_dict
    page_id = item["id"].split("_")[0]
    if page_id in session_dict:
        assigned_session_id = session_dict[page_id]
        if assigned_session_id in curr_session_ids:
            return session_id
    return session_id

@prodigy.recipe("line-to-text-recipe")
def line_to_text_recipe(dataset, jsonl_file):
    logging.info(f"dataset:{dataset}, jsonl_file_path:{jsonl_file}")
    blocks = [
        {"view_id": "image"},
        {"view_id": "text_input"}
    ]
    return {
        "dataset": dataset,
        "stream": stream_from_jsonl(jsonl_file),
        "view_id": "blocks",
        "task_router": custom_task_router,
        "config": {
            "blocks": blocks,
            "editable": True
        }
    }


def stream_from_jsonl(jsonl_file):
    with jsonlines.open(jsonl_file) as reader:
        for line in reader:
            image_id = line["id"]
            obj_key = line["image_url"]
            text = line["user_input"]
            image_url = get_new_url(obj_key)
            yield {"id": image_id, "image": image_url}

def get_new_url(image_url):
    new_image_url = s3_client.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": bucket_name, "Key": image_url},
        ExpiresIn=31536000
    )
    return new_image_url

and I am getting the below error even though I have the latest version:

[error screenshot]

Are you 100% sure that you're running the latest Prodigy version? Could you run prodigy stats?

This is what I see when I do that.

============================== ✨  Prodigy Stats ==============================

Version          1.12a3                        
Location         /home/vincent/Development/prodigy-demos/venv/lib/python3.8/site-packages/prodigy
Prodigy Home     /home/vincent/.prodigy        
Platform         Linux-5.11.0-7614-generic-x86_64-with-glibc2.32
Python Version   3.8.6                         
Spacy Version    3.5.1                         
Database Name    SQLite                        
Database Id      sqlite                        
Total Datasets   225                           
Total Sessions   2462   

The error message that you see suggests to me that you might be dealing with an older version and I'd just like to rule that out.

Ah wait. I think there's a bug on our end here. Am investigating now!

As a temporary fix, could you turn off validate in your config? It's set to true by default, but you should be able to set it to false to get it to work.
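Concretely, that's this setting in your prodigy.json:

{
  "validate": false
}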

It seems to be a validation error that's happening here, not anything directly related to the task routers. It'll get patched up pronto for sure, but I'd expect the routers themselves to still work in your version.

Hi,
So I tried what you advised: I changed the value of validate from true to false and then tried to run it. I get the errors below, and on the terminal too. Can you take a look?

I added logging, so below is the log content:

INFO: dataset:line_to_text, jsonl_file_path:./data/line_to_text.jsonl
INFO: session_id: line_to_text-tenzin, item: {'id': '1-1-1a_line_9874_0', 'image': 'https://s3.amazonaws.com/image-processing.openpecha/line_images/1-1-1a_line_9874_0.jpg?AWSAccessKeyId=AKIAWEXEWJ7GDFYE3KNU&Signature=HIIDrH9Oe9%2FNkNIU7Rpo4PzRorE%3D&Expires=1717832934', '_input_hash': -1794021974, '_task_hash': 1255481447}
INFO: all_session_ids: {'line_to_text-tenzin'}
INFO: controller: <prodigy.core.Controller object at 0x157bd6550>
INFO: curr_session_ids: ['line_to_text-tenzin']
INFO: session_id: line_to_text-kaldan, item: {'id': '1-1-2b_line_9874_1', 'image': 'https://s3.amazonaws.com/image-processing.openpecha/line_images/1-1-2b_line_9874_1.jpg?AWSAccessKeyId=AKIAWEXEWJ7GDFYE3KNU&Signature=s1c0msirijcxE6HnRjbNMYwOZuw%3D&Expires=1717832957', '_input_hash': -791108100, '_task_hash': -1181819318}
INFO: all_session_ids: {'line_to_text-kaldan', 'line_to_text-tenzin'}
INFO: controller: <prodigy.core.Controller object at 0x157bd6550>
INFO: curr_session_ids: ['line_to_text-kaldan', 'line_to_text-tenzin']

and below is the prodigy stats output showing the version of Prodigy I have.

I can understand that it is annoying to be presented with another ERROR message, but this is progress!

I've given your task router another look and I now think that the issue is that you're not wrapping your session_id in a list. This wasn't your fault though, our docs had a few wrong examples where we'd return a string instead of a list of strings. Apologies for that! I just updated the docs.

That suggests to me that this adapted task router should work.

def custom_task_router(ctrl, session_id, item):
    logging.info("custom_task_router started")
    curr_session_ids = ctrl.all_session_ids
    page_id = item["id"].split("_")[0]
    if page_id in session_dict:
        assigned_session_id = session_dict[page_id]
        if assigned_session_id in curr_session_ids:
            return [session_id]
    return [session_id]

One thing about this task router though: it seems like you're always returning the current session_id. Is that your intention?

So, it actually ran without any issues, but it is not assigning a specific task item to a specific session_id. Our aim in using the task router is this: we have line images and the text of each line image, and we need the annotators to proofread the line text, but specific lines need to be assigned to a specific annotator (session_id).

We set batch_size to 5 in the configuration file, and the first session to load, session_id=tenzin, gets the first 5 line images to proofread regardless of whether they should be assigned to that annotator or not. When we set batch_size to 1, the first session to load, session_id=tenzin, still gets the first image regardless, and then session_id=kaldan gets the second item even if that second item is meant for session_id=tenzin. Is there a way to queue an item for a session_id even if he or she is not active, and then move on to other items, so that each annotator ends up with the items assigned to them?

We were also thinking of routing wrong annotations back to the specific annotator who annotated them in the first place.

When I look at the task router, it seems to me that it's not doing anything specific, per my last comment:

def custom_task_router(ctrl, session_id, item):
    logging.info("custom_task_router started")
    curr_session_ids = ctrl.all_session_ids
    page_id = item["id"].split("_")[0]
    if page_id in session_dict:
        assigned_session_id = session_dict[page_id]
        if assigned_session_id in curr_session_ids:
            return [session_id]
    return [session_id]

This router always returns [session_id]. That means the item will always be assigned to whoever is currently requesting new examples.

This is totally possible, but you need to ensure that the session exists upfront. It's explained in more detail in the docs section on session creation.
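As a sketch (assuming the sessions from your session_dict are created upfront, for example by each annotator opening their session URL once, or via something like the PRODIGY_ALLOWED_SESSIONS setting), a router that actually sends each item to its assigned annotator could look like this:

def custom_task_router(ctrl, session_id, item):
    # "1-1-2b_line_9874_5" -> "1-1-2b", the key used in session_dict
    page_id = item["id"].split("_")[0]
    assigned_session_id = session_dict.get(page_id)
    if assigned_session_id is not None:
        # Queue the item for the assigned annotator only, even if they are
        # not the session that is currently asking for new examples
        return [assigned_session_id]
    # Unmapped items go to whoever is requesting
    return [session_id]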

If you're looking for an example of a task router that is able to also create sessions while routing tasks, you can get inspiration from this example.

Does this help?

Note that this issue was patched in this week's new alpha release.

So the solution you gave solved only half the problem. After the first annotator has annotated and the reviewer reviews the annotations, suppose he rejects some of them; we want to stream those rejected annotations back to the original annotator. Right now we are streaming from the JSONL file, so once the file is read and the for loop starts, it only goes over the JSONL list once, and we can't add the rejected annotations from the reviewer to that stream. Is there a way to go through the database first, look for all the rejected annotations, stream those first, and then stream the new annotations? If there is a way to do that, we'd also want to add a new field for drafts: if an annotator wants to leave a draft, he can click on drafts, and the stream would go through the database, stream the examples where the draft value is true first, and then continue with the others.

Also, right now the task items are queued in the order they come from the custom streamer. But if, in the middle of the stream, I get an item that needs to be annotated right now, how do I jump the queue so that item is streamed immediately next?

Annotations in Prodigy are stored in an "append-only" fashion. That means that you can always add more annotations, but you cannot remove/change them. There are some hacky things you might be able to do around this, like interfacing with the database directly instead of our API, but that's not something I'd recommend doing.

In theory nothing is stopping you from using a non-file stream to generate the examples. You could write a custom Python generator that polls examples from the review dataset. However, when doing this you may encounter another design choice of Prodigy and that is that we add hashes to all annotation examples. We do this so that we're able to remove duplicates from the stream. Usually, you don't want to annotate the same example twice, so we filter these items out based on the hash of the example.
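A rough sketch of that idea, assuming a review dataset named "reviewed-annotations" (the name is just a placeholder) and re-hashing the rejected examples so the duplicate filter doesn't drop them:

import prodigy
from prodigy.components.db import connect

def stream_with_rejects(jsonl_file):
    db = connect()
    # First pass: pull rejected examples out of the review dataset
    for eg in db.get_dataset("reviewed-annotations"):
        if eg.get("answer") == "reject":
            # Fresh hashes, so the dedup filter won't skip the re-queued task
            yield prodigy.set_hashes(eg, overwrite=True)
    # Then continue with the regular JSONL-backed stream
    yield from stream_from_jsonl(jsonl_file)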

An alternative route

I'm about to suggest another setup that might be easier for you, but feel free to comment if I'm misinterpreting.

I'm assuming four annotators: three who annotate the original JSONL file, and another annotator who is assigned to be a "reviewer". Then, in your setup, I think the following might apply.

From your JSONL file you'll eventually end up with two Prodigy datasets. I'll call one dataset "annotations" and the other one "reviewed-annotations".

Then one observation at this point is that theoretically you may already be set to do ML. You could just take the examples from the "reviewed-annotations" dataset that were accepted to train a ML system. All of these examples should be safe to train on, because they have been accepted during the review process. If you use the review recipe you'll also be able to supply an annotation even if it does not agree with all the annotators.

However, you mention that it's your goal to take the rejected examples from the reviewed dataset and to show these to your annotators once more. You could do that by taking the rejected examples from the reviewed dataset and saving these into a new JSONL file. You can do this at regular intervals as new data comes in.
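For example, a small export script along these lines (dataset and file names are placeholders):

import jsonlines
from prodigy.components.db import connect

db = connect()
rejected = [
    eg for eg in db.get_dataset("reviewed-annotations")
    if eg.get("answer") == "reject"
]
with jsonlines.open("rejected.jsonl", mode="w") as writer:
    writer.write_all(rejected)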

Then you can set up a recipe (with a task router) such that the reviewers can re-do their annotations.

Would this work? If you're only trying to create a dataset for ML training then you might not need to do this extra step, but I'll stop here to allow for you to correct me if I've made an incorrect assumption.

You could implement something like this using a PriorityQueue, but it would involve writing some custom Python code. The ordering of the examples is not something that the task router handles; it only looks at the examples in order and decides to whom (if anyone) to pass each one.
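As a rough illustration of the PriorityQueue idea (the "priority" field on the examples is hypothetical, something you'd set yourself):

import queue

def prioritized(stream):
    pq = queue.PriorityQueue()
    for i, eg in enumerate(stream):
        # Lower number = higher priority; i breaks ties and keeps the order stable
        pq.put((eg.get("priority", 1), i, eg))
    while not pq.empty():
        _, _, eg = pq.get()
        yield eg

Note that this simple version consumes the whole stream before yielding anything; to let an urgent item jump the queue while annotation is already running, you'd need a shared queue that your stream generator polls between batches.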