Hi @MarieCo!
Thanks for an illustrative example - I think I now understand what's needed.
You can define a custom router function to send the review tasks to the right tie breakers.
For each review task, this function will check the original annotators and apply the tie-breaker selection logic you described.
It could look like this:
def tie_breaker_router(ctrl: Controller, session_id: str, item: TaskType) -> List[str]:
    """
    Custom task router that assigns a review task to an annotator who was not
    one of the original annotators.

    ctrl: The active Controller; provides the hashing mode (`exclude_by`),
        the dataset name and the known session IDs.
    session_id: The session currently requesting a batch of tasks.
    item: The review task to route.

    Returns a one-element list with the chosen reviewer's namespaced session
    ID, or an empty list when no eligible reviewer exists.
    """
    hash_attr = TASK_HASH_ATTR if ctrl.exclude_by == "task" else INPUT_HASH_ATTR
    item_hash = (
        get_task_hash(item) if ctrl.exclude_by == "task" else get_input_hash(item)
    )
    # Collect everyone who produced one of the conflicting versions.
    original_annotators: Set[str] = set()
    for version in item.get("versions", []):
        sess_id = version.get("_session_id")
        # Guard against versions without a session ID so we don't collect
        # a meaningless None entry.
        if sess_id:
            original_annotators.add(sess_id)
    if not original_annotators:
        # No version info available: route the task to whoever asked for it.
        return [session_id]
    # Session IDs in the controller are namespaced (e.g., "dataset-name-user-name").
    # We need to strip the prefix to compare them to the raw session IDs from the data.
    dataset_prefix = f"{ctrl.dataset}-"
    all_raw_annotators: Set[str] = set()
    for sid in ctrl.session_ids:
        if sid.startswith(dataset_prefix):
            all_raw_annotators.add(sid[len(dataset_prefix):])
        else:
            all_raw_annotators.add(sid)
    eligible_reviewers = sorted(all_raw_annotators - original_annotators)
    if not eligible_reviewers:
        print(item["text"])
        log_router(hash_attr, item_hash, [])
        return []
    # We need a consistent way to assign a task to the reviewer.
    # Using the modulo of the task hash ensures that the same task will always
    # be assigned to the same reviewer from the list of eligible reviewers.
    reviewer_idx = item_hash % len(eligible_reviewers)
    chosen_reviewer_raw = eligible_reviewers[reviewer_idx]
    # Reconstruct the full, namespaced session ID to return
    chosen_reviewer = f"{dataset_prefix}{chosen_reviewer_raw}"
    print(item["text"])  # extra logging for illustration
    log_router(hash_attr, item_hash, [chosen_reviewer])
    return [chosen_reviewer]
This function ensures that: 1) each disagreement gets exactly one tie-breaker; 2) tie-breakers are evenly distributed among the available annotators; 3) the same task always routes to the same tie-breaker (deterministic); 4) no one reviews their own disagreements. Points 2) and 3) are met thanks to the modulo indexing mentioned in the comments.
Now, to plug this into the recipe, the easiest way is to wrap the built-in review
recipe in a custom recipe that propagates the CLI arguments to the review
function and adds the task_router
component mapped to the function defined above:
from typing import Dict, List, Optional, Set

import prodigy
from prodigy.components.routers import log_router
from prodigy.core import Arg, Controller
from prodigy.recipes.review import review
from prodigy.structured_types import get_input_hash, get_task_hash
from prodigy.types import TaskType
from prodigy.util import INPUT_HASH_ATTR, TASK_HASH_ATTR
def tie_breaker_router(ctrl: Controller, session_id: str, item: TaskType) -> List[str]:
    """
    Custom task router that assigns a review task to an annotator who was not
    one of the original annotators.

    ctrl: The active Controller; provides the hashing mode (`exclude_by`),
        the dataset name and the known session IDs.
    session_id: The session currently requesting a batch of tasks.
    item: The review task to route.

    Returns a one-element list with the chosen reviewer's namespaced session
    ID, or an empty list when no eligible reviewer exists.
    """
    hash_attr = TASK_HASH_ATTR if ctrl.exclude_by == "task" else INPUT_HASH_ATTR
    item_hash = (
        get_task_hash(item) if ctrl.exclude_by == "task" else get_input_hash(item)
    )
    # Collect everyone who produced one of the conflicting versions.
    original_annotators: Set[str] = set()
    for version in item.get("versions", []):
        sess_id = version.get("_session_id")
        # Guard against versions without a session ID so we don't collect
        # a meaningless None entry.
        if sess_id:
            original_annotators.add(sess_id)
    if not original_annotators:
        # No version info available: route the task to whoever asked for it.
        return [session_id]
    # Session IDs in the controller are namespaced (e.g., "dataset-name-user-name").
    # We need to strip the prefix to compare them to the raw session IDs from the data.
    dataset_prefix = f"{ctrl.dataset}-"
    all_raw_annotators: Set[str] = set()
    for sid in ctrl.session_ids:
        if sid.startswith(dataset_prefix):
            all_raw_annotators.add(sid[len(dataset_prefix):])
        else:
            all_raw_annotators.add(sid)
    eligible_reviewers = sorted(all_raw_annotators - original_annotators)
    if not eligible_reviewers:
        print(item["text"])
        log_router(hash_attr, item_hash, [])
        return []
    # Modulo indexing of the task hash makes the assignment deterministic:
    # the same task always goes to the same eligible reviewer.
    reviewer_idx = item_hash % len(eligible_reviewers)
    chosen_reviewer_raw = eligible_reviewers[reviewer_idx]
    # Reconstruct the full, namespaced session ID to return
    chosen_reviewer = f"{dataset_prefix}{chosen_reviewer_raw}"
    print(item["text"])
    log_router(hash_attr, item_hash, [chosen_reviewer])
    return [chosen_reviewer]
@prodigy.recipe(
    "review.tie-breaker",
    dataset=Arg(help="Dataset to save annotations to"),
    input_sets=Arg(help="Comma-separated names of datasets to review"),
    view_id=Arg(
        "--view-id", "-v", help="View ID to use if none is present in the task"
    ),
    label=Arg("--label", "-l", help="Comma-separated labels to annotate"),
    fetch_media=Arg("--fetch-media", "-FM", help="Load media from local paths or URLs"),
    show_skipped=Arg("--show-skipped", "-S", help="Include skipped answers"),
    auto_accept=Arg(
        "--auto-accept", "-A", help="Automatically accept annotations with no conflicts"
    ),
    accept_single=Arg(
        "--accept-single",
        "-AS",
        help="Automatically accept examples with only single user annotations",
    ),
)
def review_tie_breaker(
    dataset: str,
    input_sets: List[str],
    view_id: Optional[str] = None,
    label: List[str] = [],
    fetch_media: bool = False,
    show_skipped: bool = False,
    auto_accept: bool = False,
    accept_single: bool = False,
):
    """
    A thin wrapper around the `review` recipe that injects a "tie-breaker"
    task router. Use the --auto-accept flag to only review disagreements.
    Returns the component dictionary produced by `review`, with the custom
    `task_router` added and work stealing disabled.
    """
    # Call the original `review` recipe to get all its components. Keyword
    # arguments guard against positional mix-ups if the order of `review`'s
    # parameters ever changes.
    components = review(
        dataset,
        input_sets,
        view_id=view_id,
        label=label,
        fetch_media=fetch_media,
        show_skipped=show_skipped,
        auto_accept=auto_accept,
        accept_single=accept_single,
    )
    # Inject our custom router.
    components["task_router"] = tie_breaker_router
    # Ensure the router's decisions are final: work stealing would let idle
    # sessions take over tasks that were routed to someone else.
    components.setdefault("config", {})["allow_work_stealing"] = False
    return components
You should be able to run it with the same arguments as the built-in review
recipe. For example:
PRODIGY_LOGGING=verbose PRODIGY_ALLOWED_SESSIONS="A,B,C,D" python -m prodigy review.tie-breaker resolved_dataset review_scenarios --auto-accept -F review_tie_breaker.py
One important thing to keep in mind is that Prodigy tasks are routed only once. To ensure each conflicting task is reviewed, specify the available annotators with PRODIGY_ALLOWED_SESSIONS
so that all session queues are available to the router from the get-go.
To test it, you can use this example input dataset:
{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "A", "_view_id": "choice"}
{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "B", "_view_id": "choice"}
{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "C", "_view_id": "choice"}
{"text": "The company announced record profits this quarter.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "1b46e1c6", "_task_hash": "1b51dbe3", "_session_id": "D", "_view_id": "choice"}
{"text": "The stock price dropped significantly after the news.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["negative"], "_input_hash": "14f63ae3", "_task_hash": "10cbf4bc", "_session_id": "A", "_view_id": "choice"}
{"text": "The stock price dropped significantly after the news.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "14f63ae3", "_task_hash": "06778280", "_session_id": "B", "_view_id": "choice"}
{"text": "Employees are satisfied with the new benefits package.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["negative"], "_input_hash": "03a5d140", "_task_hash": "199033fc", "_session_id": "C", "_view_id": "choice"}
{"text": "Employees are satisfied with the new benefits package.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "03a5d140", "_task_hash": "199033fc", "_session_id": "D", "_view_id": "choice"}
{"text": "The merger talks have stalled indefinitely.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["negative"], "_input_hash": "05db620f", "_task_hash": "0116d98f", "_session_id": "A", "_view_id": "choice"}
{"text": "The merger talks have stalled indefinitely.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "05db620f", "_task_hash": "0116d98f", "_session_id": "C", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "A", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "B", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "C", "_view_id": "choice"}
{"text": "Customer reviews are overwhelmingly positive.", "options": [{"id": "positive", "text": "Positive"}, {"id": "negative", "text": "Negative"}], "answer": "accept", "accept": ["positive"], "_input_hash": "02f8e85e", "_task_hash": "0430a25e", "_session_id": "D", "_view_id": "choice"}
it covers the following scenarios:
# Summary of the example dataset above: which raw session annotated which
# sentiment for each task. Values mirror the JSONL records exactly.
ANNOTATIONS = {
    "task_1": {  # "The company announced record profits this quarter."
        "A": "positive",
        "B": "positive",  # Agreement - should not appear in review if --auto-accept is set
        "C": "positive",
        "D": "positive"
    },
    "task_2": {  # "The stock price dropped significantly after the news."
        "A": "negative",
        "B": "positive",  # A,B disagree - should go to C or D
    },
    "task_3": {  # "Employees are satisfied with the new benefits package."
        "C": "negative",  # C,D disagree - should go to A or B
        "D": "positive"
    },
    "task_4": {  # "The merger talks have stalled indefinitely."
        "A": "negative",  # A,C disagree - should go to B or D
        "C": "positive",
    },
    "task_5": {  # "Customer reviews are overwhelmingly positive."
        "A": "positive",
        "B": "positive",  # Agreement - should not appear in review if --auto-accept is set
        "C": "positive",
        "D": "positive"
    }
}
If you run the custom review recipe with this dataset as input and PRODIGY_LOGGING
set to verbose
and PRODIGY_ALLOWED_SESSIONS
specified, you should see your custom router doing the right thing:
13:54:09: CONTROLLER: Getting batch of questions for session: resolved_dataset-A
The stock price dropped significantly after the news.
13:54:09: ROUTER: Routing item with _task_hash=-1738123000 -> ['resolved_dataset-C']
Employees are satisfied with the new benefits package.
13:54:09: ROUTER: Routing item with _task_hash=-753195760 -> ['resolved_dataset-A']
The merger talks have stalled indefinitely.
13:54:09: ROUTER: Routing item with _task_hash=-338352875 -> ['resolved_dataset-D']
Also, the current solution assumes that there will always be an annotator who hasn't seen the task. If your dataset contains examples that have been annotated by all 4 annotators and they disagree, the router won't route the task to anyone under the rule that nobody should review their own disagreements. You can of course modify the logic to handle that as you wish, though.
Let me know if there's anything that's unclear!