Review recipe with a different annotator

Hi,

I am currently trying to run the review recipe on a dataset with multiple annotators. I'd like someone who has not annotated a given task yet to be the tie-breaker. Is it possible to route tasks that way automatically? And if so, how? I've read through the review documentation but didn't see any hints.

Thanks in advance!

Marie

Hi @MarieCo,

Are you running the annotation and the review in sequence, i.e. once the entire dataset has been annotated by multiple annotators, the annotation server is shut down and a new one is started with the review recipe?
If that's the case, you could use the PRODIGY_ALLOWED_SESSIONS environment variable to make sure only the person who was not included in the previous annotation round can access the server.
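For example (a sketch; the dataset and session names are just placeholders):

PRODIGY_ALLOWED_SESSIONS=annotator-e python -m prodigy review review_dataset annotated_dataset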
I have a feeling, though, that the setup is more complicated than this. Could you explain a bit more about how you'd like the review recipe to work?
Thanks!

Hi @magdaaniol,

Thanks for your reply!

I am running the annotation and review in sequence indeed, but the setup is slightly more complicated. I had 4 different people annotating, with each task annotated twice.

I would now like to review projects where there was a disagreement between annotators. It can be any combination of two of the initial 4, and I'd like a different, 3rd person to break the tie (I have the same 4 people available for review). For example, if I have 3 tasks and annotators A, B, C, D, it could go as follows:

  • Task 1: A and B disagree, C (or D) review
  • Task 2: C and D disagree, A (or B) review
  • Task 3: A and C disagree, B (or D) review

Does that make more sense? I am not sure PRODIGY_ALLOWED_SESSIONS would work, since any of the initial 4 could review. I am also aware that feed_overlap and annotations_per_task can be set in prodigy.json, but it doesn't seem like they would work in this case.
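(For reference, I mean something like this in prodigy.json:)

{
  "feed_overlap": false,
  "annotations_per_task": 2
}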

Hi @MarieCo!

Thanks for the illustrative example - I think I now understand what's needed.
You can define a custom router function to send the review tasks to the right tie breakers.
For each review task, this function will check who the original annotators were and apply the tie-breaker selection logic you described.
It could look like this:

def tie_breaker_router(ctrl: Controller, session_id: str, item: TaskType) -> List[str]:
    """
    Custom task router that assigns a review task to an annotator who was not
    one of the original annotators.
    """
    hash_attr = TASK_HASH_ATTR if ctrl.exclude_by == "task" else INPUT_HASH_ATTR
    item_hash = (
        get_task_hash(item) if ctrl.exclude_by == "task" else get_input_hash(item)
    )
    original_annotators: Set[str] = set()
    if "versions" in item:
        for version in item.get("versions", []):
            sess_id = version.get("_session_id")
            original_annotators.add(sess_id)

    if not original_annotators:
        return [session_id]

    # Session IDs in the controller are namespaced (e.g., "dataset-name-user-name").
    # We need to strip the prefix to compare them to the raw session IDs from the data.
    dataset_prefix = f"{ctrl.dataset}-"
    all_raw_annotators: Set[str] = set()
    for sid in ctrl.session_ids:
        if sid.startswith(dataset_prefix):
            all_raw_annotators.add(sid[len(dataset_prefix) :])
        else:
            all_raw_annotators.add(sid)

    eligible_reviewers = sorted(all_raw_annotators - original_annotators)

    if not eligible_reviewers:
        print(item["text"])  # extra logging for illustration
        log_router(hash_attr, item_hash, [])
        return []
    # We need a consistent way to assign a task to a reviewer. Using the modulo
    # of the task hash ensures that the same task is always assigned to the same
    # reviewer from the list of eligible reviewers.
    reviewer_idx = item_hash % len(eligible_reviewers)
    chosen_reviewer_raw = eligible_reviewers[reviewer_idx]

    # Reconstruct the full, namespaced session ID to return
    chosen_reviewer = f"{dataset_prefix}{chosen_reviewer_raw}"
    print(item["text"]) # extra logging for illustration
    log_router(hash_attr, item_hash, [chosen_reviewer])
    return [chosen_reviewer]

This function ensures that:

  • each disagreement gets exactly one tie-breaker,
  • tie-breakers are evenly distributed among the available annotators,
  • the same task always routes to the same tie-breaker (deterministic),
  • no one reviews their own disagreements.

The second and third points are met thanks to the modulo indexing mentioned in the comments.
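For instance, for the "stock price" task in the test data later in this post, A and B disagree, so eligible_reviewers is ['C', 'D'], and the deterministic pick works out as:

>>> -1738123000 % 2  # Python's % is non-negative for a positive divisor
0

Index 0 picks C, which is exactly what the router log at the end of this post shows.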

Now, to plug this into the recipe, the easiest way is to wrap the built-in review recipe in a custom recipe that propagates the CLI arguments to the review function and adds a task_router component mapped to the function defined above:

import prodigy
from prodigy.components.routers import log_router
from prodigy.core import Arg, Controller
from prodigy.recipes.review import review
from prodigy.types import TaskType
from prodigy.util import INPUT_HASH_ATTR, TASK_HASH_ATTR
from prodigy.structured_types import get_input_hash, get_task_hash
from typing import List, Optional, Set

def tie_breaker_router(ctrl: Controller, session_id: str, item: TaskType) -> List[str]:
    """
    Custom task router that assigns a review task to an annotator who was not
    one of the original annotators.
    """
    hash_attr = TASK_HASH_ATTR if ctrl.exclude_by == "task" else INPUT_HASH_ATTR
    item_hash = (
        get_task_hash(item) if ctrl.exclude_by == "task" else get_input_hash(item)
    )
    original_annotators: Set[str] = set()
    if "versions" in item:
        for version in item.get("versions", []):
            sess_id = version.get("_session_id")
            original_annotators.add(sess_id)

    if not original_annotators:
        return [session_id]

    # Session IDs in the controller are namespaced (e.g., "dataset-name-user-name").
    # We need to strip the prefix to compare them to the raw session IDs from the data.
    dataset_prefix = f"{ctrl.dataset}-"
    all_raw_annotators: Set[str] = set()
    for sid in ctrl.session_ids:
        if sid.startswith(dataset_prefix):
            all_raw_annotators.add(sid[len(dataset_prefix) :])
        else:
            all_raw_annotators.add(sid)

    eligible_reviewers = sorted(all_raw_annotators - original_annotators)

    if not eligible_reviewers:
        print(item["text"])  # extra logging for illustration
        log_router(hash_attr, item_hash, [])
        return []

    # Deterministic assignment: the modulo of the task hash always picks the
    # same reviewer from the sorted list of eligible reviewers.
    reviewer_idx = item_hash % len(eligible_reviewers)
    chosen_reviewer_raw = eligible_reviewers[reviewer_idx]

    # Reconstruct the full, namespaced session ID to return
    chosen_reviewer = f"{dataset_prefix}{chosen_reviewer_raw}"
    print(item["text"])  # extra logging for illustration
    log_router(hash_attr, item_hash, [chosen_reviewer])
    return [chosen_reviewer]


@prodigy.recipe(
    "review.tie-breaker",
    dataset=Arg(help="Dataset to save annotations to"),
    input_sets=Arg(help="Comma-separated names of datasets to review"),
    view_id=Arg(
        "--view-id", "-v", help="View ID to use if none is present in the task"
    ),
    label=Arg("--label", "-l", help="Comma-separated labels to annotate"),
    fetch_media=Arg("--fetch-media", "-FM", help="Load media from local paths or URLs"),
    show_skipped=Arg("--show-skipped", "-S", help="Include skipped answers"),
    auto_accept=Arg(
        "--auto-accept", "-A", help="Automatically accept annotations with no conflicts"
    ),
    accept_single=Arg(
        "--accept-single",
        "-AS",
        help="Automatically accept examples with only single user annotations",
    ),
)
def review_tie_breaker(
    dataset: str,
    input_sets: List[str],
    view_id: Optional[str] = None,
    label: List[str] = [],
    fetch_media: bool = False,
    show_skipped: bool = False,
    auto_accept: bool = False,
    accept_single: bool = False,
):
    """
    A thin wrapper around the `review` recipe that injects a "tie-breaker"
    task router. Use the --auto-accept flag to only review disagreements.
    """
    # Call the original `review` recipe to get all its components,
    # passing all the arguments through.
    components = review(
        dataset,
        input_sets,
        view_id,
        label,
        fetch_media,
        show_skipped,
        auto_accept,
        accept_single,
    )
    # Inject our custom router.
    components["task_router"] = tie_breaker_router

    # Ensure the router's decisions are final.
    config = components.get("config", {})
    config["allow_work_stealing"] = False
    components["config"] = config

    return components

You should be able to run it with the same arguments as the built-in review recipe. For example:

PRODIGY_LOGGING=verbose PRODIGY_ALLOWED_SESSIONS="A,B,C,D" python -m prodigy review.tie-breaker resolved_dataset review_scenarios --auto-accept -F review_tie_breaker.py

One important thing to keep in mind is that Prodigy tasks are routed only once. To make sure each conflicting task gets reviewed, specify all available annotators upfront with PRODIGY_ALLOWED_SESSIONS so that all session queues are available to the router from the get-go.

To test it, you can use this example input dataset:

{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "A", "_view_id": "choice"}
{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "B", "_view_id": "choice"}
{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "C", "_view_id": "choice"}
{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "D", "_view_id": "choice"}
{"text": "The stock price dropped significantly after the news.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["negative"], "_input_hash": "14f63ae3", "_task_hash": "10cbf4bc", "_session_id": "A", "_view_id": "choice"}
{"text": "The stock price dropped significantly after the news.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "14f63ae3", "_task_hash": "06778280", "_session_id": "B", "_view_id": "choice"}
{"text": "Employees are satisfied with the new benefits package.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["negative"], "_input_hash": "03a5d140", "_task_hash": "199033fc", "_session_id": "C", "_view_id": "choice"}
{"text": "Employees are satisfied with the new benefits package.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "03a5d140", "_task_hash": "199033fc", "_session_id": "D", "_view_id": "choice"}
{"text": "The merger talks have stalled indefinitely.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["negative"], "_input_hash": "05db620f", "_task_hash": "0116d98f", "_session_id": "A", "_view_id": "choice"}
{"text": "The merger talks have stalled indefinitely.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "05db620f", "_task_hash": "0116d98f", "_session_id": "C", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "A", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "B", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "C", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "D", "_view_id": "choice"}

It covers the following scenarios:

ANNOTATIONS = {
    "task_1": {  # "The company announced record profits this quarter."
        "A": "positive",
        "B": "positive",  # Agreement - should not appear in review if --auto-accept is set
        "C": "positive",
        "D": "positive"
    },
    "task_2": {  # "The stock price dropped significantly after the news."
        "A": "negative", 
        "B": "positive",  # A,B disagree - should go to C or D
    },
    "task_3": {  # "Employees are satisfied with the new benefits package."
        "C": "negative",  # C,D disagree - should go to A or B  
        "D": "negative"
    },
    "task_4": {  # "The merger talks have stalled indefinitely."
        "A": "negative", # A,C disagree - should go to B or D
        "C": "negative", 
    },
    "task_5": {  # "Customer reviews are overwhelmingly positive."
        "A": "positive",
        "B": "positive",  # Agreement - should not appear in review if --auto-accept is set
        "C": "positive", 
        "D": "positive"
    }
}

If you run the custom review recipe with this dataset as input, PRODIGY_LOGGING set to verbose, and PRODIGY_ALLOWED_SESSIONS specified, you should see your custom router doing the right thing:

13:54:09: CONTROLLER: Getting batch of questions for session: resolved_dataset-A
The stock price dropped significantly after the news.
13:54:09: ROUTER: Routing item with _task_hash=-1738123000 -> ['resolved_dataset-C']
Employees are satisfied with the new benefits package.
13:54:09: ROUTER: Routing item with _task_hash=-753195760 -> ['resolved_dataset-A']
The merger talks have stalled indefinitely.
13:54:09: ROUTER: Routing item with _task_hash=-338352875 -> ['resolved_dataset-D']

Also, the current solution assumes that there will always be an annotator who hasn't seen the task. If your dataset contains examples that have been annotated by all 4 and they disagree, the router won't route them to anyone, under the rule that nobody should review their own disagreements. You can of course modify the logic to handle that case as you wish, though.
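For example, a minimal tweak (just a sketch; whether relaxing the no-self-review rule is acceptable is your call) could fall back to all annotators when nobody is eligible:

    if not eligible_reviewers:
        # Everyone annotated this item: fall back to a deterministic choice
        # among all annotators instead of dropping the task.
        eligible_reviewers = sorted(all_raw_annotators)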
Let me know if there's anything that's unclear!

Hi @magdaaniol

Thanks a lot for the detailed response!

I tried the code on the example dataset you gave me, and it seems to be working fine, but it doesn't seem to work with my dataset: when logging in as e.g. user A, I still got an example that the same user had annotated.

Now I wonder if that is because I use a custom text categorization recipe (mostly custom because I had to put a custom loader in there)? The view-id is still choice, though, and I specify it on the command line, so I am not sure why that would not work.

I put the custom recipe below; do you see any obvious point of friction?

import csv
import random

import prodigy
from prodigy.components.stream import Stream


@prodigy.recipe(
    "custom-textcat",
    dataset=("The dataset to save annotations to", "positional", None, str),
    source=("The source CSV file", "positional", None, str),
)
def custom_textcat(dataset, source):
    blocks = [
        {"view_id": "choice"}
    ]
    options = [
        {"id": 1, "text": "😺 Relevant"},
        {"id": 0, "text": "😾 Not relevant"}
    ]
    def custom_csv_loader(source, encoding="utf-8-sig"):
        with open(source, mode="r", encoding=encoding) as csvfile:
            reader = csv.DictReader(csvfile)
            rows = list(reader)  
            random.shuffle(rows)  
            for row in rows: 
                training = "TRAINING SET\n\n" if row.get('Include') == "relevant" else ""
                text = training + row.get('title') + '\n\n' + row.get('objective')
                id = row.get('id')
                acronym = row.get('acronym')
                yield {"text": text, "options": options, "meta": {"id":id, "acronym": acronym}}

    stream_as_generator = custom_csv_loader(source)
    stream = Stream.from_iterable(stream_as_generator) 

    return {
        "dataset": dataset,         
        "view_id": "blocks",         
        "stream": stream,            
        "config": {
            "labels": [""],
            "blocks": blocks
        }
    }

Hi @MarieCo,

In principle, if you specify the view_id for the review, it should be handled correctly. Regardless, if you use only one view_id, you don't really need blocks: you could have specified choice directly.
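For example, a minimal sketch of the return value without blocks (everything else in your recipe staying the same):

return {
    "dataset": dataset,
    "view_id": "choice",
    "stream": stream,
    "config": {"choice_style": "single"},
}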
Could you provide an example line from the dataset you created with the
"custom-textcat" recipe? Just to make sure we work from the same data structure.
You can use db-out to extract it to a jsonl file.
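For example (the dataset name is just a placeholder):

python -m prodigy db-out custom_textcat_dataset > annotations.jsonl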
Thank you!

Hi @magdaaniol
Sure:

{"text":"xxx”,options":[{"id":1,"text":"Relevant"},{"id":0,"text":"Not relevant"}],"meta":{"id":"334991","acronym":"ACRO"},"_input_hash":1455707370,"_task_hash":-1178025107,"_view_id":"blocks","accept":[0],"config":{"choice_style":"single"},"answer":"accept","_timestamp":1756816251,"_annotator_id":" dataset-A","_session_id":"dataset-A"}

Hope this helps, I am a bit puzzled why this is not working. Thanks!

Hi @MarieCo,

There's one issue with the custom router I shared with you: it's not parsing the _session_id from the incoming annotations correctly. Currently, if you printed original_annotators, all_raw_annotators and eligible_reviewers with the example you provided, you'd get this:

ORIGINAL ANNOTATORS: {'dataset-A'}
ALL RAW ANNOTATORS: {'A', 'D', 'B', 'C'}
ELIGIBLE REVIEWERS: ['A', 'B', 'C', 'D']

As you can see, the eligible_reviewers list still contains A because the set operation was trying to remove dataset-A rather than A from all_raw_annotators.

So we need to fix the parsing:

if "versions" in item:
        for version in item.get("versions", []):
            sess_id = version.get("_session_id")
            if sess_id:
                original_annotators.add(sess_id.split("-")[-1]) # assuming the names are just A,B,C,D i.e. not containing a `-`

With this change the set operation should work as expected:

ORIGINAL ANNOTATORS: {'A'}
ALL RAW ANNOTATORS: {'D', 'A', 'B', 'C'}
ELIGIBLE REVIEWERS: ['B', 'C', 'D']
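
One caveat: the split("-")[-1] trick breaks if your annotator names themselves contain a "-". In that case, you could strip the known prefix instead; a minimal sketch, assuming the original annotations were saved to a dataset literally called dataset (adjust the constant to your real dataset name):

ANNOTATION_DATASET = "dataset"  # hypothetical: the dataset the originals were saved to
prefix = f"{ANNOTATION_DATASET}-"
for version in item.get("versions", []):
    sess_id = version.get("_session_id") or ""
    # strip the dataset namespace if present, otherwise keep the raw ID
    raw = sess_id[len(prefix):] if sess_id.startswith(prefix) else sess_id
    if raw:
        original_annotators.add(raw)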