No more tasks in ordered DB stream

Hi!

I am running an interface streaming from a postgres database. This database gets a fixed size of weekly additional examples. Now my annotators do not always manage to keep up with this pace and this generates backlog and I would like them to annotate the newest examples first.

I currently try to adapt the stream such that my annotators always get the most recent example from the DB using ORDER in the call to the Postgres DB (see recipe below). However, when I ran this my annotators would be told that there are no more tasks, even though only ~650 (acc. to prodigy progress) of the +4K examples in the DB had been annotated. Any idea what the reason might be?

Recipe

import os
import spacy
import prodigy
from prodigy.components.preprocess import add_tokens
from prodigy.util import set_hashes
import psycopg2

@prodigy.recipe(
    "hs-default-db",
    dataset=("Dataset to save annotations into", "positional", None, str),
    lang=("Language to use", "positional", None, str)
    )

def customRecipe(dataset, lang):
    # We can use the blocks to override certain config and content, and set
    # `"text": None` for the choice interface so it doesn't also render the text
    blocks = [
        {"view_id": "html"},
        {"view_id": "ner_manual"},
        {"view_id": "choice", "text": None},
        {"view_id": "text_input"}
    ]
    
    span_labels = ["target"]
    textcat_labels = ["Not toxic", "Toxic (excl. hate)", "Hate speech - Nationality", "Hate speech - Ethnicity", "Hate speech - Immigrants", "Hate speech - Gender", "Hate speech - Religion", "Hate speech - Sexuality", "Hate speech - Disability", "Hate speech - Age", "Hate speech - Other"]
    html_template = (
        '<div class="context">'
        '<h4><span style="font-weight: bold;">Article Context</span></h4>'
        '<p style="margin-bottom: 0px; font-weight: bold;">{{article_title}}</p>'
        '<p style="margin-bottom: 5px; font-style: italic;">{{article_title_header}}</p>'
        '<p style="margin-bottom: 0px;">{{article_lead}}</p>'
        # '<p style="margin-bottom: 0px; font-weight: bold;">Parent Comment: {{parent_text}}</p>'
        '</div>'
        '<br>'
        '<h3 style="color: #801414; margin-bottom: 0px; margin-top: 10px; text-align: left;">Comment (find all targets)</h3>'
    )
        
    def dbLoader():
        # Fetch database credentials from environment variables
        dbname = os.getenv("PGDATABASE")
        user = os.getenv("PGUSER")
        password = os.getenv("PGPASSWORD")
        host = os.getenv("PGHOST")
        port = os.getenv("PGPORT")

        # Connect to PostgreSQL
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )

        cur = conn.cursor()
        cur.execute("SELECT comment_id, text, article_title, article_title_header, article_lead FROM hs_samples  ORDER BY "samplingDatetime" DESC") 
        records = cur.fetchall()

        # Prepare data for Prodigy
        def replace_empty_string(obj):
            if obj is None:
                obj = "NA"
            return obj
    
        stream = []
        for record in records:
            
            comment_id = replace_empty_string(record[0])
            text = replace_empty_string(record[1])
            article_title = replace_empty_string(record[2])
            article_title_header = replace_empty_string(record[3])
            article_lead = replace_empty_string(record[4])


            example = {
                "comment_id": comment_id,
                "text": text,
                "article_title": article_title,
                "article_title_header": article_title_header,
                "article_lead": article_lead,
                "meta": {"source": "database"}
                }

            
            example = set_hashes(example)
            stream.append(example)

        cur.close()
        conn.close()

        return stream
    
    def add_options(stream):
        for t in stream:
            t['options'] = [
                {"id": lab, "text": lab} for lab in textcat_labels
            ]
            yield t
    
    nlp = spacy.blank(lang)           # blank spaCy pipeline for tokenization
    stream = dbLoader()
    stream = add_tokens(nlp, stream)
    stream = add_options(stream)

    return {
        "view_id": "blocks",  # Annotation interface to use
        "dataset": dataset,  # Name of dataset to save annotations
        "stream": stream,  # Incoming stream of examples
        "validate_answer": validate_answer,  # Validate the answers
        "config": {  # Additional config settings, mostly for app UI
            "lang": nlp.lang,
            "labels": span_labels,
            "blocks": blocks,
            # "keymap_by_label": {
            #     "0": "q", 
            #     "1": "w", 
            #     "2": "e", 
            #     "3": "r", 
            #     "product": "1", 
            #     "amount": "2",
            #     "size": "3",
            #     "type": "4",
            #     "topping": "5" 
            # },
            "choice_style": "multiple",
            "html_template": html_template,
            "custom_theme": {
                "bgCardTitle": "#801414",
                "colorHighlightLabel": "#801414"
            },
            "global_css_dir": "./recipes/style",
            # "javascript_dir": "./recipes/style",
            "instructions": "./recipes/instructions.html"
        }
    }

prodigy.json

{
  "buttons": ["accept", "undo"],
  "annotations_per_task": 1.1,
  "host":"0.0.0.0",
  "port":8080,
  "db":"postgresql",
  "db_settings":{
    "postgresql":{
      "host":"placeholder",
      "dbname":"placeholder",
      "user":"placeholder",
      "password":"placeholder",
      "port":"placeholder"
    }
  }
}

Hi @nicolaiberk ,

The issue is that your dbLoader function returns a list, while it should actually be a generator to make sure Prodigy calls it again.
In the current implementation dbLoader is being called only once which would explain the fact the stream is being empty too soon. Another explanation would be that the DB query does not return any records so we should really fix both.

Let's convert dbLoader to a generator (I also added try/except/finally blocks with some error handling for connection management - another option would be to use a context manager):

def dbLoader():
    
    # Fetch database credentials from environment variables
    dbname = os.getenv("PGDATABASE")
    user = os.getenv("PGUSER")
    password = os.getenv("PGPASSWORD")
    host = os.getenv("PGHOST")
    port = os.getenv("PGPORT")
        
    conn = None
    cur = None        
    
    # Connect to PostgreSQL
    try:        
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )

        cur = conn.cursor()
        cur.execute("SELECT comment_id, text, article_title, article_title_header, article_lead FROM hs_samples  ORDER BY "samplingDatetime" DESC")
        
        # Prepare data for Prodigy
        def replace_empty_string(obj):
            if obj is None:
                obj = "NA"
            return obj
 
        while True:        
            records = cur.fetchall() # consider using fetchmany to load to memory in batches
            if not records:
                msg.warn("Query did not return any records")
                break
            
            for record in records:
                comment_id = replace_empty_string(record[0])
                text = replace_empty_string(record[1])
                article_title = replace_empty_string(record[2])
                article_title_header = replace_empty_string(record[3])
                article_lead = replace_empty_string(record[4])

                example = {
                    "comment_id": comment_id,
                    "text": text,
                    "article_title": article_title,
                    "article_title_header": article_title_header,
                    "article_lead": article_lead,
                    "meta": {"source": "database"}
                    }
                yield set_hashes(example)
    except psycopg2.Error as e:
        msg.fail(f"Database error: {e}")
    except Exception as e:
        msg.fail(f"Unexpected exception when loading from the DB: {e}")
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

As you can see I also added a print statement for the query not returning records so you can easily know if that's what's happening.

Optionally you could also convert the generator to the newer Prodigy data structure Stream which is dedicated to handling input examples. You'll need to import the get_stream helper from Stream:

    from prodigy.components.stream import get_stream

    stream = get_stream(dbLoader())
    stream.apply(add_tokens, nlp=nlp, stream=stream)
    stream.apply(add_options, stream=stream)

I haven't tested it back to back but I'm pretty sure that should solve the issue - let me know if not of course!

1 Like

Thank you so much for the quick and heelpful response! I will implement the changes and get back to you next week!

Sorry it took me so long to respond. So far this looks good, thank you for suggesting this! I am awaiting incoming annotations though.

Unfortunately I am facing a similar issue with another (much simpler) task where I am using the CSV loader. I am trying to get exactly 2 annotations per task (from two different out of three annotators). The annotations are written to a postgres DB. I define PRODIGY_ALLOWED_SESSIONS=a,b,c in the environment.

Recipe

import prodigy
from prodigy.components.loaders import CSV

@prodigy.recipe(
    "basic_blick_appropriate",
    dataset=("Dataset to save annotations into", "positional", None, str),
    file_in=("Path to examples.jsonl file", "positional", None, str)
    )
def custom_recipe(dataset, file_in):
    # We can use the blocks to override certain config and content, and set
    # "text": None for the choice interface so it doesn't also render the text
    blocks = [
        {"view_id": "text"},
        {"view_id": "text_input","field_rows": 1, "field_id": "user_input_a","field_label": "Is the comment insulting/derogatory/hate speech? 0 (no), 1 (yes)"},
        {"view_id": "text_input","field_rows": 1, "field_id": "user_input_b","field_label": "Is the email response appropriate to the comment? 0 (no), 1 (yes)"}
    ]
               
    stream = CSV(file_in)

    return {
        "view_id": "blocks",  # Annotation interface to use
        "dataset": dataset,  # Name of dataset to save annotations
        "stream": stream,  # Incoming stream of examples
        # "task_router": task_router_annot, # task routing
        "config": {  # Additional config settings, mostly for app UI
            "blocks": blocks,
            "choice_style": "multiple",
            "custom_theme": {
                "bgCardTitle": "#801414",
                "colorHighlightLabel": "#801414"
            },
            "global_css_dir": "./recipes/style",
        }
    }

prodigy.json

{
  "buttons":[
    "accept",
    "undo"
  ],
  "annotations_per_task": 2,
  "host":"0.0.0.0",
  "port":8080,
  "db":"postgresql",
  "db_settings":{
    "postgresql":{
      "host":"placeholder",
      "dbname":"placeholder",
      "user":"placeholder",
      "password":"placeholder",
      "port":"1234"
    }
  }
}

Hi @nicolaiberk,

What's exactly the issue with your CSV workflow? If it's the fact that annotations_per_task is not working as expected this can be fixed by switching from the legacy CSV loader to the new Stream class loader by importing the get_stream helper from prodigy.components.stream. This will resolve your input file based on the extension and handle csv, jsonl, json format accordingly.
Could you try loading your data like so:

from prodigy.components.stream import get_stream
stream = get_stream(file_in)

Thank you for your response and my apologies for the bad description of the issue!

I did meanwhile switch to the get_stream method, but my issue seems to persist for both tasks: the interface shows no more tasks available despite more tasks needing annotation. Checking all user sessions seems to provide a temporary solution, but the task gets stuck after a few examples again...

Recipe

import os
import spacy
import prodigy
from prodigy.components.preprocess import add_tokens
from prodigy.components.stream import get_stream
from prodigy.util import set_hashes, msg
import psycopg2
from dotenv import load_dotenv
load_dotenv()


@prodigy.recipe(
    "hs-default-db",
    dataset=("Dataset to save annotations into", "positional", None, str),
    lang=("Language to use", "positional", None, str)
    )

def customRecipe(dataset, lang):
    # `"text": None` for the choice interface so it doesn't also render the text
    blocks = [
        {"view_id": "html"},
        {"view_id": "ner_manual"},
        {"view_id": "choice", "text": None},
        {"view_id": "text_input"}
    ]
    
    span_labels = ["target"]
    textcat_labels = ["Not toxic", "Toxic (excl. hate)", "Hate speech - Nationality", "Hate speech - Ethnicity", "Hate speech - Immigrants", "Hate speech - Gender", "Hate speech - Religion", "Hate speech - Sexuality", "Hate speech - Disability", "Hate speech - Age", "Hate speech - Other"]
    html_template = (
        '<div class="context">'
        '<h4><span style="font-weight: bold;">Article Context</span></h4>'
        '<p style="margin-bottom: 0px; font-weight: bold;">{{article_title}}</p>'
        '<p style="margin-bottom: 5px; font-style: italic;">{{article_title_header}}</p>'
        '<p style="margin-bottom: 0px;">{{article_lead}}</p>'
        # '<p style="margin-bottom: 0px; font-weight: bold;">Parent Comment: {{parent_text}}</p>'
        '</div>'
        '<br>'
        '<h3 style="color: #801414; margin-bottom: 0px; margin-top: 10px; text-align: left;">Comment (find all targets)</h3>'
    )
        
    def dbLoader():
        # Fetch database credentials from environment variables
        dbname = os.getenv("PGDATABASE")
        user = os.getenv("PGUSER")
        password = os.getenv("PGPASSWORD")
        host = os.getenv("PGHOST")
        port = os.getenv("PGPORT")

        ## based on answer to https://support.prodi.gy/t/no-more-tasks-in-ordered-db-stream/7412
        conn = None
        cur = None

        try:
            # Connect to PostgreSQL
            conn = psycopg2.connect(
                dbname=dbname,
                user=user,
                password=password,
                host=host,
                port=port
            )

            cur = conn.cursor()
            cur.execute('SELECT comment_id, text, article_title, article_title_header, article_lead FROM hs_samples ORDER BY "samplingDatetime" DESC')

            # Prepare data for Prodigy
            def replace_empty_string(obj):
                if obj is None:
                    obj = "NA"
                return obj
            
            while True:
                records = cur.fetchmany(1000)
                if not records:
                    msg.warn("Query did not return records.")
                    break

                for record in records:
                    comment_id = replace_empty_string(record[0])
                    text = replace_empty_string(record[1])
                    article_title = replace_empty_string(record[2])
                    article_title_header = replace_empty_string(record[3])
                    article_lead = replace_empty_string(record[4])

                    example = {
                        "comment_id": comment_id,
                        "text": text,
                        "article_title": article_title,
                        "article_title_header": article_title_header,
                        "article_lead": article_lead,
                        "meta": {"source": "database"}
                        }

                    yield set_hashes(example)
        except psycopg2.Error as e:
            msg.fail(f"Database error: {e}")
        except Exception as e:
            msg.fail(f"Unexpected exception when loading from the DB: {e}")
        finally:
            if cur:
                cur.close()
            if conn:
                conn.close()
    
    stream = get_stream(dbLoader())
    
    def add_options(stream):
        for t in stream:
            t['options'] = [
                {"id": lab, "text": lab} for lab in textcat_labels
            ]
            yield t
    
    nlp = spacy.blank(lang)
    stream.apply(add_tokens, nlp=nlp, stream=stream)
    stream.apply(add_options, stream=stream)


    def validate_answer(eg):
        selected = eg.get("accept", [])
        spans = eg.get("spans", [])
        errors = []
        # errors.append(eg.get("user_input", []))
        if len(selected) < 1:
            errors.append("Select at least one option.")
        # if len(spans) == 0 and not ("Not toxic" in selected) and not ("Toxic (excl. hate)" in selected):
        #     errors.append("Please select the target of toxic speech in the comment.")
        if len(spans) > 0 and ("Not toxic" in selected):
            errors.append("Please only select targets of toxic speech.")
        if ("Not toxic" in selected) & (len(selected) > 1):
            errors.append("Note that a comment which is not toxic cannot contain Hate speech.")
        if ("Toxic (excl. hate)" in selected) & (len(selected) > 1):
            errors.append("Note that a comment which is toxic but not hateful cannot contain Hate speech.")
        if ("Hate speech - Other" in selected) & (eg.get("user_input") is None):
            errors.append("Please provide a target group in the text field.")
        if errors:
            raise ValueError(" ".join(errors))


    return {
        "view_id": "blocks",  # Annotation interface to use
        "dataset": dataset,  # Name of dataset to save annotations
        "stream": stream,  # Incoming stream of examples
        "validate_answer": validate_answer,  # Validate the answers
        "config": {  # Additional config settings, mostly for app UI
            "lang": nlp.lang,
            "labels": span_labels,
            "blocks": blocks,
            # "keymap_by_label": {
            #     "0": "q", 
            #     "1": "w", 
            #     "2": "e", 
            #     "3": "r", 
            #     "product": "1", 
            #     "amount": "2",
            #     "size": "3",
            #     "type": "4",
            #     "topping": "5" 
            # },
            "choice_style": "multiple",
            "html_template": html_template,
            "custom_theme": {
                "bgCardTitle": "#801414",
                "colorHighlightLabel": "#801414"
            },
            "global_css_dir": "./recipes/style",
            # "javascript_dir": "./recipes/style",
            "instructions": "./recipes/instructions.html"
        }
    }

prodigy.json (note that I switched to partial overlap here)

{
  "buttons": ["accept", "undo"],
  "annotations_per_task": 1.1,
  "host":"0.0.0.0",
  "port":8080,
  "db":"postgresql",
  "db_settings":{
    "postgresql":{
      "host":"placeholder",
      "dbname":"placeholder",
      "user":"placeholder",
      "password":"placeholder",
      "port":"placeholder"
    }
  }
}

(also changed my preferences now to receive an email when you answer - somehow this did not seem to be the default)

Hi @nicolaiberk ,

There's nothing in your recipe that stands out as incorrect at this point. You say that there are "more tasks needing annotations" - could you explain a bit more on what basis you drew this conclusion?
Is there a possibility that there are duplicates in the input data? This might lead to a situation where the input dataset is longer that the output dataset.

You could check for duplicates in your input with a script similar to this one:

from prodigy.components.stream import get_stream

input_stream = get_stream(dbLoader(), dedup=False)
input_counts = {}
for eg in input_stream:
    input_hash = eg['_input_hash']
    input_counts[input_hash] = task_counts.get(input_hash, 0) + 1

duplicates = [hash for hash, count in input_counts.items() if count > 1]
if duplicates:
        print(f"Note: Input stream contains {len(duplicates)} duplicate hashes")
        for hash_, count in duplicates.items():
            print(f"Hash: {hash_} appears {count} times")
else:
    print("No duplicates in the input stream")

Another thing to check would be whether the examples that you expect to be streamed already contain required number of annotations in the database (for example from previous annotation sessions).
Here's a script you could use to get some statistics over your annotations and the input datasets to get an idea what exactly is missing:

from prodigy.components.db import connect
from prodigy.components.stream import get_stream
import json
from collections import Counter, defaultdict


def analyze_annotation_discrepancy(dataset_name):
    print("\n=== Prodigy Annotation Analysis ===\n")

    # Get input stream data
    input_stream = get_stream(dbLoader())
    input_stream_input_hashes = [eg["_input_hash"] for eg in input_stream]
    
    print("=== Input Stream Statistics ===")
    print(f"Total input hashes: {len(input_stream_input_hashes)}")
    print(f"Unique input hashes: {len(set(input_stream_input_hashes))}")

    # Connect to database
    db = connect()
    
    print(f"=== Analysis for {dataset_name} ===")
    
    # Get basic counts
    input_hashes = db.get_input_hashes(dataset_name)
    task_hashes = db.get_task_hashes(dataset_name)
    examples = db.get_dataset(dataset_name)
    
    datasets_info = {}
    datasets_info[dataset_name] = {
        'input_hashes': set(input_hashes),
        'task_hashes': set(task_hashes),
        'counts': {
            'input_hashes': len(input_hashes),
            'unique_input_hashes': len(set(input_hashes)),
            'task_hashes': len(task_hashes),
            'unique_task_hashes': len(set(task_hashes)),
        }
    }
        
    # Print statistics
    print(f"Input hashes: {len(input_hashes)}")
    print(f"Unique input hashes: {len(set(input_hashes))}")
    print(f"Task hashes: {len(task_hashes)}")
    print(f"Unique task hashes: {len(set(task_hashes))}")

    # Calculate annotations per annotator
    annotator_counts = Counter(ex.get('_annotator_id') for ex in examples)
    print("\n=== Annotations per Annotator ===")
    for annotator, count in annotator_counts.items():
        print(f"Annotator {annotator}: {count} annotations")

    # Calculate average annotations per example
    annotations_per_hash = defaultdict(int)
    for ex in examples:
        annotations_per_hash[ex['_input_hash']] += 1
    
    avg_annotations = sum(annotations_per_hash.values()) / len(annotations_per_hash) if annotations_per_hash else 0
    print(f"\nAverage annotations per example: {avg_annotations:.2f}")

    # Distribution of annotations per example
    annotation_distribution = Counter(annotations_per_hash.values())
    print("\n=== Distribution of Annotations per Example ===")
    for num_annotations, count in sorted(annotation_distribution.items()):
        print(f"{num_annotations} annotation(s): {count} examples")

    # Comparison analysis
    print("\n=== Comparison Analysis ===")
    
    # Compare with input stream
    missing_from_dataset = set(input_stream_input_hashes) - datasets_info[dataset_name]['input_hashes']
    extra_in_dataset = datasets_info[dataset_name]['input_hashes'] - set(input_stream_input_hashes)
    
    print(f"\nComparing {dataset_name} with input stream:")
    print(f"Missing from annotated dataset: {len(missing_from_dataset)} hashes")
    print(f"Extra in annotated dataset: {len(extra_in_dataset)} hashes")
    
    if missing_from_dataset:
        print("\nMissing hashes:")
        for hash_value in list(missing_from_dataset):
            print(f"- {hash_value}")


if __name__ == "__main__":
    analyze_annotation_discrepancy(
        dataset_name="dataset_name",  # replace with target dataset name
    )

If you could share the output of this script (make sure it has access to your dbLoader function) - we could have a better idea of what is going on.