How to check if annotators have tasks in their queue?

Hey!

Is there a way to check if annotators have tasks in their queue? I’m thinking of building a Slack bot to help our annotators coordinate their work, as they work on a series of tasks that depend on each other.

Hi @tuomo_h:

There's no public API or endpoint to check queue status without side effects (public API docs). The /get_session_questions endpoint and controller.get_questions() both consume tasks, i.e. they pull items from the queue and move them into the annotator's queue, so calling them as a status check would interfere with the annotation flow.

What you can do without affecting the stream is check whether a given annotator's queue is empty. There are two checks, both done on the controller object:

  1. controller.stream.has_next(session_id) - checks whether more tasks can be fetched. It returns True if the source stream has more data OR the session's queue buffer isn't empty.
  2. session._open_tasks - tasks already sent to the annotator but not yet answered:

     session = controller._get_session(session_id)
     len(session._open_tasks)  # tasks shown but unanswered

Together these tell you:

  • has_next = "there's more work available to pull"
  • len(_open_tasks) > 0 = "the annotator has been given tasks they haven't submitted yet"
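If it helps, here's a rough sketch of how the two signals could be folded into a single coarse state for the bot. The function name and the state labels are made up for illustration; they are not part of Prodigy.

```python
def describe_queue(has_next: bool, open_tasks: int) -> str:
    """Map the two queue signals to a coarse, human-readable state.

    has_next   -- result of controller.stream.has_next(session_id)
    open_tasks -- len(session._open_tasks)
    The labels returned here are hypothetical, chosen for this example.
    """
    if open_tasks > 0:
        # Tasks were sent to the annotator and are not yet submitted.
        return "in_progress"
    if has_next:
        # Nothing is open, but more work can still be pulled.
        return "work_available"
    # Nothing is open and nothing is left to pull.
    return "empty"
```

Open tasks are checked first because an annotator with unsubmitted tasks still has work in front of them, whether or not more can be pulled.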

Importantly, both of these require access to the live controller object in the running Prodigy process. You can't reach them from an external HTTP call because, as I mentioned before, there's no endpoint that exposes this without consuming tasks.
However, you can leverage Prodigy's custom event hook feature to add an extra endpoint that exposes queue status from the custom recipe.
You can find the docs on event hooks here but, in short, here's how it works:

  1. Your recipe returns an "event_hooks" dict mapping event names to handler functions
  2. Each handler receives (controller, **kwargs) where kwargs comes from the POST body
  3. The return value of the handler is sent back as the HTTP response
  4. Callable via POST /event/{name} with a JSON body

Here's an example of what that would look like for the queue status checker discussed above:

import prodigy
from prodigy.core import Controller

def queue_status(ctrl: Controller, *, session_id: str) -> dict:
    """Check if an annotator has tasks available without consuming them."""
    # Check if more tasks can be pulled from the stream
    has_more = False
    if session_id in ctrl.stream._queues:
        has_more = ctrl.stream.has_next(session_id)
    # Check tasks already sent to annotator but not yet answered
    open_count = 0
    if session_id in ctrl._sessions:
        open_count = len(ctrl._sessions[session_id]._open_tasks)
    return {
        "session_id": session_id,
        "has_available_tasks": has_more,
        "open_tasks": open_count,
        "active": session_id in ctrl._sessions,  # the annotator has connected at least once
    }


@prodigy.recipe(
    "my.recipe",
    dataset=("test", "positional", None, str),
    source=("The source data as a JSONL file", "positional", None, str),
)
def my_recipe(dataset, source):
    # ... your normal recipe setup ...
    return {
        "dataset": dataset,
        "stream": stream,
        "view_id": "ner_manual",
        "event_hooks": {
            "queue_status": queue_status,
        },
    }

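Then in your Slack bot you could call it like so:

import requests

PRODIGY_URL = "http://localhost:8080"


def check_annotator_queue(session_id: str) -> dict:
    # POST to the custom event hook; the JSON body becomes the handler's kwargs
    resp = requests.post(
        f"{PRODIGY_URL}/event/queue_status",
        json={"session_id": session_id},
        timeout=10,  # avoid hanging the bot if Prodigy is unreachable
    )
    resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
    return resp.json()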

# Example: check if "alice" has work on the "ner_tasks" dataset
status = check_annotator_queue("ner_tasks-alice")
# Returns: {"session_id": "ner_tasks-alice", "has_available_tasks": True, "open_tasks": 5, "active": True}

Note the session naming convention: session names are {dataset}-{annotator_name}.
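For a whole team, the bot could build the session IDs up front. This is only a sketch that assumes the {dataset}-{annotator_name} convention above; session_ids_for is a hypothetical helper, and the dataset and annotator names are placeholders.

```python
from typing import Dict, Iterable


def session_ids_for(dataset: str, annotators: Iterable[str]) -> Dict[str, str]:
    """Map each annotator name to a session ID of the form {dataset}-{annotator_name}."""
    return {name: f"{dataset}-{name}" for name in annotators}


# Placeholder dataset and annotator names, for illustration only.
team = session_ids_for("ner_tasks", ["alice", "bob"])
```

The bot can then loop over team.values() and pass each ID to check_annotator_queue().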

Here are some other attributes of the Session object that you might find useful for your workload management bot:

| Property | Type | What it tells you |
|---|---|---|
| session.session_annotated | int | Annotations done in this session (since connect) |
| session.total_annotated | int | Total annotations including previous sessions |
| session.target_annotated | int | Target count if set |
| session.progress | float | Progress as float (0.0-1.0) |
| session.timestamp | float | Last activity time (timeit.default_timer) - useful for detecting idle annotators |
| session._open_tasks | Dict[int, Tuple[float, QueueItem]] | Tasks sent to annotator but not yet submitted |
| session._answered_task_hashes | Set[int] | Set of all completed task hashes |
| session._stolen_keys | Set[int] | Tasks stolen from this annotator by others (they went idle) |
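
On detecting idle annotators: here's a minimal sketch, assuming session.timestamp holds a timeit.default_timer() reading as listed above. That clock has no fixed reference point, so only differences between readings taken in the same process are meaningful. In practice this means computing the idle time inside the event hook, not in the bot. The helper names and the 10-minute threshold are my own choices for the example.

```python
from timeit import default_timer
from typing import Optional


def seconds_idle(last_activity: float, now: Optional[float] = None) -> float:
    """Seconds since the last activity, using the same clock as session.timestamp.

    last_activity is assumed to be a timeit.default_timer() reading taken
    in this process; `now` can be passed explicitly for testing.
    """
    if now is None:
        now = default_timer()
    return max(0.0, now - last_activity)


def is_idle(last_activity: float, threshold_seconds: float = 600.0,
            now: Optional[float] = None) -> bool:
    """True if nothing has happened for at least threshold_seconds."""
    return seconds_idle(last_activity, now) >= threshold_seconds
```

Inside the hook, the result of seconds_idle(session.timestamp) could be added to the returned dict, so the bot receives a plain number of seconds.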

Wow @magdaaniol, many thanks for the informative response! I will try to implement this! :blush: