I have two annotated input datasets, and I want to review them using review.py. When I run the following command, I don't get any errors and Prodigy starts up in my browser. However, the UI just loads continuously and then stalls.
Command:
PRODIGY_ALLOWED_SESSIONS=cheyanne prodigy review input-name1,input-name2 -F /Users/recipes/review.py
Custom recipe used to annotate the two input sets (intent, utterance, binary correct/incorrect choice field, checkbox for speech-to-text errors, and notes field):
from typing import Dict, Generator
import prodigy
from prodigy.components.loaders import JSONL
from annotator.recipes.utils import VALIDATION
UNLIMITED_ROWS = [
{"view_id": "html", "html_template":
"<div style=\"padding: 0 10px; border: 1px solid #ddd; border-radius: 4px; text-align: left;\">" +
"<label style=\"font-size: 14px; opacity: 0.90; margin-bottom: 10px;\">intent</label>" +
"<div style=\"max-height: 300px; overflow-y: scroll; margin-bottom: 10px;\">{{final_intent_v3}}" +
"<br>" +
"<br>" +
"<label style=\"font-size: 14px; opacity: 0.90; margin-bottom: 10px;\">utterance</label>" +
"<div style=\"max-height: 300px; overflow-y: scroll; margin-bottom: 10px;\">{{utterance}}" +
"</div>"
},
{"view_id": "choice"},
{"view_id": "html", "html_template": "<div style='float:left;'>" +
"<input name='stt_error' id='stt_error' type='checkbox' value='STT Error' style='margin-right:10px;' data-id='{{utterance}}' onchange='updateSttError()'" +
"<label onclick='update()'>STT Error</label>"
},
{"view_id": "text_input", "field_label": "notes"}
]
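# For reference, the {{final_intent_v3}} and {{utterance}} placeholders in the
# HTML templates above are filled from each task, so every line in the input
# JSONL needs at least those two keys. A minimal (made-up) input line:
# {"final_intent_v3": "book_flight", "utterance": "i want to fly to boston tomorrow"}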
def add_options(
stream, label_field="", choices=VALIDATION
) -> Generator[Dict, None, None]:
"""
Convert each line in the ``stream`` to a ``task`` with a text and an
options field
:param stream: the input stream
:param label_field: key holding the label (unused here; defaults to "")
:param choices: the different choices
:yield: a task Dict with text and options
"""
for line in stream:
options = [word for word in choices]
task = {
"final_intent_v3": line["final_intent_v3"],
"utterance": line["utterance"],
"stt_error": False,
"options": [
{"id": o, "deployment": o, "prompt": o,
"text": o} for o in options
]
}
yield task
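# To illustrate: assuming VALIDATION is a list of choice labels such as
# ["correct", "incorrect"] (its real contents live in annotator.recipes.utils),
# add_options yields tasks shaped roughly like:
# {
#     "final_intent_v3": "book_flight",
#     "utterance": "i want to fly to boston tomorrow",
#     "stt_error": False,
#     "options": [
#         {"id": "correct", "deployment": "correct", "prompt": "correct", "text": "correct"},
#         {"id": "incorrect", "deployment": "incorrect", "prompt": "incorrect", "text": "incorrect"},
#     ],
# }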
@prodigy.recipe(
"intent-validation",
dataset=("The dataset to save to", "positional", None, str),
file_path=("Path to texts", "positional", None, str)
)
def custom_labels(dataset, file_path):
"""
Annotate each example from the input file using the blocks defined above:
the intent/utterance HTML, the choice options from ``VALIDATION``, the
STT-error checkbox and a notes field.
"""
blocks = UNLIMITED_ROWS
stream = JSONL(file_path)
stream = add_options(stream) # add options to each task
javascript = """
// Set stt_error to false by default
prodigy.update({ stt_error: false });
function updateSttError() {
prodigy.update({ stt_error: document.getElementById('stt_error').checked });
}
document.addEventListener('prodigyanswer', (event) => {
// Reset stt_error to false
prodigy.update({ stt_error: false });
document.getElementById('stt_error').checked = false;
});
"""
return {
"dataset": dataset,
"view_id": "blocks",
"stream": list(stream),
"config": {
"blocks": blocks,
"javascript": javascript
}
}
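For context, this annotation recipe was run with a command along the same lines as the review command above; the dataset name and paths here are just placeholders:
prodigy intent-validation intent-dataset /Users/recipes/input.jsonl -F /Users/recipes/recipe.py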
Custom review.py recipe (I just added a notes field):
# pyright: reportUndefinedVariable=false, reportGeneralTypeIssues=false
import copy
import json
import prodigy
import spacy
from collections import defaultdict
from typing import Any, Dict, Generator, Iterator, Iterable, List, Optional, Tuple, Union
from prodigy.components.db import Database, connect
from prodigy.components.decorators import support_both_streams
from prodigy.components.loaders import JSONL
from prodigy.components.preprocess import add_tokens, fetch_media as fetch_media_preprocessor
from prodigy.components.stream import get_stream
from prodigy.core import recipe
from prodigy.recipes.spans import manual as prodigy_spans_manual
from prodigy.types import RecipeSettingsType, StreamType, TaskType
from prodigy.util import (
IGNORE_HASH_KEYS,
INPUT_HASH_ATTR,
SESSION_ID_ATTR,
TASK_HASH_ATTR,
VIEW_ID_ATTR,
get_labels,
log,
msg,
set_hashes,
split_string,
)
UNSUPPORTED_VIEW_IDS = ("image_manual", "compare", "diff")
INPUT_KEYS = ("text", "image", "html", "options", "audio", "video")
TASK_KEYS = ("spans", "label", "accept", "audio_spans", "relations")
class ReviewStream:
def __init__(
self,
data: Dict[int, Dict[Tuple[int, str], TaskType]],
by_input: bool = False,
show_skipped: bool = False,
) -> None:
"""Initialize a review stream. This class mostly exists so we can
expose a __len__ (to show the total progress) and to separate out some
of the task-type specific abstractions like by_input.
data (dict): The merged data: {INPUT_HASH: { (TASK_HASH, answer): task }}.
by_input (bool): Whether to consider everything with the same input hash
to be the same task to review. This makes sense for datasets with
ner_manual annotations on the same text. Different task hashes on
the same input would then be considered conflicts. If False,
examples with different task hashes are considered different tasks
to review and only the answers (accept / reject) are what could be
considered a conflict. This makes sense for binary annotations
where a reviewer would only be judging the accept/reject decisions.
show_skipped (bool): Include answers that would otherwise be skipped,
including annotations with answer "ignore" and rejected examples
in manual annotation modes (e.g. manual NER annotation with answer
"reject").
"""
self.show_skipped = show_skipped
if by_input:
self.data = self.get_data_by_input(data)
else:
self.data = self.get_data_by_task(data)
def __len__(self) -> int:
return len(self.data)
def __iter__(self) -> StreamType:
for eg in self.data:
yield eg
def make_eg(self, versions: List[TaskType]) -> TaskType:
default_version = max(versions, key=lambda v: len(v["sessions"]))
default_version_idx = versions.index(default_version)
eg = copy.deepcopy(default_version)
for i, version in enumerate(versions):
version["default"] = i == default_version_idx
eg["versions"] = versions
eg["view_id"] = eg[VIEW_ID_ATTR]
return eg
def get_data_by_input(
self, data: Dict[int, Dict[Tuple[int, str], TaskType]]
) -> List[TaskType]:
# We're considering everything with the same input hash to
# be the same task to review (e.g. different spans on same
# text when highlighted manually). Different task hashes on the same
# input are treated as conflicts to resolve. Rejected answers are
# automatically excluded.
examples = []
for input_versions in data.values():
versions = []
for _, task_versions in input_versions.items():
if not self.show_skipped:
task_versions = [
v for v in task_versions if v["answer"] == "accept"
]
if task_versions:
version = copy.deepcopy(task_versions[0])
sessions = sorted(
set([eg[SESSION_ID_ATTR] for eg in task_versions])
)
version["sessions"] = sessions
versions.append(version)
if versions:
examples.append(self.make_eg(versions))
return examples
def get_data_by_task(
self, data: Dict[int, Dict[Tuple[int, str], TaskType]]
) -> List[TaskType]:
# We're only considering everything with the same task hash to be the
# same task to review and provide only two versions: accept and reject.
examples = []
by_task = defaultdict(list)
for input_versions in data.values():
for (task_hash, _), task_versions in input_versions.items():
if task_versions:
version = copy.deepcopy(task_versions[0])
sessions = sorted(
set([eg[SESSION_ID_ATTR] for eg in task_versions])
)
version["sessions"] = sessions
by_task[task_hash].append(version)
for versions in by_task.values():
examples.append(self.make_eg(versions))
return examples
def get_review_stream(
datasets: Dict[str, List[dict]],
default_view_id: Optional[str] = None,
fetch_media: bool = False,
show_skipped: bool = False,
) -> StreamType:
merged = defaultdict(dict)
global_view_id = default_view_id
n_merged = 0
for set_id, examples in datasets.items():
if not show_skipped:
examples = (eg for eg in examples if eg["answer"] != "ignore")
if fetch_media:
# Replace paths and URLs with base64 data
examples = fetch_media_preprocessor(examples, ["image", "audio", "video"])
for eg in examples:
# Rehash example to make sure we're comparing correctly. In this
# case, we want to consider "options" an input key and "accept" a
# task key, so we can treat choice examples as by_input. We also
# want to ignore the answer and key by it separately.
eg = set_hashes(
eg,
overwrite=True,
input_keys=INPUT_KEYS,
task_keys=TASK_KEYS,
ignore=IGNORE_HASH_KEYS,
)
# Make sure example has session ID (backwards compatibility)
session_id = eg.get(SESSION_ID_ATTR, set_id)
eg[SESSION_ID_ATTR] = session_id if session_id is not None else set_id
# Make sure example has view ID (backwards compatibility)
eg_view_id = eg.get(VIEW_ID_ATTR, default_view_id)
if eg_view_id is None:
print(eg) # noqa: T201
msg.fail(
f"No '{VIEW_ID_ATTR}' found in the example",
"This is likely because it was created with Prodigy <1.8). "
"Please specify a --view-id on the command line. For "
"example, 'ner_manual' (if the annotations were created with "
"the manual interface), 'classification', 'choice' etc.",
exits=1,
)
if eg_view_id in UNSUPPORTED_VIEW_IDS:
msg.fail(
f"Reviewing '{eg_view_id}' annotations isn't supported yet",
"You can vote for this feature on the forum: https://support.prodi.gy",
exits=1,
)
if global_view_id is None:
global_view_id = eg_view_id
if global_view_id != eg_view_id and not default_view_id:
msg.fail(
"Conflicting view_id values in datasets",
f"Can't review annotations of '{eg_view_id}' (in dataset "
f"'{set_id}') and '{global_view_id}' (in previous examples)",
exits=1,
)
# Override view ID if value is set on the command line – this allows
# annotations created with "blocks" to be reviewed as ner_manual, etc.
eg[VIEW_ID_ATTR] = default_view_id or eg_view_id
input_hash = eg[INPUT_HASH_ATTR]
key = (eg[TASK_HASH_ATTR], eg["answer"])
merged[input_hash].setdefault(key, []).append(eg)
n_merged += 1
log(f"RECIPE: Merged {n_merged} examples from {len(datasets)} datasets")
is_manual = global_view_id and global_view_id.endswith(
("_manual", "choice", "relations")
)
stream = ReviewStream(merged, by_input=is_manual, show_skipped=show_skipped)
return get_stream(stream)
@support_both_streams(stream_arg="stream")
def filter_auto_accept_stream(
stream: Iterator[Dict[str, Any]],
db: Database,
dataset: str,
accept_single: bool = False,
) -> StreamType:
"""
Automatically add examples with no conflicts to the database and skip
them during annotation.
"""
task_hashes = db.get_task_hashes(dataset)
for eg in stream:
versions = eg["versions"]
if len(versions) == 1: # no conflicts, only one version
if TASK_HASH_ATTR in eg and eg[TASK_HASH_ATTR] in task_hashes:
continue
sessions = versions[0]["sessions"]
if (len(sessions) > 1) or accept_single:
# Add example to dataset automatically and use the answer from
# one of the tasks (may be "reject" for binary annotations where
# all annotators agree on "reject")
eg["answer"] = versions[0].get("answer", "accept")
db.add_examples([eg], [dataset])
# Don't send anything out for annotation
else:
yield eg
@recipe(
"review",
# fmt: off
dataset=("Dataset to save annotations to", "positional", None, str),
input_sets=("Comma-separated names of datasets to review", "positional", None, split_string),
view_id=("View ID (e.g. 'ner' or 'ner_manual') to use if none present in the task or to overwrite existing", "option", "v", str),
label=("Comma-separated label(s) to annotate or text file with one label per line", "option", "l", get_labels),
fetch_media=("Load images, audio or video files from local paths or URLs", "flag", "FM", bool),
show_skipped=("Include skipped answers, e.g. if annotator hit ignore or rejected manual annotation", "flag", "S", bool),
auto_accept=("Automatically accept annotations with no conflicts and add them to the dataset", "flag", "A", bool),
accept_single=("Also automatically accept examples with only single user annotations", "flag", "AS", bool)
# fmt: on
)
def review(
dataset: str,
input_sets: List[str],
view_id: Optional[str] = None,
label: Optional[List[str]] = None,
fetch_media: bool = False,
show_skipped: bool = False,
auto_accept: bool = False,
accept_single: bool = False,
) -> RecipeSettingsType:
"""Review existing annotations created by multiple annotators and
resolve potential conflicts by creating one final "master annotation". Can
be used for both binary and manual annotations. If the annotations were
created with a manual interface, the "most popular" version will be
pre-selected automatically.
NOTE: If you're using this recipe with annotations created in Prodigy v1.7.1
or lower, you'll need to define a --view-id argument with the annotation
interface ID to use. For example, 'ner_manual' or 'classification'.
"""
log("RECIPE: Starting recipe review", locals())
DB = connect()
for set_id in input_sets:
if set_id not in DB:
msg.fail(f"Can't find input dataset '{set_id}' in database", exits=1)
all_examples = {set_id: DB.get_dataset_examples(set_id) for set_id in input_sets}
stream = get_review_stream(
all_examples, view_id, fetch_media=fetch_media, show_skipped=show_skipped
)
config = {"auto_count_stream": True}
if label:
config["labels"] = label
def before_db(examples: List[TaskType]) -> List[TaskType]:
if fetch_media:
# Remove all data URIs before storing example in the database
keys = ("image", "audio", "video")
for eg in examples:
for key in keys:
if key in eg and eg[key].startswith("data:") and "path" in eg:
eg[key] = eg["path"]
for version in eg.get("versions", []):
for key in keys:
if (
key in version
and version[key].startswith("data:")
and "path" in version
):
version[key] = version["path"]
return examples
if auto_accept:
stream = filter_auto_accept_stream(stream, DB, dataset, accept_single)
return {
"view_id": "blocks",
"dataset": dataset,
"stream": stream,
"before_db": before_db,
"config": {
**config,
"blocks": [
{"view_id": "review"},
{"view_id": "text_input", "field_label": "Notes"}
],
}
}