Hi @helmiina,
Thanks for all the details!
> This does not seem to be happening. When I start a new session in the browser (without pre-determined sessions), a request is made to `get_session_questions`, but there is no sign of the router being triggered. Even though there are tasks to be annotated, they are not routed to new sessions. The only time the router gets called is when the very first session is started.
Are you 100% sure it is not being triggered? Remember we put this "early exit" condition at the beginning of the router to prevent sessions being spammed with the "control" task:
# inside the task router function
def _task_router(ctrl: "Controller", session_id: str, item: Dict) -> List[str]:
    # Early exit: never route the "control" task to anyone but the caller
    if item.get("meta", {}).get("is_control"):
        return [session_id]
    (...)
Maybe you can add a logging statement at the very beginning of the router function to double check?
What I think is happening in the flow you described is that, by the time `seppo` joins, `kari` has consumed the entire stream, and the only tasks available are the "control" tasks that trigger the early exit from the router for `seppo`, and thus, an empty queue. It will stay like that until new examples are added.
> I also tried re-adding tasks that have not yet been annotated `ANNOTATIONS_PER_TASK` times, and doing that does not trigger routing either. The verbose output from `get_session_questions` correctly shows that the input database has `n` more examples, but they are not routed to anyone, so no one gets anything to annotate.
I believe that the streaming function keeps track of already streamed task hashes to prevent a session from getting locked in an infinite loop:
import random
import time

from prodigy import set_hashes
from prodigy.components.db import connect

DB = connect()
SEEN_BEFORE = set()

# Create an infinite stream that follows the current database state
def stream_tasks_from_db(db, input_ds_name: str):
    """Load annotated tasks from another interface"""
    while True:
        examples = db.get_dataset_examples(input_ds_name)
        task_hashes = set(db.get_task_hashes(input_ds_name))
        if task_hashes.difference(SEEN_BEFORE):
            for item in examples:
                item = set_hashes(item)
                if item["_task_hash"] in SEEN_BEFORE:
                    continue  # already streamed in an earlier pass
                SEEN_BEFORE.add(item["_task_hash"])
                yield item
        else:
            # Yield a special control task
            control_task = {
                "text": "No new tasks available. Keep hitting ignore until new tasks show up.",
                "meta": {"is_control": True, "action": "refresh"},
                "_input_hash": random.randint(1, 100000),
            }
            yield set_hashes(control_task)
        # Sleep briefly to avoid tight loops
        time.sleep(1)
For this reason, the script that re-adds the new inputs also needs to convolute the task hashes to make sure the newly added tasks do not get filtered out at this point. This is why you're observing that the newly added tasks are not routed to anyone.
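To be concrete about what "convoluting" could look like: the re-adding script can pre-set a fresh random `_task_hash` on a copy of each task before saving it back, relying on `set_hashes` keeping hashes that are already present (its default, non-overwriting behaviour), while leaving `_input_hash` untouched so the input-based exclusion below still works. The function name here is made up for illustration:

```python
import copy
import random

def convolute_task_hashes(examples):
    """Return copies of the tasks with fresh random task hashes so the
    SEEN_BEFORE filter in the stream does not drop them on re-adding.
    The input hash is deliberately left untouched."""
    fresh = []
    for eg in examples:
        eg = copy.deepcopy(eg)
        eg["_task_hash"] = random.randint(-2**31, 2**31 - 1)
        fresh.append(eg)
    return fresh
```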
Finally, since we are convoluting task hashes, you should set `"exclude_by": "input"` in the config (either `.prodigy.json` or the dictionary that the recipe returns) to make sure that the same tasks are not routed to annotators who have already seen them.
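In the recipe, that would look roughly like this (the surrounding components are stand-ins; only the `"config"` entry matters here):

```python
def recipe_components(dataset, stream):
    # The "config" dict the recipe returns is merged with .prodigy.json;
    # "exclude_by": "input" makes Prodigy de-duplicate questions per
    # session by _input_hash rather than by the (convoluted) _task_hash
    return {
        "dataset": dataset,
        "stream": stream,
        "config": {"exclude_by": "input"},
    }
```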
While testing, I also realized that we need to check the contents of the session's open questions for duplicates. Normally this is not necessary, because duplicates are eliminated at the stream-loading level. However, with the infinite stream and intentional duplicates, there can be a condition where the `seppo` session gets all 6 questions in its queue, because the router only checks against the already annotated questions saved in the DB.
To illustrate:
- `kari` consumes the entire stream of 3
- `seppo` joins but there are no new questions yet
- DB is being populated with duplicates
- `kari` hits refresh (`get_questions`) and routes all 6 questions to `seppo`. Since we exclude by input, `kari` doesn't see any, because all `_input_hashes` are already recorded in the DB for their session. However, since `seppo` did not have any `_input_hashes` saved in the DB, they get all 6 questions, which means duplicates.
To prevent that, you could add another filtering step in the `Session.get_questions` function (after filtering out the control questions):
# session.py
def get_questions(
    (...)
    open_items = [item for _ts, item in self._open_tasks.values()]
    open_items_input_hashes = {get_input_hash(item.data) for item in open_items}
    (...)
    results = [item for item in results if not item.data.get("meta", {}).get("is_control")]
    results = [item for item in results if get_input_hash(item.data) not in open_items_input_hashes]
    (...)
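In isolation, the two filtering steps behave like this (plain dicts stand in for Prodigy's item objects, and a key lookup stands in for `get_input_hash`, so this is only a behavioural sketch):

```python
def filter_new_questions(results, open_tasks):
    """Drop control tasks, then drop anything whose input hash already
    sits in the session's open (unanswered) queue."""
    open_input_hashes = {item["_input_hash"] for _ts, item in open_tasks.values()}
    results = [r for r in results if not r.get("meta", {}).get("is_control")]
    return [r for r in results if r["_input_hash"] not in open_input_hashes]
```

With a question of input hash 1 already open, a batch containing input hashes 1 and 2 plus a control task collapses to just the hash-2 question, so the duplicate never reaches `seppo`'s queue.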