Asynchronous stream

Hello,

The stream for our recipe is composed of text elements that are loaded asynchronously every hour and for several days. For this reason, when we first run prodigy with our custom recipe, we’re able to label the tasks that are first loaded into the stream. Once the stream is fully consumed, the page displays No tasks available. An hour later, once new tasks have been loaded, we have to manually restart prodigy to create a new stream with the new items.

In order to get around this issue, we could potentially patch get get_questions method in the Controller as follows:

# python queue that contains new tasks loaded asynchronously
q = queue.Queue()

def patch_get_questions():
    items = []
    if not q.empty():      
        for _ in range(controller.batch_size):
            try:
                item = q.get(block=False)
                items.append(set_hashes(item))
            except queue.Empty as e:
                break

    return items

controller.get_questions = patch_get_questions

This seems to work, but we’re not sure what other processing may be done in the original get_questions method. We know this is highly dangerous, so we were wondering if you had any other ideas on how to accomplish this.

Thanks

1 Like

Thanks for sharing your code and your use case!

I do think the solution you’ve come up with is pretty reasonable, especially considering the specific use case and constraints. The default controller.get_questions method is very basic and only really yields batches from the stream anyways.

Going forward, it might actually make sense for Prodigy to adopt a Queue approach similar to yours, at least internally. This would make the behaviour of the stream more predictable, and would also make it easier to handle multiple consumers.

1 Like

Has this feature been updated in the newer versions of prodigy , for handling multiple consumers ?

The upcoming version of Prodigy will include more internals that help with managing streams, but you still have to decide how you want to implement the consumers and also how the data should flow though.

Our own solution will be implemented in Prodigy Scale – see here for details: