Inconsistent Number of Annotated Data

Hi all,

I am doing NER and text classification annotation using a custom recipe with multiple annotators in named sessions. The annotation process runs well, but in the end there is an inconsistency in the number of annotated examples. My dataset has 740 examples. For one annotator, the "No tasks available" message appears after they finish 727 annotations; for the other annotator, it appears after 734 annotations.

How can I make sure that both annotators annotate all of the data (740 examples)? And how can I check with a Python script which examples have not yet been annotated by at least one annotator?

Hi @sigitpurnomo ,

Do you have any settings related to task overlap between the annotators, such as feed_overlap, annotations_per_task or allow_work_stealing? Do you define your named sessions up front with PRODIGY_ALLOWED_SESSIONS?

In principle, to make sure all annotators annotate all examples, you should set "feed_overlap": true in your prodigy.json. That's it.
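
If it helps, here's a minimal sketch (plain Python, no Prodigy API) to double-check which of these values your prodigy.json file actually contains. It assumes the file lives in the default ~/.prodigy/ location and won't reflect overrides set via recipes or environment variables:

import json
from pathlib import Path

# Assumption: prodigy.json is in the default ~/.prodigy/ directory; adjust the
# path if you use PRODIGY_HOME or a project-local prodigy.json
config_path = Path.home() / ".prodigy" / "prodigy.json"
config = json.loads(config_path.read_text())

for key in ("feed_overlap", "annotations_per_task", "allow_work_stealing"):
    print(key, "=", config.get(key, "<not set>"))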

Now, to check whether there are any examples in the DB with fewer annotations than expected, you can use the following Python script:

from prodigy.components.db import connect

def find_missing_annotations(dataset_name):
    db = connect()
    annotations = db.get_dataset_examples(dataset_name)

    # Group by task hash and count the annotations per task
    task_counts = {}
    for ann in annotations:
        task_hash = ann['_task_hash']
        task_counts[task_hash] = task_counts.get(task_hash, 0) + 1

    # Find tasks with fewer than 2 annotations
    incomplete = {hash_: count for hash_, count in task_counts.items() if count < 2}

    # Get the original tasks for these hashes
    missing_tasks = [ann for ann in annotations if ann['_task_hash'] in incomplete]

    print(f"Found {len(incomplete)} tasks with incomplete annotation coverage")
    return missing_tasks

# Usage
missing = find_missing_annotations('datasetname')

Or, if it's easier, you can get it directly on the CLI with the jq tool:

python -m prodigy db-out datasetname | jq -s 'group_by(._task_hash) | map(select(length < 2)) | length'

And this is how to get the number of annotations per annotator:

prodigy db-out datasetname | jq -s 'group_by(._annotator_id) | map({annotator: .[0]._annotator_id, count: length})'
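
And in case it's easier to stay in Python, here's a rough equivalent of the last command - a sketch that assumes the saved examples carry the _annotator_id (or at least _session_id) field, as they do for named sessions:

from collections import Counter
from prodigy.components.db import connect

db = connect()
annotations = db.get_dataset_examples("datasetname")  # replace with your dataset

# Count annotations per annotator, falling back to _session_id if needed
counts = Counter(
    eg.get("_annotator_id", eg.get("_session_id")) for eg in annotations
)
for annotator, count in counts.most_common():
    print(annotator, count)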

Hi @magdaaniol

Thank you for your response.

I have set feed_overlap to true in my prodigy.json file. annotations_per_task is null, and allow_work_stealing is false. I also defined the named sessions using PRODIGY_ALLOWED_SESSIONS.

I have tried your script, and the result is "Found 7 tasks with incomplete annotation coverage". I suspect this comes from the second annotator, who annotated 734 examples (with 1 example ignored). But what about the first annotator, who annotated only 727 examples? How can the incomplete tasks be displayed again in the interface so they can be annotated?

Hi @sigitpurnomo,

It's hard to say what exactly happened without reproducing the situation and analyzing the logs.
With feed_overlap enabled, both annotators should annotate exactly the same number of examples. The fact that they ended up with different totals suggests that some work stealing happened, even though you say it was set to false (it's true by default). The reason why the annotated totals differ from the expected 740 might be that there were some duplicates in the input dataset that were filtered out. Both cases would be logged with PRODIGY_LOGGING set to verbose, but you'd have to reproduce the sessions with this logging level so that we can get to the explanation. I certainly don't observe anything incorrect with your settings and an example dataset. Which Prodigy version are you running?

You should be able to restart the server with the same input dataset, the same output dataset and the same annotator-related settings to resume the annotation of the missing examples.
Let's first find out which annotator did not annotate the missing examples. For that, we'll reuse the Python script I shared previously:

# Usage
missing = find_missing_annotations('datasetname')
# Check which annotator annotated the examples that have only one annotation,
# to know who should annotate the missing ones
for annot in missing:
    print(annot["_task_hash"], annot["_annotator_id"])

This prints the tasks that have only one annotation, together with the annotator who did them. Is there only one annotator mentioned? If that's the case, you should be able to spin up Prodigy with the same input and output datasets and ask the other annotator to access it with their named session - they should be served the missing questions.
Let me know if that's not the case.

Finally, to quickly check whether there are any duplicates in your input file, you can run the following script:

from prodigy.components.stream import get_stream

input_stream = get_stream("input.jsonl", dedup=False)
input_counts = {}
for eg in input_stream:
    input_hash = eg['_input_hash']
    input_counts[input_hash] = input_counts.get(input_hash, 0) + 1

duplicates = [hash_ for hash_, count in input_counts.items() if count > 1]
print(f"Found {len(duplicates)} duplicates.")

Hi @magdaaniol

My dataset consists of student peer review data, so it can contain the same text content (duplicate content), for example: "The report is well written."
Is the set_hashes function in Prodigy based on the text content?

I ran the script to print the annotations with only one annotation and their annotator, and it showed that only one annotator was mentioned (the annotator who annotated 734 examples). But when I run Prodigy with the same input and output datasets and access it with the named session of the other annotator, who annotated only 727 examples, the UI still displays the "No tasks available" message.

For your information, both of the scripts you suggested output 7, which is the difference in the number of annotations between the two annotators (734 and 727).

My dataset consists of student peer review data, so it can contain the same text content (duplicate content), for example: "The report is well written."
Is the set_hashes function in Prodigy based on the text content?

I will assume then that the reason why neither of the annotators got to 740 is duplicate filtering.
The set_hashes function sets both the _input_hash and the _task_hash. The input hash is, by default, based on the input text, and the task hash is computed from the task-relevant keys. Prodigy in most cases excludes incoming examples by input hash, which is why examples with the same input text would be skipped.
You can find out more details about how hashing works in Prodigy here: Loaders and Input Data · Prodigy · An annotation tool for AI, Machine Learning & NLP
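
To illustrate with set_hashes directly - a minimal sketch where the example text, span offsets and label are made up:

from prodigy.util import set_hashes

# Two tasks with the same input text but different task-relevant keys
eg1 = {"text": "The report is well written."}
eg2 = {
    "text": "The report is well written.",
    "spans": [{"start": 4, "end": 10, "label": "LABEL"}],
}

eg1 = set_hashes(eg1)
eg2 = set_hashes(eg2)

print(eg1["_input_hash"] == eg2["_input_hash"])  # True: same input text
print(eg1["_task_hash"] == eg2["_task_hash"])    # False: different spans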

As for why there's a difference in totals between the two annotators - could you re-do the following:

But when I run Prodigy with the same input and output datasets and access it with the named session of the other annotator, who annotated only 727 examples, the UI still displays the "No tasks available" message.

but with PRODIGY_LOGGING=verbose in front of your command and share the terminal output, please (make sure to obfuscate any sensitive information). We need to see why these remaining examples are being rejected.

Does the "No tasks available" screen immediately when you access the server?
Could you also share which Prodigy version and recipe are you running and whether you have modified the "exclude_by" config setting?

Finally, let's do some analysis of the input hash sets in the current state. Could you run this script and send me the output, please?

from prodigy.components.db import connect
from prodigy.components.stream import get_stream
from collections import Counter


def analyze_annotation_discrepancy(input_filename, dataset_name_1, dataset_name_2):
    print("\n=== Prodigy Annotation Analysis ===\n")
    print(f"Input file: {input_filename}")
    print(f"Dataset 1: {dataset_name_1}")
    print(f"Dataset 2: {dataset_name_2}\n")

    # Get input stream data
    input_stream = get_stream(input_filename, dedup=False)
    input_stream_input_hashes = [eg["_input_hash"] for eg in input_stream]
    
    print("=== Input Stream Statistics ===")
    print(f"Total input hashes: {len(input_stream_input_hashes)}")
    print(f"Unique input hashes: {len(set(input_stream_input_hashes))}")
    input_hash_counts = Counter(input_stream_input_hashes)
    duplicates = {hash_: count for hash_, count in input_hash_counts.items() if count > 1}
    if duplicates:
        print(f"Note: Input stream contains {len(duplicates)} duplicate hashes")
        for hash_, count in duplicates.items():
            print(f"Hash: {hash_} appears {count} times")
    print()

    # Connect to database
    db = connect()
    
    # Analyze both datasets
    datasets_info = {}
    for dataset_name in [dataset_name_1, dataset_name_2]:
        print(f"=== Analysis for {dataset_name} ===")
        
        # Get basic counts
        input_hashes = db.get_input_hashes(dataset_name)
        task_hashes = db.get_task_hashes(dataset_name)
        
        datasets_info[dataset_name] = {
            'input_hashes': set(input_hashes),
            'task_hashes': set(task_hashes),
            'counts': {
                'input_hashes': len(input_hashes),
                'unique_input_hashes': len(set(input_hashes)),
                'task_hashes': len(task_hashes),
                'unique_task_hashes': len(set(task_hashes)),
            }
        }
        
        # Print statistics
        print(f"Input hashes: {len(input_hashes)}")
        print(f"Unique input hashes: {len(set(input_hashes))}")
        print(f"Task hashes: {len(task_hashes)}")
        print(f"Unique task hashes: {len(set(task_hashes))}")
        
        # Check for duplicates
        if len(input_hashes) != len(set(input_hashes)):
            print(f"Warning: Contains {len(input_hashes) - len(set(input_hashes))} duplicate input hashes")
        if len(task_hashes) != len(set(task_hashes)):
            print(f"Warning: Contains {len(task_hashes) - len(set(task_hashes))} duplicate task hashes")
        print()

    # Comparison analysis
    print("=== Comparison Analysis ===")
    
    # Compare with input stream
    for dataset_name in [dataset_name_1, dataset_name_2]:
        missing_from_dataset = set(input_stream_input_hashes) - datasets_info[dataset_name]['input_hashes']
        extra_in_dataset = datasets_info[dataset_name]['input_hashes'] - set(input_stream_input_hashes)
        
        print(f"\nComparing {dataset_name} with input stream:")
        print(f"Missing from dataset: {len(missing_from_dataset)} hashes")
        print(f"Extra in dataset: {len(extra_in_dataset)} hashes")
        
        if missing_from_dataset:
            print("\nMissing hashes:")
            for hash_value in list(missing_from_dataset):
                print(f"- {hash_value}")
    
    # Compare between datasets
    diff_1_2 = datasets_info[dataset_name_1]['input_hashes'] - datasets_info[dataset_name_2]['input_hashes']
    diff_2_1 = datasets_info[dataset_name_2]['input_hashes'] - datasets_info[dataset_name_1]['input_hashes']
    
    print(f"\nComparing {dataset_name_1} vs {dataset_name_2}:")
    print(f"In dataset 1 but not in 2: {len(diff_1_2)} hashes")
    print(f"In dataset 2 but not in 1: {len(diff_2_1)} hashes")
    

if __name__ == "__main__":
    analyze_annotation_discrepancy(
        input_filename="input.jsonl",  # replace with your input filename
        dataset_name_1="dataset_name-session1",  # replace with annotator 1 dataset
        dataset_name_2="dataset_name-session2"   # replace with annotator 2 dataset
    )

The session datasets are created using the following pattern: "main_dataset_name-named_session"
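
If you're not sure about the exact session dataset names, here's a quick sketch to list everything stored in the database:

from prodigy.components.db import connect

db = connect()
# Prints all dataset names, including the per-session datasets that follow
# the "main_dataset_name-named_session" pattern
for name in db.datasets:
    print(name)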

Of course, the workaround here would be to filter out the missing examples and save them in a separate JSONL file. Then run a separate Prodigy session with these 7 examples as input and the right annotator. It would be good to understand what's going on though because, as I said, if the annotator with fewer examples has not annotated them previously, they should be able to see them in the UI when you restart Prodigy with your current setup.
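
For reference, here's a sketch of that workaround. It reuses find_missing_annotations from the script above, keeps one copy per task and strips the annotation-specific keys before writing the examples to a new JSONL file (the exact set of keys to drop is an assumption, adjust as needed):

import srsly

missing = find_missing_annotations("datasetname")  # from the earlier script

seen = set()
fresh_tasks = []
for eg in missing:
    # Keep one copy per task hash
    if eg["_task_hash"] in seen:
        continue
    seen.add(eg["_task_hash"])
    # Drop annotation-specific keys so the examples come up as fresh questions
    drop = ("answer", "_annotator_id", "_session_id", "_timestamp")
    fresh_tasks.append({k: v for k, v in eg.items() if k not in drop})

srsly.write_jsonl("missing_examples.jsonl", fresh_tasks)
print(f"Saved {len(fresh_tasks)} examples to missing_examples.jsonl")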

Hi @magdaaniol,

Here is the output of the Prodigy run with the PRODIGY_LOGGING=verbose setting. I have trimmed some of the text because it is too long (note: I cannot attach a text file in this reply).

12:01:55: CLI: Importing file …/prodigy/recipe.py
Using 6 label(s): Appreciation, Problem, Suggestion, Neutral, Personal Comment,
Off Topic
Using 6 label(s): Appreciation, Problem, Suggestion, Neutral, Personal Comment,
Off Topic
12:01:56: RECIPE: Calling recipe 'ner.withtextcat'
12:01:58:  .prodigy/prodigy.json
12:01:58: VALIDATE: Validating components returned by recipe
12:01:58: CONTROLLER: Initialising from recipe
12:01:58: CONTROLLER: Recipe Config
12:01:58: {'lang': 'id', 'labels': ['Appreciation', 'Problem', 'Suggestion', 'Neutral', 'Personal Comment', 'Off Topic'], 'choice_style': 'multiple', 'blocks': [{'view_id': 'ner_manual'}, {'view_id': 'choice', 'text': None}], 'dataset': 'peer-review-masdig-v2', 'recipe_name': 'ner.withtextcat', 'theme': 'spacy', 'custom_theme': {'labels': {'Appreciation': '#7fffd4', 'Problem': '#9932cc', 'Suggestion': '#ff00ff', 'Neutral': '#00ff7f', 'Personal Comment': '#ff6347', 'Off Topic': '#00bfff'}}, 'buttons': ['accept', 'ignore', 'undo'], 'batch_size': 5, 'history_size': 5, 'port': 8080, 'host': '192.168.15.18', 'cors': True, 'db': 'sqlite', 'db_settings': {}, 'validate': True, 'auto_exclude_current': True, 'instant_submit': False, 'feed_overlap': True, 'annotations_per_task': None, 'allow_work_stealing': False, 'total_examples_target': 0, 'ui_lang': 'en', 'project_info': ['dataset', 'session', 'lang', 'recipe_name', 'view_id', 'label'], 'show_stats': True, 'hide_meta': False, 'show_flag': True, 'instructions': False, 'swipe': False, 'swipe_gestures': {'left': 'accept', 'right': 'reject'}, 'split_sents_threshold': False, 'html_template': False, 'global_css': None, 'global_css_dir': None, 'javascript': None, 'javascript_dir': None, 'writing_dir': 'ltr', 'show_whitespace': False}
12:01:58: VALIDATE: Creating validator for view ID 'blocks'
12:01:58: CONTROLLER: Using `full_overlap` router.
12:01:58: VALIDATE: Validating Prodigy and recipe config
12:01:58: PREPROCESS: Tokenizing examples (running tokenizer only)
12:01:58: .prodigy/prodigy.json
12:01:58: DB: Creating unstructured dataset '2024-11-26_12-01-58'
12:01:58: {'created': datetime.datetime(2024, 11, 11, 6, 17, 18)}
12:01:58: CORS: initialized with wildcard "*" CORS origins
Starting the web server at http://192.168.15.18:8080 ...
Open the app in your browser and start annotating!
INFO:     Started server process [53466]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://192.168.15.18:8080 (Press CTRL+C to quit)
INFO:     192.168.15.59:26340 - "GET /?session=ysp HTTP/1.1" 200 OK
INFO:     192.168.15.59:26340 - "GET /bundle.js HTTP/1.1" 200 OK
12:03:54: .prodigy/prodigy.json
INFO:     192.168.15.59:26350 - "GET /userinfo HTTP/1.1" 404 Not Found
INFO:     192.168.15.59:26340 - "GET /project/ysp HTTP/1.1" 200 OK
INFO:     192.168.15.59:26340 - "GET /fonts/robotocondensed-bold.woff2 HTTP/1.1" 200 OK
INFO:     192.168.15.59:26350 - "GET /fonts/lato-regular.woff2 HTTP/1.1" 200 OK
INFO:     192.168.15.59:26360 - "GET /fonts/lato-bold.woff2 HTTP/1.1" 200 OK
INFO:     192.168.15.59:26340 - "GET /favicon.ico HTTP/1.1" 200 OK
12:03:54: .prodigy/prodigy.json
12:03:54: POST: /get_session_questions
12:03:54: CONTROLLER: Getting batch of questions for session: peer-review-masdig-v2-ysp
12:03:54: STREAM: Created queue for peer-review-masdig-v2-ysp.
12:03:54: ROUTER: Routing item with _task_hash=1422345350 -> ['peer-review-masdig-v2-ysp']
...
12:03:55: ROUTER: Routing item with _task_hash=-1709185225 -> ['peer-review-masdig-v2-ysp']
12:03:55: RESPONSE: /get_session_questions (0 examples)
12:03:55: {'tasks': [], 'total': 727, 'progress': None, 'session_id': 'peer-review-masdig-v2-ysp'}
INFO:     192.168.15.59:26340 - "POST /get_session_questions HTTP/1.1" 200 OK
INFO:     192.168.15.59:5628 - "GET /?session=ysp HTTP/1.1" 200 OK
INFO:     192.168.15.59:5628 - "GET /bundle.js HTTP/1.1" 200 OK
12:04:09: .prodigy/prodigy.json
INFO:     192.168.15.59:5628 - "GET /project/ysp HTTP/1.1" 200 OK
INFO:     192.168.15.59:5628 - "GET /userinfo HTTP/1.1" 404 Not Found
INFO:     192.168.15.59:5628 - "GET /fonts/robotocondensed-bold.woff2 HTTP/1.1" 200 OK
INFO:     192.168.15.59:5642 - "GET /fonts/lato-regular.woff2 HTTP/1.1" 200 OK
INFO:     192.168.15.59:5658 - "GET /fonts/lato-bold.woff2 HTTP/1.1" 200 OK
12:04:09: .prodigy/prodigy.json
12:04:09: POST: /get_session_questions
12:04:09: CONTROLLER: Getting batch of questions for session: peer-review-masdig-v2-ysp
12:04:09: RESPONSE: /get_session_questions (0 examples)
12:04:09: {'tasks': [], 'total': 727, 'progress': None, 'session_id': 'peer-review-masdig-v2-ysp'}
INFO:     192.168.15.59:5628 - "POST /get_session_questions HTTP/1.1" 200 OK

The "No tasks available" screen immediately appears when I access the server. I use the Prodigy 1.16.0 version. Here is the custom recipe that I used:

import prodigy
from prodigy.components.loaders import JSONL
from prodigy.components.preprocess import add_tokens
from prodigy.util import split_string, set_hashes
from prodigy.util import get_labels
import spacy
import copy
from typing import List, Optional


def make_tasks(nlp, stream, labels):
    """Add a 'spans' key to each example, with predicted entities."""
    # Process the stream using spaCy's nlp.pipe, which yields doc objects.
    # If as_tuples=True is set, you can pass in (text, context) tuples.
    texts = ((eg["text"], eg) for eg in stream)
    for doc, eg in nlp.pipe(texts, as_tuples=True):
        task = copy.deepcopy(eg)
        spans = []
        for ent in doc.ents:
            # Continue if predicted entity is not selected in labels
            if labels and ent.label_ not in labels:
                continue
            # Create span dict for the predicted entity
            spans.append(
                {
                    "token_start": ent.start,
                    "token_end": ent.end - 1,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "text": ent.text,
                    "label": ent.label_,
                }
            )
        task["spans"] = spans
        # Rehash the newly created task so that hashes reflect added data
        task = set_hashes(task)
        yield task


def add_options(stream, options):
    """Helper function to add options to every task in a stream."""
    options = [{"id": option, "text": option} for option in options]
    for task in stream:
        task["options"] = options
        yield task

# Recipe decorator with argument annotations: (description, argument type,
# shortcut, type / converter function called on value before it's passed to
# the function). Descriptions are also shown when typing --help.
@prodigy.recipe(
    "ner.withtextcat",
    dataset=("The dataset to use", "positional", None, str),
    spacy_model=("The base model", "positional", None, str),
    source=("The source data as a JSONL file", "positional", None, str),
    labelner=("One or more comma-separated labels", "option", "l", get_labels),
    labeltextcat=("One or more comma-separated labels for text classification", "option", "ltextcat", get_labels),
    exclude=("Names of datasets to exclude", "option", "e", split_string),
)
def ner_withtextcat(
    dataset: str,
    spacy_model: str,
    source: str,
    labelner: Optional[List[str]] = None,
    labeltextcat: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
):
    """
    Create gold-standard data by correcting a model's predictions manually.
    """
    # Load the spaCy model
    nlp = spacy.load(spacy_model)

    # Load the stream from a JSONL file and return a generator that yields a
    # dictionary for each example in the data.
    stream = JSONL(source)

    # Tokenize the incoming examples and add a "tokens" property to each
    # example. Also handles pre-defined selected spans. Tokenization allows
    # faster highlighting, because the selection can "snap" to token boundaries.
    stream = add_tokens(nlp, stream)

    # Add the entities predicted by the model to the tasks in the stream
    stream = make_tasks(nlp, stream, labelner)

    stream = add_options(stream, labeltextcat)

    return {
        "view_id": "blocks",  # Annotation interface to use
        "dataset": dataset,  # Name of dataset to save annotations
        "stream": stream,  # Incoming stream of examples
        "exclude": exclude,  # List of dataset names to exclude
        "config": {  # Additional config settings, mostly for app UI
            "lang": nlp.lang,
            "labels": labelner,  
            "choice_style": "multiple",
            "blocks": [
                {"view_id": "ner_manual"},
                {"view_id": "choice", "text": None}
            ]
        }
    }

Lastly, here is the output from your script:

=== Prodigy Annotation Analysis ===

Input file: …/peer-review/dataset/peer_review_dataset.jsonl
Dataset 1: peer-review-masdig-v2-ysp
Dataset 2: peer-review-masdig-v2-dev

⚠ Prodigy automatically assigned an input/task hash because it was
missing. This automatic hashing will be deprecated as of Prodigy v2 because it
can lead to unwanted duplicates in custom recipes if the examples deviate from
the default assumptions. More information can found on the docs:
https://prodi.gy/docs/api-components#set_hashes
=== Input Stream Statistics ===
Total input hashes: 740
Unique input hashes: 727
Note: Input stream contains 7 duplicate hashes
Hash: 628546903 appears 2 times
Hash: -927966096 appears 2 times
Hash: -2039616366 appears 8 times
Hash: -1373306777 appears 2 times
Hash: 703580025 appears 2 times
Hash: -1794876750 appears 2 times
Hash: -1821170728 appears 2 times

=== Analysis for peer-review-masdig-v2-ysp ===
Input hashes: 727
Unique input hashes: 727
Task hashes: 727
Unique task hashes: 727

=== Analysis for peer-review-masdig-v2-dev ===
Input hashes: 734
Unique input hashes: 734
Task hashes: 734
Unique task hashes: 734

=== Comparison Analysis ===

Comparing peer-review-masdig-v2-ysp with input stream:
Missing from dataset: 0 hashes
Extra in dataset: 0 hashes

Comparing peer-review-masdig-v2-dev with input stream:
Missing from dataset: 0 hashes
Extra in dataset: 7 hashes

Comparing peer-review-masdig-v2-ysp vs peer-review-masdig-v2-dev:
In dataset 1 but not in 2: 0 hashes
In dataset 2 but not in 1: 7 hashes

I hope this information helps us investigate the problem so that I can reconfigure things and the annotator can annotate the 7 remaining tasks.

Thank you for your help and assistance. Really appreciated it.

Hi @sigitpurnomo,

Thanks so much for providing this extra information. Now we're closer :sweat_smile:
From the analysis of input hashes it looks like there are 727 unique inputs in your dataset:

=== Input Stream Statistics ===
Total input hashes: 740
Unique input hashes: 727
Note: Input stream contains 7 duplicate hashes
Hash: 628546903 appears 2 times
Hash: -927966096 appears 2 times
Hash: -2039616366 appears 8 times
Hash: -1373306777 appears 2 times
Hash: 703580025 appears 2 times
Hash: -1794876750 appears 2 times
Hash: -1821170728 appears 2 times

Since the total number of annotations for peer-review-masdig-v2-ysp is 727, it means that this annotator annotated all the examples. That's the reason you're getting "No tasks available" when trying to feed the same input dataset to this session.

The question is why peer-review-masdig-v2-dev has as many as 734, while we would actually expect 727.
The 7 extra inputs are not even duplicates - they are not present in the input dataset at all:

Comparing peer-review-masdig-v2-dev with input stream:
Missing from dataset: 0 hashes
Extra in dataset: 7 hashes

Could it be that they come from another session that used the same target dataset peer-review-masdig-v2-dev but a different input dataset - or the same target dataset and the same input dataset but a different input hashing function (less likely, I suppose)?

In any case, the conclusion is that both annotators annotated all 727 unique examples (with respect to the input hash), but peer-review-masdig-v2-dev has 7 extra examples that do not come from peer_review_dataset.jsonl. Does that make sense?
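
If you want to inspect those 7 examples, here's a small sketch that pulls them out of the dev session dataset. It assumes the same input file and dataset names as in the analysis script:

from prodigy.components.db import connect
from prodigy.components.stream import get_stream

# Replace with your input filename
input_hashes = {eg["_input_hash"] for eg in get_stream("input.jsonl", dedup=False)}

db = connect()
dev_examples = db.get_dataset_examples("peer-review-masdig-v2-dev")
extra = [eg for eg in dev_examples if eg["_input_hash"] not in input_hashes]

print(f"Found {len(extra)} annotations whose input is not in the input file:")
for eg in extra:
    print(eg["_input_hash"], eg.get("text", "")[:80])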

Hi @magdaaniol

Is it possible that the extra examples come from "new" input because some texts were edited in the middle of the annotation? For example, a text could not be annotated for NER because some words were not separated by spaces. When this happened, we stopped Prodigy, edited the input text by adding the spaces, and reran Prodigy to continue the annotation.

The following questions are:

  1. Is it possible to make the duplicate texts appear for annotation?
  2. How do we remove the extra annotations from the dataset (database) so that inter-annotator agreement can be accurately calculated using the prodigy metric.iaa recipes?

Hi @sigitpurnomo,

Yes, the input_hash is computed from the string value of the input text, so any change to the string will result in a different input_hash value. And if the same target dataset was used before stopping the server and the original example was already saved in the DB, that would result in the dataset containing examples that are not in the current input file.

We can definitely filter out these examples from the final dataset via a Python script, but for the future, perhaps a more efficient workflow would be to instruct the annotators to ignore (by hitting the ignore button) examples that need editing and post-process them in a single pass, i.e.:

  1. once the annotation is done, filter out the ignored examples (see the sketch after this list)
  2. edit them according to your needs and save them in a separate JSONL file
  3. set up another annotation session just with these edited examples
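
Here's a sketch of step 1, assuming the default "answer": "ignore" value and placeholder dataset/file names:

import srsly
from prodigy.components.db import connect

db = connect()
annotations = db.get_dataset_examples("datasetname")  # replace with your dataset

# Collect the tasks that were ignored during annotation
ignored = [eg for eg in annotations if eg.get("answer") == "ignore"]

# Keep just the raw input so the texts can be edited and re-annotated
to_edit = [{"text": eg["text"], "meta": eg.get("meta", {})} for eg in ignored]
srsly.write_jsonl("ignored_for_editing.jsonl", to_edit)
print(f"Saved {len(to_edit)} ignored examples to ignored_for_editing.jsonl")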

On to the remaining questions:

  1. Is it possible to make the duplicate texts appear for annotation?

Are you asking whether it's possible to make an annotator annotate exactly the same question more than once?
For that, you'd need to set dedup to False in the get_stream function. Currently, your recipe is using the legacy JSONL loader; the recommended way to load the source is via the get_stream function. That would prevent the removal of duplicates from the input stream.
Another place where filtering happens is within each session: whenever a session asks for examples, the candidate examples from the stream are checked against the examples already annotated by that session, based on the input or task hash (depending on the exclude_by config setting).
So in your case, if the examples are totally identical, i.e. they have the same input hash and the same task hash, you would need to differentiate between them with a custom hashing function that takes into account some attribute that distinguishes the examples, e.g. a custom field in the input file such as "copy": 1. This new field should be used together with the text to compute the input hash (see the sketch below).
If, however, by duplicates you mean the same input text but a different question about it, e.g. different options in the choice block, then you can set "exclude_by": "task" in your prodigy.json - in fact, this is the default setting, so you shouldn't have to change anything unless it's currently set to "input".
I wonder what the purpose of sending duplicates is - is it computing intra-annotator agreement as opposed to (or on top of) inter-annotator agreement?
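
Here's a rough sketch of what that could look like in the recipe. It assumes get_stream's dedup argument and set_hashes' input_keys/overwrite arguments, plus a hypothetical "copy" field in your input file:

from prodigy.components.stream import get_stream
from prodigy.util import set_hashes

def load_source(source):
    # Keep duplicates in the stream (instead of the legacy JSONL loader)
    stream = get_stream(source, dedup=False)
    for eg in stream:
        # Recompute the hashes so the hypothetical "copy" field becomes part
        # of the input hash, making "copy": 1 and "copy": 2 distinct questions
        yield set_hashes(eg, input_keys=("text", "copy"), overwrite=True)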

  1. How do we remove the extra annotations from the dataset (database)

You'd need to filter your current target datasets (both the main and the session datasets) and remove the examples that are not present in the current input dataset. Here's an example script that could do that. It saves copies of the edited datasets as new JSONL files in the current working directory:

import sys
from typing import List, Set

import srsly

from prodigy.components.db import connect
from prodigy.components.stream import get_stream
from prodigy.types import StreamType


def filter_out_examples(stream: StreamType, hashes: Set[int]) -> StreamType:
    """Filter out examples with specific hashes from the stream."""
    for example in stream:
        if example["_input_hash"] not in hashes:
            yield example


def get_stream_hashes(db, dataset_name: str) -> List[int]:
    """Get hashes from a dataset."""
    return db.get_input_hashes(dataset_name)


def save_dataset(stream: StreamType, dataset_name: str) -> None:
    """Save a dataset stream to a JSONL file."""
    output_filename = f"{dataset_name}_edited.jsonl"
    srsly.write_jsonl(output_filename, stream)
    print(f"Saved an updated copy of {dataset_name} as {output_filename}")


def get_extra_hashes(
    input_hashes: List[int], session_hashes: List[List[int]]
) -> Set[int]:
    """Get hashes that are in session data but not in input."""
    extra_hashes = set()
    for session_hash_list in session_hashes:
        extra_hashes.update(set(session_hash_list) - set(input_hashes))
    return extra_hashes


def clean_datasets(
    input_filename: str,
    main_dataset: str,
    annotator1_dataset: str,
    annotator2_dataset: str,
) -> None:
    """Clean the datasets by removing extra examples."""
    print("Processing datasets:")
    print(f"Input file: {input_filename}")
    print(f"Main dataset: {main_dataset}")
    print(f"Annotator 1 dataset: {annotator1_dataset}")
    print(f"Annotator 2 dataset: {annotator2_dataset}")

    # Connect to database
    db = connect()

    # Get input stream and hashes
    input_stream = get_stream(input_filename, dedup=False)
    input_hashes = [ex["_input_hash"] for ex in input_stream]

    # Get dataset streams
    streams = {
        "main": get_stream(f"dataset:{main_dataset}"),
        "annotator1": get_stream(f"dataset:{annotator1_dataset}"),
        "annotator2": get_stream(f"dataset:{annotator2_dataset}"),
    }

    # Get session hashes
    session_hashes = [
        get_stream_hashes(db, annotator1_dataset),
        get_stream_hashes(db, annotator2_dataset),
    ]

    # Find extra examples
    extra_hashes = get_extra_hashes(input_hashes, session_hashes)

    if extra_hashes:
        print(f"Found {len(extra_hashes)} examples to filter out.")
    else:
        print("No examples to filter out")
        sys.exit(0)

    # Filter and save datasets
    dataset_names = {
        "main": main_dataset,
        "annotator1": annotator1_dataset,
        "annotator2": annotator2_dataset,
    }

    for key, stream in streams.items():
        stream.apply(filter_out_examples, stream=stream, hashes=extra_hashes)
        save_dataset(stream, dataset_names[key])


def main():
    clean_datasets(
        input_filename="input.jsonl",  # replace with your input filename
        main_dataset="main_dataset",  # replace with target dataset
        annotator1_dataset="main_dataset-session1",  # replace with annotator 1 dataset
        annotator2_dataset="main_dataset-session2"   # replace with annotator 2 dataset
    )


if __name__ == "__main__":
    main()

Hi @magdaaniol

From your explanation, everything is clear to me now.

Thank you so much for your assistance. It's really helpful. Appreciated it. :pray:
