Dynamically add to stream during update

I'd like to build a workflow that dynamically fetches data during the update callback and adds new tasks to the stream. Is there a way to do that from a custom recipe?

It seems that the stream generator is only called once and then iterated over.

I've tried overwriting the controller.stream property directly, but that doesn't seem to work.

Any ideas?

Thanks!

Streams are regular Python generators, and Prodigy only ever requests the next batch from the stream when it needs one. So they're specifically designed to be modified at runtime (which is also kind of how the active learning and live API integrations work).

So you may not even need to use the update callback and can design your stream like this:

def get_stream():
    while True:
        # Prodigy only pulls from the generator when it needs the next batch,
        # so this can keep fetching fresh data forever
        examples = fetch_next_batch_of_data()
        yield from examples

If you do want it to respond to external state or the already existing answers, you can do things like this:

n_answers = 0

def update(answers):
    # Track how many answers have come back so far
    global n_answers
    n_answers += len(answers)

def get_stream():
    while True:
        # Switch data sources once enough answers have been collected
        if n_answers < 100:
            examples = fetch_from_source_a()
        else:
            examples = fetch_from_source_b()
        yield from examples

That's what I thought. Maybe I'm just being really dumb right now.

I'm trying to create a "phrases.teach" recipe based on sense2vec. I often want to seed NER models with multi-word patterns, not just single-word vectors, and it seems like sense2vec covers enough domain areas to be useful for this.

This is my recipe; I'd love some help debugging it.
I assumed that since accept_phrases is updated through the update callback, the stream would pick up those examples the next time the while True loop came around.

# coding: utf8
from __future__ import unicode_literals

import prodigy
from prodigy.components.db import connect
from prodigy.util import split_string, set_hashes
import requests

@prodigy.recipe('phrases.teach',
    dataset=("The dataset to use", "positional", None, str),
    seeds=("One or more comma-separated seed terms", "option", "se", split_string)
)
def phrases_teach(dataset, seeds):
    """
    Bootstrap a phrase list with sense2vec and seed terms. Prodigy will
    suggest similar multi-word phrases based on the phrases accepted so far.
    """
    DB = connect()
    if dataset and dataset in DB:
        seed_tasks = [set_hashes({'text': s, 'answer': 'accept'}) for s in seeds]
        DB.add_examples(seed_tasks, datasets=[dataset])

    accept_phrases = seeds
    reject_phrases = []

    def sense2vec(phrase, threshold=0.88):
        # Query the hosted sense2vec API and keep results above the threshold
        res = requests.post('https://api.explosion.ai/sense2vec/find', {
            "sense": "auto",
            "word": phrase
        })
        results = res.json()["results"]
        output = []
        for r in results:
            if r["score"] > threshold:
                output.append((r["score"], r["text"]))
        return output

    def update(answers):
        # Collect answers so the stream closure sees newly accepted phrases
        for answer in answers:
            if answer['answer'] == 'accept':
                accept_phrases.append(answer['text'])
            elif answer['answer'] == 'reject':
                reject_phrases.append(answer['text'])

    def get_stream():
        # Loop over the accepted phrases, query sense2vec for each new one
        # and yield any suggestions we haven't seen yet
        seen = set(accept_phrases)
        sensed = set()

        while True:
            for p in accept_phrases:
                if p.lower() not in sensed:
                    sensed.add(p.lower())
                    for score, phrase in sense2vec(p):
                        if phrase.lower() not in seen:
                            seen.add(phrase.lower())
                            yield {"text": phrase, 'meta': {'score': score}}

    stream = get_stream()

    return {
        'view_id': 'text',          # Annotation interface to use
        'dataset': dataset,         # Name of dataset to save annotations
        'stream': stream,           # Incoming stream of examples
        'update': update,           # Update callback, called with answers
    }

The recipe seems to block here.

@hug.post("/get_session_questions", requires=conditional_api_token)
def get_session_questions(session_id: hug.types.text):
    ...
    print("TASKS: ")
    tasks = controller.get_questions(session_id=session_id)
    print("GOT TASKS: ", tasks)
    ...

So I see "TASKS:" printed, but never "GOT TASKS:" with the tasks.

And this is how I'm calling it:

prodigy phrases.teach phrases_test -F prodigy-recipes/contrib/phrases/phrases_teach.py -se "natural language processing"

This is a nice idea! The way you've implemented it is actually very similar to how the terms.teach recipe works – see here for the code.

Can you try adding some print statements to your while True loop? What you're observing sounds a lot like the loop is stuck: fetching the next batch from the stream never completes because not enough new examples are produced to fill a full batch.
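
For example, something along these lines (just a quick debugging sketch of your get_stream; the print calls are only there to make the behaviour visible) should show whether the loop keeps spinning without yielding anything:

def get_stream():
    seen = set(accept_phrases)
    sensed = set()

    while True:
        print("STREAM PASS, accepted so far:", accept_phrases)
        for p in accept_phrases:
            if p.lower() not in sensed:
                sensed.add(p.lower())
                for score, phrase in sense2vec(p):
                    if phrase.lower() not in seen:
                        seen.add(phrase.lower())
                        print("YIELDING:", phrase)
                        yield {"text": phrase, 'meta': {'score': score}}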

Btw, you probably want to self-host the sense2vec API or just integrate the code directly. Source here:
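
If you do integrate it directly, here's a rough sketch of what that could look like with the standalone sense2vec package (this assumes its Sense2Vec.from_disk / most_similar interface and a locally downloaded vectors directory; the sense2vec_local name, the path and the |NOUN sense are placeholders to adjust):

from sense2vec import Sense2Vec

# Placeholder path to a downloaded sense2vec vectors package
s2v = Sense2Vec().from_disk("/path/to/s2v_vectors")

def sense2vec_local(phrase, threshold=0.88):
    # sense2vec keys look like "natural_language_processing|NOUN"
    key = phrase.replace(" ", "_") + "|NOUN"
    if key not in s2v:
        return []
    output = []
    for result_key, score in s2v.most_similar(key, n=20):
        if float(score) > threshold:
            text = result_key.split("|")[0].replace("_", " ")
            output.append((float(score), text))
    return output

That would make it roughly a drop-in replacement for the sense2vec() helper in the recipe above.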

Yep, that was it. It was coming back with 9 examples at the threshold I set, so it got stuck in an infinite loop. I assumed you were polling 1 example at a time, not a batch of 10 at a time. I knew it was something dumb on my part haha.
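
In case it helps anyone else, one way to guard against this (just a sketch, and it trades the infinite wait for the stream simply ending once nothing new comes back) is to track whether a full pass over the accepted phrases actually yielded anything:

def get_stream():
    seen = set(accept_phrases)
    sensed = set()

    while True:
        yielded_any = False
        for p in list(accept_phrases):
            if p.lower() not in sensed:
                sensed.add(p.lower())
                for score, phrase in sense2vec(p):
                    if phrase.lower() not in seen:
                        seen.add(phrase.lower())
                        yielded_any = True
                        yield {"text": phrase, 'meta': {'score': score}}
        if not yielded_any:
            # No new suggestions in a full pass: end the stream instead of
            # looping forever and blocking the next batch request
            break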

Thanks!

For sure I can deploy my own version for the work I'm doing.

I was going to contribute this to prodigy-recipes though. Should I keep your demo URL there as the example?
sense2vec has some issues (particularly cymem version conflicts) running with the newer versions of Prodigy, so I figured a REST API would be easier.

I can open a PR and we can discuss there too if you'd prefer.

PR is here: https://github.com/explosion/prodigy-recipes/pull/9