Custom behavior for Stream/Loader

Hello everyone! :grin:

I am building a custom annotation interface for which I previously asked for help in this forum.

My dataset contains a large number of categories, so I’m trying to show each “category” only until an annotator gives it one accept, and then stop showing any further examples that share that same category value.

What I expect:

  • As soon as an annotator clicks accept on any example with category="abc123", all further examples where category=="abc123" are filtered out.
  • If possible, I would also like to customize this behavior further, e.g. allowing annotation until a category has 3 annotated samples with this split: 1 accept and 2 rejects.
  • It would also be interesting to allow annotation until a ratio is met: for a given category, display samples until 20% of them are annotated; after that, stop displaying this category.
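For context, here is a minimal sketch of what my tasks look like (the field names and values are illustrative):

```python
import json

# Hypothetical example tasks: each one carries its category (and, for the
# percentage-based idea, the total number of tasks in that category) in "meta".
tasks = [
    {"text": "First example", "meta": {"category": "abc123", "category_total_tasks": 100}},
    {"text": "Second example", "meta": {"category": "abc123", "category_total_tasks": 100}},
    {"text": "Other category", "meta": {"category": "xyz789", "category_total_tasks": 40}},
]

# Written out as JSONL (one task per line), this is what I load as the source file
jsonl = "\n".join(json.dumps(task) for task in tasks)
print(jsonl)
```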

Is there a way to implement this behavior into my recipe?

Any help would be hugely appreciated! Thanks in advance.

Hi @miguelclaramunt!

Such dynamic filtering of tasks should definitely be possible to implement via a custom task router.

I'm assuming that the conditions you describe:

  • As soon as an annotator clicks accept on any example with category="abc123", all further examples where category=="abc123" are filtered out.
  • If possible, I would also like to customize this behavior further, e.g. allowing annotation until a category has 3 annotated samples with this split: 1 accept and 2 rejects.
  • It would also be interesting to allow annotation until a ratio is met: for a given category, display samples until 20% of them are annotated; after that, stop displaying this category.

apply globally, i.e. across all annotators (not within a single annotator session).
If that's the case, you'll need to keep counts of the number of annotations per category. This can be done either by querying the DB or in memory.
I recommend initializing the state of such a counter from the DB when Prodigy loads, and keeping it in memory for the lifetime of the server. This preserves the state across server restarts (because we initialize from the DB) and removes the need for frequent (and complex) DB queries (I would consider them complex because we would need to extract the category from the task blob, unless you want to change the DB schema).
So one possible solution would be to:

  1. leverage Prodigy's `on_load` callback to populate the category counts dictionary
  2. leverage the `update` callback to update the global counts every time a batch is saved
  3. implement a custom task router that applies the session selection logic (e.g. full overlap) and adds the filtering based on your conditions on top of it.
    The following example implementation assumes the category is saved under the `meta` field of each task, and that each task also contains `category_total_tasks` information (for the percentage calculation). Alternatively, it could be stored as a recipe-global variable.
```python
import threading
from collections import defaultdict
from typing import Dict, List

import prodigy
from prodigy.components.preprocess import add_labels_to_stream
from prodigy.components.stream import get_stream

# Global state for category stats.
# This dictionary holds the aggregated stats for each category across all annotations.
# Key: category value (e.g. "abc123")
# Value: {'accept': int, 'reject': int, 'ignore': int, 'total': int}
category_annotation_counts = defaultdict(lambda: {'accept': 0, 'reject': 0, 'ignore': 0, 'total': 0})

# A lock to ensure thread-safe access to the global stats dictionary
stats_lock = threading.Lock()

@prodigy.recipe(
    "category_filter.recipe",
    dataset=("The dataset to use", "positional", None, str),
    source=("The source data as a JSONL file", "positional", None, str),
)
def category_filter_recipe(
    dataset: str,
    source: str,
):
    labels = ["LABEL"]
    stream = get_stream(source)
    stream.apply(add_labels_to_stream, stream=stream, labels=labels)

    # `on_load` callback: initialize global category stats from the DB
    def on_load_callback(ctrl: "Controller"):
        """
        Called when the Prodigy recipe starts. Loads existing annotations
        from the dataset and populates the global category_annotation_counts.
        """
        prodigy.log("RECIPE: Loading initial category stats from the database...")
        with stats_lock:
            category_annotation_counts.clear()
            # Fetch all annotations from the specified dataset
            # Note: This might be slow for extremely large datasets on load.
            all_dataset_annotations = ctrl.db.get_dataset_examples(dataset)
            for eg in all_dataset_annotations:
                category = eg.get("meta",{}).get("category")
                answer = eg.get("answer")
                if category and answer:
                    category_annotation_counts[category]['total'] += 1
                    if answer == "accept":
                        category_annotation_counts[category]['accept'] += 1
                    elif answer == "reject":
                        category_annotation_counts[category]['reject'] += 1
                    elif answer == "ignore":
                        category_annotation_counts[category]['ignore'] += 1
            prodigy.log(f"RECIPE: Initial category stats loaded: {dict(category_annotation_counts)}")

    # `update` callback: update global category stats when a batch is submitted
    def update_stats_callback(annotations: List[Dict]):
        """
        Called when annotators submit their annotations. Updates the global
        category_annotation_counts for the newly annotated tasks.
        """
        with stats_lock:
            for eg in annotations:
                category = eg.get("meta",{}).get("category")
                answer = eg.get("answer")
                if category and answer:
                    category_annotation_counts[category]['total'] += 1
                    if answer == "accept":
                        category_annotation_counts[category]['accept'] += 1
                    elif answer == "reject":
                        category_annotation_counts[category]['reject'] += 1
                    elif answer == "ignore":
                        category_annotation_counts[category]['ignore'] += 1
            prodigy.log(f"RECIPE: Stats updated: {dict(category_annotation_counts)}")

    # custom task router to apply the filtering logic
    def custom_task_router(ctrl: "Controller", session_id: str, item: Dict) -> List[str]:
        """
        Determines which sessions receive the current task based on category stats.
        """
        category = item.get("meta",{}).get("category")
        if not category:
            # If a task doesn't have a 'category', send it to all active sessions
            prodigy.log(f"ROUTER: Task {item.get('_task_hash')} has no category, routing to all sessions.")
            return ctrl.session_ids

        with stats_lock: # Ensure thread-safe access to the global stats
            current_category_stats = category_annotation_counts[category]
            # NOTE: Conditions 1 and 2 below are alternatives. With both enabled,
            # condition 1 always fires first and condition 2 is never reached,
            # so keep only the one you need.

            # 1. Condition: stop showing the category after one accept
            if current_category_stats['accept'] >= 1:
                prodigy.log(f"ROUTER: Filtering out category '{category}' with task hash {item.get('_task_hash')}: one accept met.")
                return []  # Filter out this task for ALL annotators

            # 2. Condition: stop after 1 accept AND 2 rejects for this category
            # This is a cumulative condition across all annotations for this category.
            if current_category_stats['accept'] >= 1 and current_category_stats['reject'] >= 2:
                prodigy.log(f"ROUTER: Filtering out category '{category}' with task hash {item.get('_task_hash')}: 1 accept and 2 rejects met.")
                return []

            # 3. Condition: Stop when 20% of samples for this category are annotated
            # This requires knowing the total number of tasks for this specific category upfront.
            # You should include 'category_total_tasks' in your input JSONL data for each task:
            # {"text": "...", "meta": {"category": "abc123", "category_total_tasks": 100}}
            total_for_category = item.get("meta",{}).get("category_total_tasks")
            if total_for_category is not None and total_for_category > 0:
                current_annotated_percentage = current_category_stats['total'] / total_for_category
                if current_annotated_percentage >= 0.20:
                    prodigy.log(f"ROUTER: Filtering out category '{category}' with task hash {item.get('_task_hash')}: 20% ({current_annotated_percentage:.2f}) annotated.")
                    return []

        return ctrl.session_ids

    return {
        "dataset": dataset,
        "view_id": "classification",
        "stream": stream,
        "on_load": on_load_callback,          # Register the on_load callback
        "update": update_stats_callback,      # Register the update callback
        "task_router": custom_task_router,    # Register your custom task router
        "config": {
            "labels": labels,
            "batch_size": 3,
        }
    }
```

I also recommend checking out our docs on [task routing](https://prodi.gy/docs/task-routing) to fully understand what other options are available. It is also worth checking the implementation of the Prodigy built-in routers (you can find them under components/routers.py in the Prodigy source code). I adapted the `full_overlap` router for the example solution, but of course you should be able to integrate the filters into any other custom routing function.
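If it helps, the stopping conditions from the router above can also be pulled out into a plain function so you can unit-test them in isolation (a sketch; the stats dict shape matches the `category_annotation_counts` entries in the recipe):

```python
from typing import Dict, Optional

def category_is_done(stats: Dict[str, int],
                     total_tasks: Optional[int] = None,
                     max_ratio: float = 0.20) -> bool:
    """Return True if a category should no longer be routed to annotators.

    stats: {'accept': int, 'reject': int, 'ignore': int, 'total': int}
    total_tasks: total number of tasks for this category, if known
    """
    # Condition 1: stop after the first accept
    if stats["accept"] >= 1:
        return True
    # Condition 2: stop after 1 accept and 2 rejects (subsumed by condition 1
    # here; kept for when condition 1 is disabled)
    if stats["accept"] >= 1 and stats["reject"] >= 2:
        return True
    # Condition 3: stop once max_ratio of the category's tasks are annotated
    if total_tasks:
        if stats["total"] / total_tasks >= max_ratio:
            return True
    return False
```

Inside the router you would then do something like `return [] if category_is_done(current_category_stats, total_for_category) else ctrl.session_ids`.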