So we are trying to use the latest prodigy==1.12a3 for task routing. We have many annotators, so we want to use a task router to route a specific image to a specific annotator using the stream id and session_id. How can we add a task router to our current custom recipe? We also don't know where to get the session_id, controller and item from.
import logging
import random

import jsonlines
import prodigy
from prodigy.core import Controller

from tools.config import LAYOUT_ANALYSIS_BUCKET, layout_analysis_s3_client

s3_client = layout_analysis_s3_client
bucket_name = LAYOUT_ANALYSIS_BUCKET

# log config
logging.basicConfig(
    filename="/usr/local/prodigy/logs/line_to_text.log",
    format="%(levelname)s: %(message)s",
    level=logging.INFO,
)

# Prodigy has a logger named "prodigy" according to
# https://support.prodi.gy/t/how-to-write-log-data-to-file/1427/10
prodigy_logger = logging.getLogger("prodigy")
prodigy_logger.setLevel(logging.INFO)
def custom_task_router(ctrl, session_id, item):
    pass

@prodigy.recipe("line-to-text-recipe")
def line_to_text_recipe(dataset, jsonl_file):
    logging.info(f"dataset:{dataset}, jsonl_file_path:{jsonl_file}")
    blocks = [
        {"view_id": "image"},
        {"view_id": "text_input"},
    ]
    return {
        "dataset": dataset,
        "stream": stream_from_jsonl(jsonl_file),
        "view_id": "blocks",
        "task_router": custom_task_router(),
        "config": {
            "blocks": blocks,
            "editable": True,
        },
    }
def stream_from_jsonl(jsonl_file):
    with jsonlines.open(jsonl_file) as reader:
        for line in reader:
            image_id = line["id"]
            obj_key = line["image_url"]
            text = line["user_input"]
            image_url = get_new_url(obj_key)
            yield {"id": image_id, "image": image_url, "user_input": text}

def get_new_url(obj_key):
    # Generate a presigned URL for the S3 object key, valid for one year
    new_image_url = s3_client.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": bucket_name, "Key": obj_key},
        ExpiresIn=31536000,
    )
    return new_image_url
You can attach a task router by passing the function in the components dictionary that your recipe returns:

import prodigy

def custom_task_router(ctrl, session_id, item):
    ...

@prodigy.recipe(
    "my-custom-recipe",
    dataset=("Dataset to save answers to", "positional", None, str),
    view_id=("Annotation interface", "option", "v", str),
)
def my_custom_recipe(dataset, view_id="text"):
    # Load your own stream from anywhere you want
    stream = load_my_custom_stream()
    return {
        "dataset": dataset,
        "view_id": view_id,
        "stream": stream,
        "task_router": custom_task_router,
    }
Notice how custom_task_router is a Python function that we're not calling? We're just passing the function, such that Prodigy can use it later. The task router is a "callback function" that will get the controller, session_id and item at runtime. It is called each time an item needs to be routed. Prodigy needs the function at the start, but won't call it until items actually need to be assigned.
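To make that concrete, here's a minimal sketch of a router that sends each task to a named session. It assumes each incoming task carries an "annotator" field that you add to your stream yourself, and that annotators connect via ?session=<name> URLs, which typically gives session ids of the form <dataset>-<name>; adjust the matching to however you encode assignments.

def custom_task_router(ctrl, session_id, item):
    target = item.get("annotator")  # hypothetical field added to each task
    # ctrl is the Controller; it knows which sessions have connected so far
    for sid in ctrl.all_session_ids:
        if target and sid.endswith(f"-{target}"):
            return [sid]  # return a list of session ids that should get this task
    return [session_id]  # otherwise give it to the session that asked for work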
Does this help?
Extra information
If it helps, the docs also have more information on what you can do with the Controller.
Are you 100% sure that you're running the latest Prodigy version? Could you run prodigy stats?
This is what I see when I do that.
============================== ✨ Prodigy Stats ==============================

Version          1.12a3
Location         /home/vincent/Development/prodigy-demos/venv/lib/python3.8/site-packages/prodigy
Prodigy Home     /home/vincent/.prodigy
Platform         Linux-5.11.0-7614-generic-x86_64-with-glibc2.32
Python Version   3.8.6
Spacy Version    3.5.1
Database Name    SQLite
Database Id      sqlite
Total Datasets   225
Total Sessions   2462
The error message that you see suggests to me that you might be dealing with an older version and I'd just like to rule that out.
As a temporary fix, could you turn off validate in your config? It's set to true by default, but you should be able to set it to false to get it to work.
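For example, in your prodigy.json (the setting is true by default):

{
  "validate": false
}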
It seems to be a validation error that's happening here, not anything directly related to the task routers. It'll get patched up pronto for sure, but I'd expect the router itself to still work in your version.
Hi,
So I tried what you advised: I changed the value of validate from true to false and then tried to run it. I get the errors below, can you take a look?
I can understand that it is annoying to be presented with another ERROR message, but this is progress!
I've given your task router another look and I now think that the issue is that you're not wrapping your session_id in a list. This wasn't your fault though; our docs had a few wrong examples where we'd return a string instead of a list of strings. Apologies for that! I just updated the docs.
That suggests to me that this adapted task router should work.
def custom_task_router(ctrl, session_id, item):
    logging.info("custom_task_router started")
    curr_session_ids = ctrl.all_session_ids
    # session_dict (defined elsewhere) maps an image-id prefix to a session_id
    if item["id"].split("-")[0] in session_dict:
        assigned_session_id = session_dict[item["id"].split("-")[0]]
        if assigned_session_id in curr_session_ids:
            return [session_id]
    return [session_id]
One thing about this task router, though: it seems like you're always returning the current session_id. Is that your intention?
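If the goal was to route each item to its assigned annotator instead, a variant along these lines (using the same session_dict mapping as above) may be closer to what you want:

def custom_task_router(ctrl, session_id, item):
    prefix = item["id"].split("-")[0]
    assigned_session_id = session_dict.get(prefix)  # same mapping as above
    if assigned_session_id in ctrl.all_session_ids:
        return [assigned_session_id]  # route to the assigned annotator's session
    return [session_id]  # fall back to the session that asked for work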
So, it actually ran without any issues, but it is not assigning the specific task item to the specific session_id. Our aim in using the task router is this: we have line images and the text of those lines, and we need the annotators to proofread the line text, but specific lines need to be assigned to a specific annotator, i.e. session_id.
We set batch_size to 5 in the configuration JSON file, and the first session to load, session_id=tenzin, gets the first 5 line images to proofread regardless of whether they should be assigned to that annotator or not. When we set batch_size to 1, the first session to load, session_id=tenzin, gets the first image regardless, and then session_id=kaldan, the next to load, gets the second item even if that second item is meant for session_id=tenzin. Is there a way to queue an item for a session_id even if that annotator is not active, and then move on to other items, so that everyone gets the items assigned to them?
We were also thinking of routing wrong annotations back to the specific annotator who annotated them in the first place.
So the solution you gave solved only half the problem. After the first annotator has annotated and the reviewer reviews the annotations, suppose he rejects some of them: we want to stream those rejected annotations back to the original annotator. Right now we are streaming from the JSONL file, so once the file is read and the for loop starts, it only goes over the JSONL list once; we can't add the reviewer's rejected annotations to that stream. Is there a way to go through the database first, look for all the rejected annotations, stream those first, and then stream the new annotations? If there is, we would also want to add a new draft column: if an annotator wants to leave a draft, he can click on drafts, and the recipe would go through the database, stream the examples whose draft value is true first, and then continue with the others.
Right now the task items are queued in the order they come out of the custom streamer, but if in the middle of the stream I get an item that needs to be annotated right away, how do I make it jump the queue so it is streamed immediately next?
Annotations in Prodigy are stored in an "append-only" fashion. That means that you can always add more annotations, but you cannot remove/change them. There are some hacky things you might be able to do around this, like interfacing with the database directly instead of our API, but that's not something I'd recommend doing.
In theory nothing is stopping you from using a non-file stream to generate the examples. You could write a custom Python generator that polls examples from the review dataset. However, when doing this you may encounter another design choice of Prodigy and that is that we add hashes to all annotation examples. We do this so that we're able to remove duplicates from the stream. Usually, you don't want to annotate the same example twice, so we filter these items out based on the hash of the example.
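As a rough sketch of such a generator, assuming a "reviewed-annotations" dataset whose rejected examples carry answer == "reject", and reusing stream_from_jsonl from your recipe (all names are illustrative). Because of the hash-based filtering just mentioned, you'd typically save the re-annotations into a fresh dataset so the re-queued examples aren't dropped as duplicates:

from prodigy import set_hashes
from prodigy.components.db import connect

def stream_with_rejects(jsonl_file):
    db = connect()  # connects using the database settings in prodigy.json
    reviewed = db.get_dataset("reviewed-annotations") or []
    # Queue the rejected review examples first so they get re-annotated.
    for eg in reviewed:
        if eg.get("answer") == "reject":
            eg.pop("answer", None)  # clear the old answer so the UI treats it as new
            yield set_hashes(eg)
    # Then continue with the regular stream of new examples.
    yield from stream_from_jsonl(jsonl_file)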
An alternative route
I'm about to suggest another setup that might be easier for you, but feel free to comment if I'm misinterpreting.
I'm assuming four annotators: three who annotate the original JSONL file and another who is assigned to be a "reviewer". In your setup, I think, the following might apply.
From your JSONL file you'll eventually end up with two Prodigy datasets. I'll call one dataset "annotations" and the other one "reviewed-annotations".
Then one observation at this point is that theoretically you may already be set to do ML. You could just take the examples from the "reviewed-annotations" dataset that were accepted to train a ML system. All of these examples should be safe to train on, because they have been accepted during the review process. If you use the review recipe you'll also be able to supply an annotation even if it does not agree with all the annotators.
However, you mention that it's your goal to take the rejected examples from the reviewed dataset and to show these to your annotators once more. You could do that by taking the rejected examples from the reviewed dataset and saving these into a new JSONL file. You can do this at regular intervals as new data comes in.
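A minimal sketch of that export step, assuming the same "reviewed-annotations" dataset name as above; run it on whatever schedule suits you:

import jsonlines
from prodigy.components.db import connect

def export_rejected(dataset="reviewed-annotations", out_file="rejected.jsonl"):
    db = connect()
    examples = db.get_dataset(dataset) or []
    # Keep only the examples the reviewer rejected and write them to a new JSONL.
    with jsonlines.open(out_file, mode="w") as writer:
        for eg in examples:
            if eg.get("answer") == "reject":
                writer.write(eg)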
Then you can set up a recipe (with a task router) such that the original annotators can re-do their annotations.
Would this work? If you're only trying to create a dataset for ML training then you might not need this extra step, but I'll stop here to allow you to correct me if I've made an incorrect assumption.
You could implement something like this using a PriorityQueue, but it would involve writing some custom Python code. The ordering of the examples is not something that the task router handles; it only looks at the examples in order and decides to whom (if anyone) to pass each example.
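As a rough sketch of that idea (plain Python, not a Prodigy API; it assumes some other part of your app pushes urgent items onto the same queue, and it reuses stream_from_jsonl from above):

import itertools
from queue import PriorityQueue

tasks = PriorityQueue()
counter = itertools.count()  # tie-breaker so equal priorities keep insertion order

def push(item, priority=10):
    # Lower numbers come out of the queue first.
    tasks.put((priority, next(counter), item))

def priority_stream(jsonl_file):
    for task in stream_from_jsonl(jsonl_file):
        push(task)  # normal tasks get the default priority
        # Drain the queue: any urgent item pushed in the meantime,
        # e.g. push(urgent_item, priority=0), comes out before it.
        while not tasks.empty():
            _, _, item = tasks.get()
            yield item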