Stream data from a database in an infinite loop

Hi! I've been testing Prodigy and have it running for some tasks. But I'm struggling with a particular scenario:

I want an annotation task that runs indefinitely and streams examples from the database (if there are any). I've tried the infinite-loop stream that is explained in some posts here. This works fine except, apparently, when len(samples) < batch_size: in that case, the streamer keeps polling the database. I've thought of decreasing the batch_size, but this is inconvenient because I'm saving annotations to the database in the update method, and smaller batches would mean more frequent saves and slow annotators down.
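For reference, this is roughly the looping pattern I mean (a minimal sketch; get_examples_from_external_db is a stand-in for my real query):

    from prodigy import set_hashes

    def get_examples_from_external_db():
        # Stand-in for the real query: returns whatever is currently unannotated
        return [{"text": f"example {i}"} for i in range(4)]

    def infinite_stream():
        seen = set()
        while True:
            for example in get_examples_from_external_db():
                example = set_hashes(example)
                if example["_task_hash"] not in seen:
                    seen.add(example["_task_hash"])
                    yield example
            # If a pass produces fewer than batch_size new tasks, Prodigy keeps
            # pulling from the generator, so we loop and poll the database again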

Any thoughts?

Thank you!

Hi! By this, do you mean cases where the number of examples left in the database is lower than the batch size? And what should ideally happen in that case? Do you want Prodigy to display "No tasks available"?


Hi @ines ! Yes, exactly that. If the batch size is 10 and I have 4 examples left in the database, I want to tag them, save them, and then see "No tasks available".

After that, ideally, if I refresh the page, the database would be queried again. Less ideally, we could "sleep" for some time and then restart the process (with os.kill, for example; the orchestrator would deal with bringing it back up).
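To make it concrete, ideally I could write a plain finite stream like this and rely on a page refresh to re-run it (a sketch; get_examples_from_external_db is the same stand-in as above):

    from prodigy import set_hashes

    def one_pass_stream():
        # A single pass over the database; once the generator is exhausted,
        # Prodigy shows "No tasks available"
        for example in get_examples_from_external_db():
            yield set_hashes(example)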

Thank you!

I've "solved" this but it's far from perfect. I wonder if there is no better solution.

Just to recap, my task is as follows:

I need to tag examples with category and sub_category. At any given moment, new data may stream in, and someone should tag a sample of it. And because I'm doing two passes over the data, as soon as a user saves examples annotated with "category", new examples for "sub_category" become available (and maybe others), roughly as in the sketch below.
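Conceptually, the query behind the stream looks like this (placeholder names, not my real schema):

    def fetch_all_rows():
        # Hypothetical stand-in for the real database call
        return [
            {"text": "a", "category": None, "sub_category": None},
            {"text": "b", "category": "news", "sub_category": None},
        ]

    def get_examples_from_external_db():
        # First pass: rows still missing "category"; second pass: rows that
        # already have a category but still need a "sub_category"
        return [row for row in fetch_all_rows()
                if row["category"] is None or row["sub_category"] is None]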

Following the custom_textcat recipe I came up with:

import os
import signal

import prodigy
from prodigy import set_hashes

NO_MORE = "NO MORE TASKS IN THE DATABASE"  # sentinel text for filler tasks
LAST_BATCH_SAVED = False


def stream_pre_annotated(...):

    seen = set()
    global LAST_BATCH_SAVED

    while True:

        examples = get_examples_from_external_db()
        NEW_EXAMPLES = False  # reset once per pass, not once per example

        for example in examples:

            example = transform_example(example)
            example = set_hashes(example)

            if example["_task_hash"] not in seen:
                seen.add(example["_task_hash"])
                NEW_EXAMPLES = True
                LAST_BATCH_SAVED = False
                yield example

        if not NEW_EXAMPLES:
            # Pad the queue with filler tasks until the annotator hits save
            i = 0
            while not LAST_BATCH_SAVED:
                yield {
                    "text": NO_MORE,
                    "options": [{"id": "wait", "text": "please hit save"}],
                    "_task_hash": i,
                }
                i += 1


@prodigy.recipe("single-choice")
def custom_textcat_recipe(...):

    def update(answers):

        global LAST_BATCH_SAVED

        MORE_EXAMPLES = all(a["text"] != NO_MORE for a in answers)

        if MORE_EXAMPLES:
            answers = [a for a in answers if a["text"] != NO_MORE]
            save_to_external_db(answers)
        else:
            # The filler batch was just saved: flag it and kill the process
            # so the orchestrator can restart it
            LAST_BATCH_SAVED = True
            os.kill(os.getpid(), signal.SIGTERM)

    return {
        "dataset": dataset,
        "view_id": "choice",
        "stream": stream_pre_annotated(...),
        "update": update,
        "on_exit": on_exit,  # defined elsewhere
    }
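For completeness, I start the server with something like prodigy single-choice my_dataset -F recipe.py (the dataset name is a placeholder), where -F points Prodigy at the file containing the custom recipe.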

What I get from this is a way to make sure the annotator saves the "end of batch" results; after that, the process dies and k8s brings it up again. So the next time someone opens the URL, maybe they have new work, maybe not.

Ideally, though, I'd still like the behavior I described above: annotate whatever is left, see "No tasks available", and have the database queried again on refresh.

I'd be grateful for any ideas.
Thanks!