"Refreshing" the stream of examples

tl;dr: can you refresh the stream without restarting Prodigy?

To manage a multi-annotator coding setup, I’m pre-loading examples into a dataset and then pulling from that dataset to generate the examples to show each annotator. I’d like some custom logic in the stream to exclude examples that have already been labeled 3 times, or that the active annotator has seen before. The logic is easy to write, but it doesn’t play well with the stream generator setup, because the list of examples an annotator should see changes as the other annotators do their work.
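For concreteness, the filtering logic I mean looks roughly like this (a sketch, assuming each stored example carries a made-up "coders" list of annotator names):

```python
# Sketch of the exclusion logic: skip examples labeled 3 times already,
# or that the active annotator has seen. The "coders" field is an
# assumption for illustration, not a built-in Prodigy field.
def should_show(eg, active_coder, max_coders=3):
    coders = eg.get("coders", [])
    return len(coders) < max_coders and active_coder not in coders

examples = [
    {"text": "a", "coders": ["x", "y", "z"]},  # already labeled 3 times
    {"text": "b", "coders": ["alice"]},        # alice has seen it
    {"text": "c", "coders": []},               # eligible
]
eligible = [eg for eg in examples if should_show(eg, "alice")]
# eligible contains only the "c" example
```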

Is there a good way to get the stream to refresh every few examples? Or does that require restarting Prodigy? This is getting outside the core use case perhaps, but any advice would be greatly appreciated.

Answering my own question (should have thought more before posting), it’s pretty easy to make an object that has a generator that can also be refreshed. E.g. ugly code like:

from prodigy.components.db import connect

class DBStream:
    def __init__(self, active_coder, db_name):
        self.active_coder = active_coder
        self.db = connect()
        self.db_name = db_name
        self.examples = iter([])  # replaced on every refresh

    def get_examples(self):
        # rebuild the stream, skipping fully labeled or already-seen examples
        examples = self.db.get_dataset(self.db_name)
        good = []
        for eg in examples:
            if len(eg['coders']) < 3 and self.active_coder not in eg['coders']:
                good.append(eg)
        self.examples = iter(good)

where .examples is used as the stream generator, and .get_examples can be called again on update to refresh the stream. The bigger issue with this approach is that I’m treating the Prodigy annotations dataset as both a task list and an annotation list. It actually makes much more sense to separate the task dataset, where the unit is the piece of text, from the annotations dataset, where the unit is the annotation.
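To sketch what that separation might look like: tasks (unit = piece of text) live apart from annotations (unit = one coder’s answer), joined on an ID. The field names and the three-coder cap here are assumptions for illustration:

```python
# Hypothetical two-collection layout: tasks are joined to annotations by
# a "task_id" field, so eligibility is computed from the annotations side.
from collections import Counter

def eligible_tasks(tasks, annotations, active_coder, max_coders=3):
    """Yield tasks with fewer than max_coders annotations that
    active_coder has not already labeled."""
    counts = Counter(a["task_id"] for a in annotations)
    seen = {a["task_id"] for a in annotations if a["coder"] == active_coder}
    for task in tasks:
        tid = task["task_id"]
        if counts[tid] < max_coders and tid not in seen:
            yield task

tasks = [{"task_id": 1, "text": "a"}, {"task_id": 2, "text": "b"},
         {"task_id": 3, "text": "c"}]
annotations = [{"task_id": 1, "coder": "x"}, {"task_id": 1, "coder": "y"},
               {"task_id": 1, "coder": "z"}, {"task_id": 2, "coder": "alice"}]
remaining = list(eligible_tasks(tasks, annotations, "alice"))
# remaining holds only task 3
```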

Thanks for updating with your solution! And yes, I was actually going to suggest pretty much the same thing. You could also try setting a lower batch size (if you’re not doing this already), so the stream can update quicker, and you’ll receive annotated tasks sooner.
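In case it helps, here’s a minimal sketch of where that batch size would live, following the usual pattern of returning a "config" mapping from a recipe (the recipe contents here are placeholders):

```python
# Hypothetical recipe components dict with a smaller batch size. Only the
# "config"/"batch_size" placement follows the standard Prodigy pattern;
# the dataset name and stream are made up for illustration.
def make_components(dataset, stream):
    return {
        "dataset": dataset,
        "stream": stream,
        "view_id": "text",
        "config": {"batch_size": 3},  # smaller batches: answers come back sooner
    }

components = make_components("demo", iter([{"text": "hello"}]))
```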

Also, here’s an abstract example I already wrote up as a draft earlier that shows how the generator can respond to state changes. Posting it here in case others come across this thread later.

from toolz.itertoolz import partition

STATUS_COUNTER = 0  # some random state we want the stream to respond to
batch_size = 5
stream = [i for i in range(100)]  # a stream of 100 examples

# a custom stream iterator
def get_stream(stream):
    for eg in stream:
        if STATUS_COUNTER == 0:
            yield eg
        else:  # our global state has changed
            yield 'hello'

# the stream, partitioned into batches
queue = partition(batch_size, get_stream(stream), pad=None)

# the endpoint we'll query for questions
def get_questions():
    for batch in queue:
        return batch
    return []

get_questions()  # (0, 1, 2, 3, 4)
get_questions()  # (5, 6, 7, 8, 9)
get_questions()  # (10, 11, 12, 13, 14)
STATUS_COUNTER = 1
get_questions()  # ('hello', 'hello', 'hello', 'hello', 'hello')

My solution’s not actually working like I’d hoped, and I wanted to see if you had an idea why (though again, I realize I’m outside the intended use case here). The idea is to call .get_examples() once an answer comes back, in order to refresh the stream generator. When I run this in a Python session, it works as expected: repeatedly pulling from the stream exhausts it, but if I call .get_examples() in between, it keeps it refreshed. When I run the recipe, however, the stream is exhausted even though .get_examples() is being called when the answers come back.

Is this a result of how Prodigy is handling state/environment between the functions? If I could repurpose the /get_questions call to call the DB directly, I think that would work, but again, that’s a pretty radical change to Prodigy. I’m doing an ugly workaround right now (use a huge queue and restart Prodigy regularly), but that’s not ideal. If you have any thoughts on what I can do, I’d really appreciate it. I’ve included some more code below, in case it helps illustrate what I’m doing.

def mark_custom(dataset, source=None, view_id=None, label='', api=None,
         loader=None, memorize=False, exclude=None):

    coder = source # repurposing input slot
    stream_empty = iter([]) # make sure the functions are using the same stream.
    stream = DBStream(coder, "protest_gsr")
    stream.get_examples()

    def ask_questions(stream_empty):
        for eg in stream.examples:
            yield eg

    def recv_answers(answers):
        for eg in answers:
            # add the coder to the list, update count, and un-reserve the example
            stream.coll.update_one({...})
        print("Refreshing stream...")
        stream.get_examples()
...

    return {
        'dataset': dataset,
        'stream': ask_questions(stream_empty),
        'update': recv_answers,
    }
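One pattern that can sidestep the exhaustion problem: rather than a generator that makes a single pass over stream.examples, loop forever and re-fetch the eligible examples on every pass, so the generator itself never runs out. This is a sketch, not Prodigy-specific API; fetch_examples stands in for something like DBStream.get_examples returning a fresh list:

```python
# A self-refreshing stream: each pass of the while-loop re-queries the
# source, so there is no stale closed-over iterator to exhaust.
import time

def endless_stream(fetch_examples, poll_interval=1.0):
    """Yield examples forever, re-querying the source on every pass.
    fetch_examples() should return the currently eligible examples."""
    while True:
        found = False
        for eg in fetch_examples():
            found = True
            yield eg
        if not found:
            time.sleep(poll_interval)  # nothing eligible; wait before retrying

# demo: the generator cycles through whatever the fetch returns each pass
from itertools import islice
first_five = list(islice(endless_stream(lambda: [1, 2], poll_interval=0), 5))
# first_five == [1, 2, 1, 2, 1]
```

Because the re-query happens inside the generator’s own loop, the update callback no longer needs to reach back into the stream object at all.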

@ines
To pass the stream to different ports for different users:
Let’s say a batch of the stream is being served on a particular port. After that batch finishes, how should I send the next batch of the stream to the same port without refreshing the browser?

@andy
Hi, I was stuck on a similar problem, hope you can help!
So if you’re working with multiple annotators, you must be using different ports for different users, right?
If so, how did your return value work per annotator, per port?

I wrote up some short notes on how I did it here: How to keep count of annotations done by a person ?. Take a look at that and at the code in my GitHub repo. Hope that helps!