Ordered tasks on the "mark" recipe

I’m running the “mark” recipe with --memorize and --view-id choice on the same JSONL dataset, but for different annotators (running on different machines) the dataset created in the database ends up different (again, with the exact same input file).
What I would like:

  1. All annotators would get the same tasks to annotate, in the same order (preferably predefined in the JSONL file)
  2. On page refresh, an annotator would always get the same task, not a random different one

I thought the --memorize flag was for that, but I obviously misunderstood it. Please clarify how this should be done.

The --memorize flag will exclude examples that are already in the dataset. By "different annotators running on different machines", do you mean separate Python processes? Because if you run separate processes with the same input data and different target dataset names in the database, the questions should be asked in the exact same order, as they come in.

The only thing that's important to consider is the reloading:

If you refresh the browser, Prodigy will make a call to /get_questions, which will fetch the next batch from the queue. There's no obvious way for the process to know whether questions that were previously sent out are still "in progress" (e.g. if an annotator is working on them) or if they were discarded. The app doesn't wait to receive a batch before it sends out new questions, because it needs to make sure the queue never runs out.

So in order to reconcile the questions/answers, there are different decisions to make depending on whether you want to have every user label the same data once, or whether you want some overlap, only one answer per question etc. That's something you'd have to decide – but a good solution is usually to have a single provider (e.g. a separate REST API or something similar) which keeps track of what to send out to multiple consumers. This could, for example, happen in the stream, which is a generator and can make a request to the provider on each batch.

Here's a pseudocode example that illustrates the idea:

import requests

def stream():
    # ask the central provider for the next batch of questions;
    # the URL and its session parameters are placeholders
    next_questions = requests.get('http://your-api/?session-params').json()
    for question in next_questions:
        yield question
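
And as a rough sketch of the provider side – using Flask here, which is purely an assumption for this example and not something Prodigy ships – the provider pops batches off a shared queue so that two annotators polling it never receive the same questions (locking and per-session bookkeeping are left out for brevity):

import json
from flask import Flask, jsonify

app = Flask(__name__)
BATCH_SIZE = 10

with open("data.jsonl", encoding="utf8") as f:  # hypothetical input file
    queue = [json.loads(line) for line in f]

@app.route("/")
def get_batch():
    # hand out the next batch and remove it from the shared queue
    batch = queue[:BATCH_SIZE]
    del queue[:BATCH_SIZE]
    return jsonify(batch)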

If every annotator is doing the same thing and you just want to make sure that all questions really get answered, you could also use a simpler approach: write a normal generator that yields data from your input file, and add another loop at the end of the generator that checks the hashes ("_task_hash") of the existing answers and yields out the examples from the stream that aren't in the dataset yet, based on their hash.

Thanks @ines for the detailed and enlightening answer!
So to recap what I understand:

  1. Prodigy’s mark recipe does not randomize the questions in any way – I get them in the order defined in the data source (like the JSONL file)
  2. On refresh, Prodigy does not “refetch” the batch that is currently out, but gets another batch from the file – so if I reduced batch_size to 1, a refresh would give me the next question in the set

What I don’t get is: without custom code, won’t the “open” batches be closed at some point? Would those questions be lost forever (or until we restart the process)? Thanks again for the patience.

Thanks, I hope this wasn't too much of an info dump. That kind of stuff is just something we had to think about a lot when planning and developing Prodigy Scale. And it turns out there are actually many different ways you could want to reconcile your annotations that are all totally valid depending on the project and goal. So it took us a while to get all of this right in a way that generalises well. The good news is, we're already testing those wrappers (we're calling them "feeds") in the Prodigy internals and are hoping to expose more of them in the Python API so it's easier to put together your own multi-user and multi-session streams.

Yes, it really just iterates over them in order. You can also see this if you check the source of the mark function in recipes/generic.py.

Yes – although, Prodigy would still be fetching one question in the background to keep the queue full enough. So you'd always have at least one question "in limbo". Otherwise, it'd be way too easy to hit "No tasks available" if you annotate too fast and the next batch hasn't come in from the server yet.

The "orphaned" batch is technically discarded by the web app, yes. The server doesn't know if you close the browser – maybe you were just offline for a while and still have unsent answers (which is currently no problem). The back-end will only know whether answers are missing once the process is stopped.

It can then get the hashes from the existing dataset, and compare them against the hashes of the input data. When a stream comes in that's not yet hashed, Prodigy essentially does this:

from prodigy import set_hashes

def stream(examples):
    # add hashes to each example before it's sent out
    for eg in examples:
        eg = set_hashes(eg)
        yield eg

This adds an "_input_hash" and a "_task_hash" to each example. The input hash describes the input data, e.g. the "text" or "image". The "_task_hash" is based on the input data and other annotations you might be collecting feedback on (like the "spans" or the "label"). This lets you distinguish between questions about the same text but with different labels etc., which is quite common in Prodigy. (You can also customise what the hashes are based on btw – see the API docs for set_hashes.)
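
For illustration, here's roughly what that looks like in practice (the example task is made up):

from prodigy import set_hashes

eg = {"text": "This restaurant was great", "label": "POSITIVE"}
eg = set_hashes(eg)
print(eg["_input_hash"])  # based on the input data, here the "text"
print(eg["_task_hash"])   # based on the input data plus the "label"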

Even if your process is still running, you could make another pass over your data once the first stream runs out, check if the hash is already in the dataset and if not, send the example out again. For example:

from prodigy import set_hashes
from prodigy.components.db import connect

db = connect()  # use settings from prodigy.json

def stream_generator(stream):
    sent_out = []  # keep the hashed examples around for the second pass
    for eg in stream:
        eg = set_hashes(eg)
        sent_out.append(eg)
        yield eg
    # all examples are sent out, go over them again and
    # compare against the hashes currently in the dataset
    dataset_hashes = db.get_task_hashes("your_dataset_name")
    for eg in sent_out:
        if eg["_task_hash"] not in dataset_hashes:
            yield eg
    # etc.

Thanks @ines for the explanation and verification
When I look at the code of the “mark” recipe, I see:

def ask_questions(stream):
    for eg in stream:
        if TASK_HASH_ATTR in eg and eg[TASK_HASH_ATTR] in memory:
            answer = memory[eg[TASK_HASH_ATTR]]
            counts[answer] += 1
        else:
            if label:
                eg['label'] = label
            yield eg

So, two questions:

  1. I see it’s being called with a stream – who generates this stream, and when?
  2. In your example, when would stream_generator be called, and by what function?

In this case, the stream is generated by the get_stream helper, which is basically a convenience wrapper that checks the source, picks the right loader (based on the file type if none is specified), makes sure everything exists and is valid, etc.:

stream = get_stream(source, api, loader)

The 'stream' returned by the recipe should be a generator that yields dictionaries. Where it comes from and what it does internally is entirely up to you – so if you write a function that yields dicts, you can plug it into Prodigy :slightly_smiling_face:
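
For example, something as minimal as this already works as a stream (the texts are made up):

def my_stream():
    # any generator that yields dictionaries can be plugged in as a stream
    for text in ["First text to annotate", "Second text to annotate"]:
        yield {"text": text}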

Btw, our prodigy-recipes repo has a collection of standalone open-source recipes that are slightly simplified and annotated with comments, to make it easier to see what's going on and to adapt them for your own projects. It includes a version of the mark recipe, for instance.
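
Roughly, such a simplified recipe has the following shape – this is only a sketch under my assumptions (the recipe name mark-simple and the argument annotations are illustrative, not the repo file verbatim):

import prodigy
from prodigy.components.loaders import JSONL

@prodigy.recipe(
    "mark-simple",  # hypothetical name for this sketch
    dataset=("Dataset to save answers to", "positional", None, str),
    source=("Path to the input JSONL file", "positional", None, str),
    view_id=("Annotation interface, e.g. text or choice", "option", "v", str),
)
def mark_simple(dataset, source, view_id="text"):
    # load the data first, so a bad path or invalid JSONL fails right away
    stream = JSONL(source)

    def ask_questions(stream):
        # yield out the examples in the order they come in
        for eg in stream:
            yield eg

    return {
        "dataset": dataset,
        "stream": ask_questions(stream),
        "view_id": view_id,
    }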

Sorry if this was confusing. What I called stream_generator is pretty much the ask_questions function.

(The main reason the recipe doesn't load the data within ask_questions, and instead defines a function that takes the already loaded stream, is that it lets us do the loading first thing in the recipe function. If loading the file fails, you'll see that error immediately when you run the recipe on the command line, and not only later when the generator is actually executed.)

Thanks @ines for the help. I almost got a solution working, but a few questions remain

  1. How often would get_questions be called for a 10,000-line dataset? I can see it’s not called for every batch (5 items per batch in my case), but I’m quite sure it’s not fetching all 10,000 JSONL lines at once.

  2. Do I understand correctly that the “update” callback is called when the client side sends “give_answers” – are there cases where it won’t be called?

  3. Is there a hook where I can “manipulate” the outgoing task (sent to the web client) after the initial stream load in ask_questions, i.e. in the “stream” callback? For example, to add a timestamp attribute?

Nice, glad you got it working!

get_questions is called on first load, and whenever the current queue of unanswered questions has fewer than batch_size / 2 or 2 examples (whichever happens sooner). This is checked whenever the app receives a new answer. The idea behind this is to make sure that there are always enough examples queued up, even if you annotate very fast.
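
In other words, roughly this condition – a paraphrase of the description above, not Prodigy's actual source:

def needs_new_batch(queue_length, batch_size):
    # ask /get_questions for more once the unanswered queue drops below
    # half a batch, or below 2 examples – whichever threshold is hit first
    return queue_length < batch_size / 2 or queue_length < 2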

Internally, Prodigy makes sure to never consume your whole generator at once, e.g. by converting it to a list. This means you can load in really large files and still start annotating immediately as the first examples come in. It also means that your stream generator can change what it yields out over time.

Yes, exactly. And if it exists, it will always be called on give_answers.
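
As a rough sketch, an update callback returned from a custom recipe could look like this (the print statement is just for illustration):

def update(answers):
    # receives the batch of answers the web app just sent via /give_answers
    for eg in answers:
        print("Received answer:", eg.get("answer"), "for task", eg.get("_task_hash"))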

If I understand your question correctly, this should be handled automatically by the way the generator streams are designed.

The stream generator won't be consumed all at once – on each request to get_questions, Prodigy will only fetch one batch from it and send it out. So a timestamp created within the stream generator should reflect the timestamp of when that part of the generator was evaluated, not when it was initially created.

Here's an example that shows the idea:

import time

def stream_generator():
    for i in range(10):
        yield time.time()

print("Current timestamp:", time.time())

stream = stream_generator()
print("First item:", next(stream))  # get one item

# Wait and iterate over the rest
time.sleep(0.5)
for stamp in stream:
    print(stamp)
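
Applied to your third question, that means you can wrap the stream and set the attribute as each task is yielded out – a minimal sketch, assuming a custom "timestamp" key:

import time

def add_timestamps(stream):
    # stamp each task right before it's sent out in the next batch
    for eg in stream:
        eg["timestamp"] = time.time()  # hypothetical custom attribute
        yield eg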

Thanks! Perhaps I’ll clarify the questions, since I’m building on top of the “mark” recipe and the JSONL loader.
The “stream” callback is ask_questions(stream), while somewhere at the start of the recipe there's:
stream = get_stream(source, api, loader)
I know get_questions is a generator (and this is where my code changes currently are).
So my guess is:

  1. Since I don’t currently use an API or a custom loader, I guess it defaults to the JSONL loader
  2. get_stream also returns a generator (although the docstring states otherwise), with N tasks out of my 50,000 tasks
  3. get_questions returns the generator based on this stream, but if it’s exhausted, something somewhere would generate a new stream and get_questions would be called with it

But I fail to see get_questions being called more than once, even when annotating 100+ items. So maybe the initial stream is “all” the tasks in the dataset JSONL file?

Ok, I tried debugging this stuff and I guess:
get_questions is “called” once, but the generator in it is called with next() again and again when needed.
The initial load is quite high (for a batch_size of 5, the initial load from the generator seems to be 50 – does this make sense?), while the controller “sends” only 10 to the frontend and then batches of 5, 5, 5, hitting the iterator for more when arriving at 47.
Do I understand correctly?

I'm trying to solve a problem where, on refresh, the tasks that were skipped get loaded once again. I used this script, but it's still showing "No tasks available" after a certain number of refreshes.

@akshayklr057 Hi! Check out your other thread here: Task lost on "page refresh"

In general, it's more helpful for us if you post your question in only one thread. That way, there are fewer posts in total, and we can keep a better overview and get to your answer quicker.