review recipe stalling with blocks view-id

I have two annotated input datasets, and I want to review them using review.py. When I run the following command, I don't get any errors and Prodigy starts up in my browser. However, the UI just loads continuously and then stalls.

Command:

PRODIGY_ALLOWED_SESSIONS=cheyanne prodigy review input-name1,input-name2 -F /Users/recipes/review.py

Custom recipe used to annotate the two input sets (intent, utterance, binary correct/incorrect choice field, checkbox for speech-to-text errors, and notes field):

from typing import Dict, Generator

import prodigy
from prodigy.components.loaders import JSONL

from annotator.recipes.utils import VALIDATION

UNLIMITED_ROWS = [
    {"view_id": "html", "html_template":
        "<div style=\"padding: 0 10px; border: 1px solid #ddd; border-radius: 4px; text-align: left;\">" +
        "<label style=\"font-size: 14px; opacity: 0.90; margin-bottom: 10px;\">intent</label>" +
        "<div style=\"max-height: 300px; overflow-y: scroll; margin-bottom: 10px;\">{{final_intent_v3}}" +
        "<br>" +
        "<br>" +
        "<label style=\"font-size: 14px; opacity: 0.90; margin-bottom: 10px;\">utterance</label>" +
        "<div style=\"max-height: 300px; overflow-y: scroll; margin-bottom: 10px;\">{{utterance}}" +
      "</div>"
    },
    {"view_id": "choice"},
    {"view_id": "html", "html_template": "<div style='float:left;'>" +
        "<input name='stt_error' id='stt_error' type='checkbox' value='STT Error' style='margin-right:10px;' data-id='{{utterance}}' onchange='updateSttError()'" +
        "<label onclick='update()'>STT Error</label>"
    },
    {"view_id": "text_input", "field_label": "notes"}
]


def add_options(
        stream, label_field="", choices=VALIDATION
) -> Generator[Dict, None, None]:
    """
    Convert each line in the ``stream`` to a ``task`` with a text and an
    options field

    :param stream: the input stream
    :param label_field: key to read the label from (currently unused; defaults to "")
    :param choices: the different choices
    :yield: a task Dict with text and options
    """
    for line in stream:
        options = list(choices)
        task = {
            "final_intent_v3": line["final_intent_v3"],
            "utterance": line["utterance"],
            "stt_error": False,
            "options": [
                {"id": o, "deployment": o, "prompt": o,
                "text": o} for o in options
            ]
        }

        yield task


@prodigy.recipe(
    "intent-validation",
    dataset=("The dataset to save to", "positional", None, str),
    file_path=("Path to texts", "positional", None, str)
)
def custom_labels(dataset, file_path):
    """
    Annotate the text with labels from the list from the ``label_field`` in
    the input file. Augmented with choices from ``choice_field``.
    """

    blocks = UNLIMITED_ROWS

    stream = JSONL(file_path)
    stream = add_options(stream)  # add options to each task

    javascript = """
      // Set stt_error to false by default
      prodigy.update({ stt_error: false });

      function updateSttError() {
        prodigy.update({ stt_error: document.getElementById('stt_error').checked });
      }

      document.addEventListener('prodigyanswer', (event) => {
        // Reset stt_error to false
        prodigy.update({ stt_error: false });

        document.getElementById('stt_error').checked = false;
      });
    """

    return {
        "dataset": dataset,
        "view_id": "blocks",
        "stream": list(stream),
        "config": {
          "blocks": blocks,
          "javascript": javascript
        }
    }
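
For reference, each line of the input JSONL passed to this recipe looks roughly like this (the field names come from the recipe above; the values here are made up):

{"final_intent_v3": "book_flight", "utterance": "I want to fly to Boston tomorrow morning"}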

Custom review.py recipe (I just added a notes field):

# pyright: reportUndefinedVariable=false, reportGeneralTypeIssues=false
import copy
import json
import prodigy
import spacy
from collections import defaultdict
from typing import Any, Dict, Generator, Iterator, Iterable, List, Optional, Tuple, Union

from prodigy.components.db import Database, connect
from prodigy.components.decorators import support_both_streams
from prodigy.components.loaders import JSONL
from prodigy.components.preprocess import add_tokens, fetch_media as fetch_media_preprocessor
from prodigy.components.stream import get_stream
from prodigy.core import recipe
from prodigy.recipes.spans import manual as prodigy_spans_manual
from prodigy.types import RecipeSettingsType, StreamType, TaskType
from prodigy.util import (
    IGNORE_HASH_KEYS,
    INPUT_HASH_ATTR,
    SESSION_ID_ATTR,
    TASK_HASH_ATTR,
    VIEW_ID_ATTR,
    get_labels,
    log,
    msg,
    set_hashes,
    split_string,
)

UNSUPPORTED_VIEW_IDS = ("image_manual", "compare", "diff")
INPUT_KEYS = ("text", "image", "html", "options", "audio", "video")
TASK_KEYS = ("spans", "label", "accept", "audio_spans", "relations")


class ReviewStream:
    def __init__(
        self,
        data: Dict[int, Dict[Tuple[int, str], TaskType]],
        by_input: bool = False,
        show_skipped: bool = False,
    ) -> None:
        """Initialize a review stream. This class mostly exists so we can
        expose a __len__ (to show the total progress) and to separate out some
        of the task-type specific abstractions like by_input.

        data (dict): The merged data: {INPUT_HASH: { (TASK_HASH, answer): task }}.
        by_input (bool): Whether to consider everything with the same input hash
            to be the same task to review. This makes sense for datasets with
            ner_manual annotations on the same text. Different task hashes on
            the same input would then be considered conflicts. If False,
            examples with different task hashes are considered different tasks
            to review and only the answers (accept / reject) are what could be
            considered a conflict. This makes sense for binary annotations
            where a reviewer would only be judging the accept/reject decisions.
        show_skipped (bool): Include answers that would otherwise be skipped,
            including annotations with answer "ignore" and rejected examples
            in manual annotation modes (e.g. manual NER annotation with answer
            "reject").
        """
        self.show_skipped = show_skipped
        if by_input:
            self.data = self.get_data_by_input(data)
        else:
            self.data = self.get_data_by_task(data)

    def __len__(self) -> int:
        return len(self.data)

    def __iter__(self) -> StreamType:
        for eg in self.data:
            yield eg

    def make_eg(self, versions: List[TaskType]) -> TaskType:
        default_version = max(versions, key=lambda v: len(v["sessions"]))
        default_version_idx = versions.index(default_version)
        eg = copy.deepcopy(default_version)
        for i, version in enumerate(versions):
            version["default"] = i == default_version_idx
        eg["versions"] = versions
        eg["view_id"] = eg[VIEW_ID_ATTR]
        return eg

    def get_data_by_input(
        self, data: Dict[int, Dict[Tuple[int, str], TaskType]]
    ) -> List[TaskType]:
        # We're considering everything with the same input hash to
        # be the same task to review (e.g. different spans on same
        # text when highlighted manually). Different task hashes on the same
        # input are treated as conflicts to resolve. Rejected answers are
        # automatically excluded.
        examples = []
        for input_versions in data.values():
            versions = []
            for _, task_versions in input_versions.items():
                if not self.show_skipped:
                    task_versions = [
                        v for v in task_versions if v["answer"] == "accept"
                    ]
                if task_versions:
                    version = copy.deepcopy(task_versions[0])
                    sessions = sorted(
                        set([eg[SESSION_ID_ATTR] for eg in task_versions])
                    )
                    version["sessions"] = sessions
                    versions.append(version)
            if versions:
                examples.append(self.make_eg(versions))
        return examples

    def get_data_by_task(
        self, data: Dict[int, Dict[Tuple[int, str], TaskType]]
    ) -> List[TaskType]:
        # We're only considering everything with the same task hash to be the
        # same task to review and provide only two versions: accept and reject.
        examples = []
        by_task = defaultdict(list)
        for input_versions in data.values():
            for (task_hash, _), task_versions in input_versions.items():
                if task_versions:
                    version = copy.deepcopy(task_versions[0])
                    sessions = sorted(
                        set([eg[SESSION_ID_ATTR] for eg in task_versions])
                    )
                    version["sessions"] = sessions
                    by_task[task_hash].append(version)
        for versions in by_task.values():
            examples.append(self.make_eg(versions))
        return examples


def get_review_stream(
    datasets: Dict[str, List[dict]],
    default_view_id: Optional[str] = None,
    fetch_media: bool = False,
    show_skipped: bool = False,
) -> StreamType:
    merged = defaultdict(dict)
    global_view_id = default_view_id
    n_merged = 0
    for set_id, examples in datasets.items():
        if not show_skipped:
            examples = (eg for eg in examples if eg["answer"] != "ignore")
        if fetch_media:
            # Replace paths and URLs with base64 data
            examples = fetch_media_preprocessor(examples, ["image", "audio", "video"])
        for eg in examples:
            # Rehash example to make sure we're comparing correctly. In this
            # case, we want to consider "options" an input key and "accept" a
            # task key, so we can treat choice examples as by_input. We also
            # want to ignore the answer and key by it separately.
            eg = set_hashes(
                eg,
                overwrite=True,
                input_keys=INPUT_KEYS,
                task_keys=TASK_KEYS,
                ignore=IGNORE_HASH_KEYS,
            )
            # Make sure example has session ID (backwards compatibility)
            session_id = eg.get(SESSION_ID_ATTR, set_id)
            eg[SESSION_ID_ATTR] = session_id if session_id is not None else set_id
            # Make sure example has view ID (backwards compatibility)
            eg_view_id = eg.get(VIEW_ID_ATTR, default_view_id)
            if eg_view_id is None:
                print(eg)  # noqa: T201
                msg.fail(
                    f"No '{VIEW_ID_ATTR}' found in the example",
                    "This is likely because it was created with Prodigy <1.8). "
                    "Please specify a --view-id on the command line. For "
                    "example, 'ner_manual' (if the annotations were created with "
                    "the manual interface), 'classification', 'choice' etc.",
                    exits=1,
                )
            if eg_view_id in UNSUPPORTED_VIEW_IDS:
                msg.fail(
                    f"Reviewing '{eg_view_id}' annotations isn't supported yet",
                    "You can vote for this feature on the forum: https://support.prodi.gy",
                    exits=1,
                )
            if global_view_id is None:
                global_view_id = eg_view_id
            if global_view_id != eg_view_id and not default_view_id:
                msg.fail(
                    "Conflicting view_id values in datasets",
                    f"Can't review annotations of '{eg_view_id}' (in dataset "
                    f"'{set_id}') and '{global_view_id}' (in previous examples)",
                    exits=1,
                )
            # Override view ID if value is set on the command line – this allows
            # annotations created with "blocks" to be reviewed as ner_manual, etc.
            eg[VIEW_ID_ATTR] = default_view_id or eg_view_id
            input_hash = eg[INPUT_HASH_ATTR]
            key = (eg[TASK_HASH_ATTR], eg["answer"])
            merged[input_hash].setdefault(key, []).append(eg)
            n_merged += 1
    log(f"RECIPE: Merged {n_merged} examples from {len(datasets)} datasets")
    is_manual = global_view_id and global_view_id.endswith(
        ("_manual", "choice", "relations")
    )
    stream = ReviewStream(merged, by_input=is_manual, show_skipped=show_skipped)
    return get_stream(stream)


@support_both_streams(stream_arg="stream")
def filter_auto_accept_stream(
    stream: Iterator[Dict[str, Any]],
    db: Database,
    dataset: str,
    accept_single: bool = False,
) -> StreamType:
    """
    Automatically add examples with no conflicts to the database and skip
    them during annotation.
    """
    task_hashes = db.get_task_hashes(dataset)
    for eg in stream:
        versions = eg["versions"]
        if len(versions) == 1:  # no conflicts, only one version
            if TASK_HASH_ATTR in eg and eg[TASK_HASH_ATTR] in task_hashes:
                continue
            sessions = versions[0]["sessions"]
            if (len(sessions) > 1) or accept_single:
                # Add example to dataset automatically and use the answer from
                # one of the tasks (may be "reject" for binary annotations where
                # all annotators agree on "reject")
                eg["answer"] = versions[0].get("answer", "accept")
                db.add_examples([eg], [dataset])
            # Don't send anything out for annotation
        else:
            yield eg


@recipe(
    "review",
    # fmt: off
    dataset=("Dataset to save annotations to", "positional", None, str),
    input_sets=("Comma-separated names of datasets to review", "positional", None, split_string),
    view_id=("View ID (e.g. 'ner' or 'ner_manual') to use if none present in the task or to overwrite existing", "option", "v", str),
    label=("Comma-separated label(s) to annotate or text file with one label per line", "option", "l", get_labels),
    fetch_media=("Load images, audio or video files from local paths or URLs", "flag", "FM", bool),
    show_skipped=("Include skipped answers, e.g. if annotator hit ignore or rejected manual annotation", "flag", "S", bool),
    auto_accept=("Automatically accept annotations with no conflicts and add them to the dataset", "flag", "A", bool),
    accept_single=("Also automatically accept examples with only single user annotations", "flag", "AS", bool)
    # fmt: on
)
def review(
    dataset: str,
    input_sets: List[str],
    view_id: Optional[str] = None,
    label: Optional[List[str]] = None,
    fetch_media: bool = False,
    show_skipped: bool = False,
    auto_accept: bool = False,
    accept_single: bool = False,
) -> RecipeSettingsType:
    """Review existing annotations created by multiple annotators and
    resolve potential conflicts by creating one final "master annotation". Can
    be used for both binary and manual annotations. If the annotations were
    created with a manual interface, the "most popular" version will be
    pre-selected automatically.

    NOTE: If you're using this recipe with annotations created in Prodigy v1.7.1
    or lower, you'll need to define a --view-id argument with the annotation
    interface ID to use. For example, 'ner_manual' or 'classification'.
    """
    log("RECIPE: Starting recipe review", locals())
    DB = connect()
    for set_id in input_sets:
        if set_id not in DB:
            msg.fail(f"Can't find input dataset '{set_id}' in database", exits=1)
    all_examples = {set_id: DB.get_dataset_examples(set_id) for set_id in input_sets}
    stream = get_review_stream(
        all_examples, view_id, fetch_media=fetch_media, show_skipped=show_skipped
    )
    config = {"auto_count_stream": True}
    if label:
        config["labels"] = label

    def before_db(examples: List[TaskType]) -> List[TaskType]:
        if fetch_media:
            # Remove all data URIs before storing example in the database
            keys = ("image", "audio", "video")
            for eg in examples:
                for key in keys:
                    if key in eg and eg[key].startswith("data:") and "path" in eg:
                        eg[key] = eg["path"]
                for version in eg.get("versions", []):
                    for key in keys:
                        if (
                            key in version
                            and version[key].startswith("data:")
                            and "path" in version
                        ):
                            version[key] = version["path"]
        return examples

    if auto_accept:
        stream = filter_auto_accept_stream(stream, DB, dataset, accept_single)

    return {
        "view_id": "blocks",
        "dataset": dataset,
        "stream": stream,
        "before_db": before_db,
        "config": {
            **config,
            "blocks": [
                {"view_id": "review"},
                {"view_id": "text_input", "field_label": "Notes"}
            ],
        }
    }

I wanted to add that when I added a --view-id to the original command (I tried ner, ner_manual, and spans_manual), I did see output in the UI, but it only displayed the task name and annotator name. I wasn't able to get a display that showed each annotator's answer, including their selection (correct, incorrect). Does review not work with blocks and binary classification?

Hey @cheyanneb,

That's right, the built-in review recipe won't work with the blocks interface. review is really meant to work with one view_id at a time. Another reason is that blocks may contain very custom content, and it's hard to know upfront how to render the diff.

The --view-id parameter that you've experimented with is a workaround for this limitation. It lets you specify which view_id from your blocks should be selected for review. Would doing one review per view_id be a solution in your case?

Binary classification should be supported with the classification view_id.
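
For example, to resolve just the correct/incorrect choices, one pass could look something like this (reviewed-choices is only a placeholder dataset name, and the command reuses your custom review.py):

PRODIGY_ALLOWED_SESSIONS=cheyanne prodigy review reviewed-choices input-name1,input-name2 --view-id choice -F /Users/recipes/review.py

Because the recipe overrides the stored view_id with the value given on the command line, the examples annotated with blocks should then be rendered and diffed as regular choice tasks.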