Hi @helmiina,
Thanks for all the details!
> This does not seem to be happening. When I start a new session in the browser (without pre-determined sessions), a request is made to `get_session_questions`, but there is no sign of the router being triggered. Even though there are tasks to be annotated, they are not routed to new sessions. The only time the router gets called is when the very first session is started.
Are you 100% sure it is not being triggered? Remember that we put this "early exit" condition at the beginning of the router to prevent sessions from being spammed with the "control" task:
```python
# inside the task router function
def _task_router(ctrl: "Controller", session_id: str, item: Dict) -> List[str]:
    if item.get("meta", {}).get("is_control"):  # default to {} in case "meta" is missing
        return [session_id]
    # (...)
```
Maybe you can add a logging statement at the very beginning of the router function to double check?
What I think is happening in the flow you described is that, by the time seppo joins, kari has consumed the entire stream, and the only tasks available are the "control" tasks, which trigger the early exit from the router for seppo and thus an empty queue. It will stay that way until new examples are added.
> I also tried re-adding tasks that have not yet been annotated ANNOTATIONS_PER_TASK times, and doing that does not trigger routing either. The verbose output from get_session_questions correctly shows that the input database has n more examples, but they are not routed to anyone, so no one gets anything to annotate.
I believe that the streaming function keeps track of already streamed task hashes to prevent a session from getting locked in an infinite loop:
```python
import random
import time

from prodigy import set_hashes
from prodigy.components.db import connect

DB = connect()
SEEN_BEFORE = set()

# Create an infinite stream that polls the current database state
def stream_tasks_from_db(db, input_ds_name: str):
    """Load annotated tasks from another interface"""
    while True:
        examples = db.get_dataset_examples(input_ds_name)
        task_hashes = set(db.get_task_hashes(input_ds_name))
        if task_hashes.difference(SEEN_BEFORE):
            for item in examples:
                item = set_hashes(item)
                SEEN_BEFORE.add(item["_task_hash"])
                yield item
        else:
            # Yield a special control task
            control_task = {
                "text": "No new tasks available. Keep hitting ignore until new tasks show up.",
                "meta": {"is_control": True, "action": "refresh"},
                "_input_hash": random.randint(1, 100000),
            }
            yield set_hashes(control_task)
        # Sleep briefly to avoid tight loops
        time.sleep(1)
```
For this reason, the script that re-adds the new inputs also needs to convolute the task hashes to make sure the newly added tasks do not get filtered out at this point. This is why you're observing that the newly added tasks are not routed to anyone.
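One way to do that is sketched below. `rehash_for_round` and the round counter are my inventions for illustration, not Prodigy API: the idea is to derive a fresh `_task_hash` from the text plus a per-round salt so the `SEEN_BEFORE` filter treats the re-added copy as unseen, while leaving `_input_hash` untouched:

```python
import hashlib

def rehash_for_round(task: dict, round_id: int) -> dict:
    """Hypothetical helper: give a re-added task a fresh _task_hash so the
    stream's SEEN_BEFORE set lets it through again."""
    payload = f"{task.get('text', '')}::round={round_id}".encode("utf-8")
    new_task = dict(task)
    new_task["_task_hash"] = int(hashlib.sha1(payload).hexdigest()[:8], 16)
    # _input_hash is deliberately left as-is: "exclude_by": "input"
    # relies on it to keep the task away from annotators who saw it
    return new_task
```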
Finally, since we are convoluting the task hashes, you should set "exclude_by": "input" in the config (either .prodigy.json or the dictionary the recipe returns) to make sure that the same tasks are not routed to annotators who have already seen them.
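If you go the recipe route, the setting sits in the "config" entry of the components dictionary the recipe returns. A sketch (the other keys are placeholders, not your recipe's actual values):

```python
# Sketch of the relevant part of a recipe's return value
components = {
    "dataset": "my_dataset",   # placeholder name
    "view_id": "text",         # placeholder interface
    "config": {
        "exclude_by": "input"  # dedupe per annotator by _input_hash
    },
}
```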
While testing this, I also realized that we need to check the contents of the session's open questions for duplicates. Normally this is not necessary, because duplicates are eliminated at the stream-loading level. However, with the infinite stream and intentional duplicates, there can be a situation where the seppo session gets all 6 questions in the queue, because the router only checks against the already-annotated questions saved in the DB.
To illustrate:
- kari consumes the entire stream of 3 questions
- seppo joins, but there are no new questions yet
- the DB is being populated with duplicates
- kari hits refresh (get_questions), which routes all 6 questions to seppo. Since we exclude by input, kari doesn't see any of them, because all _input_hashes are already recorded in the DB for their session. seppo, however, did not have any input hashes saved in the DB yet, so they get all 6 questions, i.e. duplicates.
To prevent that, you could add another filtering step in the Session's get_questions function (after filtering out the control questions):
```python
# session.py
def get_questions(
    # (...)
):
    # (...)
    open_items = [item for key, (ts, item) in self._open_tasks.items()]
    open_items_input_hashes = {get_input_hash(item.data) for item in open_items}
    # (...)
    results = [item for item in results if not item.data.get("meta", {}).get("is_control")]
    results = [item for item in results if get_input_hash(item.data) not in open_items_input_hashes]
    # (...)
```
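To see what that filter does, here is a toy run with a stand-in `Item` wrapper and `get_input_hash` reduced to reading a precomputed value (both are simplifications for the example, not the real Session internals):

```python
from dataclasses import dataclass

@dataclass
class Item:
    data: dict

def get_input_hash(data: dict) -> int:
    # Stand-in: just read the precomputed hash from the task dict
    return data["_input_hash"]

# seppo already has two questions open in the queue
open_items = [Item({"_input_hash": 1}), Item({"_input_hash": 2})]
open_items_input_hashes = {get_input_hash(item.data) for item in open_items}

# The routed results contain a control task and a duplicate of hash 1
results = [
    Item({"_input_hash": 1, "meta": {}}),
    Item({"_input_hash": 3, "meta": {"is_control": True}}),
    Item({"_input_hash": 4, "meta": {}}),
]
results = [item for item in results if not item.data.get("meta", {}).get("is_control")]
results = [item for item in results if get_input_hash(item.data) not in open_items_input_hashes]
# Only the genuinely new question (input hash 4) survives
```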