Receiving "Couldn't save annotations" error

hi @cheyanneb!

v1.11.8a4 is the latest. We're working hard to release v1.12 soon, so now could be a great time to prepare for it. It includes a migration script that will be required. Let us know of any issues you find!

Thanks! Is it possible to get the v1.11.8a4 wheels?

The wheels aren't available publicly, but we've sent you a follow-up message with them. Anyone else who has questions can send me a direct message and I can help them get the wheels if they're interested.

We are still experiencing this error with v1.11.11. Any updates on this? Thanks!

Bummer to hear this issue hasn't gone away.

I guess the best thing for me to do now is to see if I can reproduce this locally. With that in mind, is it possible to share your Dockerfile? Also, is this running a custom recipe? If so, is it possible to share that as well?

I'm likely not going to be able to mock the proxy and the CloudSQL database, but I might be able to reproduce something if I know more about the recipe/docker setup.

One final question: does this issue persist across all annotation tasks that you give it? As in, are there recipes/situations for which this issue does not occur? Does it also occur when a single person is annotating?

I will see if I can share those files. As for the final question:

  • This is not associated with one recipe -- it has occurred in a variety of tasks calling different recipes. It seems random -- I've tried to find patterns, checked the original dataset to see if there was anything going on there, etc. And the logs do not reveal anything.
  • When I export the final dataset, the documents that received this error are missing, meaning the annotation never saved.
  • Annotators can click through, but every time they start annotating again (and they do use Chrome, and they refresh their browser, they don't start annotating again in a task/browser that was dormant), they have to click through these documents that get stuck in this save error state. So like with ignore, I could set up a new task to handle these missing docs, but it's usually just 1 or 2 and we can do them manually.
  • We always have more than one person annotating the same dataset, but we experience this error both (a) when annotator 1 is annotating while annotator 2 is not, and (b) when both people are annotating at the same time.

Here is our Dockerfile.local. Its purpose is to let us run a Docker container locally, without relying on any Gitlab or Kubernetes dependencies. To make it work (since you do not have access to our base image), substitute FROM python:3.9-slim on the second line.

# Alternative Dockerfile that does not depend on AWS
FROM gcr.io/posh-containers/nsgi-base-image:latest

LABEL maintainer="Posh NLP Team <email.address>"

ENV LANG=C.UTF-8
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PRODIGY_WHEEL=prodigy-1.11.7-cp39-cp39-linux_x86_64.whl

WORKDIR /workspace
# This is where the raw data files (downloaded from s3) will be stored
RUN mkdir /workspace/data

# Install prodigy and its dependencies
COPY wheels/$PRODIGY_WHEEL /workspace
RUN pip install $PRODIGY_WHEEL psycopg2-binary hypercorn pyyaml \
    google-cloud-storage \
    && rm -rf /root/.cache/pip \
    && rm -f $PRODIGY_WHEEL

# Install a trained pipeline (spaCy language model) for NER recipes
RUN python -m spacy download en_core_web_md

COPY setup.py /workspace
COPY src/ /workspace/src

RUN pip install .

# The first directive exposes the API for "logging in" (so to speak) and
# "registering" as an annotator; the next directive exposes the ports for
# connecting to the annotator-specific web server running inside the container
EXPOSE 5000
EXPOSE 9091-9100

CMD ["hypercorn", \
     "--bind", "0.0.0.0:5000", \
     "--access-logfile", "-", \
     "--error-logfile", "-", \
     "annotator.api.app:app"]

Is it possible for you to share what you're doing in this file? I imagine you're doing some custom code there to deal with the different ports for different annotators.

I'm checking on what I am able to share. But yes, this is custom code to deal with different ports for different annotators.

Here is some additional code that may help you reproduce the issue:

app.py

import json
import logging
import os
from typing import Dict, List

import yaml
from fastapi import FastAPI, HTTPException
from google.cloud import storage

from annotator import (DESCRIPTION, GCS_DEFINITIONS_FOLDER, GCS_ROOT,
                       __version__)
from annotator.server import AnnotationTask, ProdigyServer, TaskDefinition
from annotator.sql_commands import SQLConnector
from annotator.utils import log, modify_config, timestamp

ENVIRONMENT = os.getenv("ENVIRONMENT", "local")

logger = logging.getLogger("hypercorn.access")


app = FastAPI(
    title="Posh multi-task annotation service",
    description=DESCRIPTION, version=__version__
)

known_annotators: List[str] = None
task_definitions: Dict[str, TaskDefinition] = {}
active_tasks: Dict[str, AnnotationTask] = {}
sql_connector = None


def create_server(task_name: str, port_num: int = None):
    """
    Get the task definition from the existing task definition list and create a
    Prodigy web server for it.
    """
    task_def = TaskDefinition(**task_definitions.get(task_name))
    verb = "RECREATING" if port_num is not None else "CREATING new"
    log(f"{verb} server for '{task_name}'")
    return AnnotationTask(task_def, port_num=port_num)


def add_annotator_to_task(annotator_name: str, task_name: str):
    """
    Add annotator to a task. Modifies global Dict "active_tasks".
    """
    global active_tasks
    log(f"ADDING '{annotator_name}' to '{task_name}'")
    active_tasks[task_name].add_annotator(annotator_name)


@app.on_event("startup")
async def initialize():
    """
    Perform the required initialization steps or raise a RuntimeError if any of
    the required resources (password, db host) cannot be obtained. Note that we
    use log() to report progress because the logger is not available until the
    app is properly initialized.
    """
    global active_tasks
    url_separator = "-"
    # 1. First, we must get the postgres password, or we're toast
    try:
        # In a dev environment, the Google Secrets Client is not available, so
        # we get the password directly from the environment
        from annotator.secrets_manager import SecretsManager
        log("DEPLOYING from Gitlab pipeline")
        pg_password = SecretsManager.get_secret("PRODIGY_PASS")
    except ImportError:
        log("DEPLOYING locally")
        pg_password = os.environ.get("PG_PASSWORD")
        url_separator = ":"
    if pg_password is None:
        raise RuntimeError("Could not obtain postgres password")
    log("OBTAINED postgres password")

    # 2. Without the CloudSQL host name we're toast, also
    db_host = os.environ.get("DB_HOST")
    if db_host is None:
        raise RuntimeError("Could not obtain CloudSQL host name")
    log(f"DB_HOST is {db_host}")

    # 3. Set the prefix for the prodigy servers' URLs
    config = yaml.safe_load(open("src/config.yaml"))
    ProdigyServer.set_url_prefix(config["prodigy"]["prefixes"][ENVIRONMENT])
    log(f"PRODIGY prefix is {ProdigyServer.prefix}")

    # 4. Get a list of registered annotator names so we can take advantage of
    # Prodigy's "/?session=" mechanism
    global known_annotators
    known_annotators = config["annotators"]

    # 5. Get the task definitions from GCS
    global task_definitions
    task_definitions = await load_tasks()

    log(f"TASK DEFINITIONS loaded for {list(task_definitions.keys())}")

    # 6. Prepare the Prodigy servers' environment
    modify_config("src/prodigy.json", db_host, pg_password, {})
    os.environ["PRODIGY_ALLOWED_SESSIONS"] = ",".join(known_annotators)
    os.environ["PRODIGY_CONFIG"] = f'{os.getcwd()}/src/prodigy.json'
    os.environ["PRODIGY_LOGGING"] = "basic"

    # 7. Initialize the DB monitoring connection
    global sql_connector
    prodigy_cfg = json.load(open(os.getenv("PRODIGY_CONFIG")))
    sql_connector = SQLConnector(prodigy_cfg["db_settings"]["postgresql"])

    # 8. Read current task info and add them to the active_tasks dict
    log(f"READING in existing tasks from {db_host} (if any)")

    # For storing the original start times of the saved tasks
    start_times = {}
    for record in sql_connector.query_tasks():
        # record is a (task_url, annotator, task_name, created) tuple
        task_name = record[2]
        if task_name not in active_tasks:
            # The task_url (first field in the returned record) looks like:
            # "https://ml-gcp-dev.poshdevelopment.com/prodigy-server-9093"; we
            # split the port number off the end and save it, so users can reuse
            # the address they were given when they first signed up for the task
            _, port_num = record[0].rsplit(url_separator, 1)
            start_times[task_name] = record[3]
            active_tasks[task_name] = create_server(
                task_name, port_num=int(port_num)
            )
        add_annotator_to_task(record[1], task_name)
    log(f"RECREATED {len(active_tasks)} prodigy servers")

    # 9. Record the starting date and time (purely diagnostic)
    global running_since
    running_since = timestamp()

    # 10. Start the tasks we collected above
    for task_name, task in active_tasks.items():
        task.start(start_time=start_times[task_name])
        log(f"RESTARTED server for {task_name}")


@app.get("/api/v1/info", status_code=200)
async def info():
    """
    Get information about the app
    """
    return {
        "API version": app.version,
        "running since": running_since,
        "known annotators": known_annotators,
        "active tasks": await running_tasks()
    }


@app.get("/api/v1/prodigy_config")
async def prodigy_config():
    return json.load(open(os.getenv("PRODIGY_CONFIG")))


@app.get("/api/v1/available_tasks", status_code=200)
async def available_tasks():
    """
    Get a list of the annotation tasks currently running
    """
    return {key: value for key, value in task_definitions.items()}


@app.get("/api/v1/running_tasks", status_code=200)
async def running_tasks():
    """
    Get a summary of the running annotation tasks, mapped to their names
    """
    for task_name, task in active_tasks.items():
        print(f"RUNNING: {task_name} -> {task}")
    return {
        task_name: task.summary() for task_name, task in active_tasks.items()
        if task.is_running()
    }


@app.get("/api/v1/tasks/{task_name}/{annotator_name}", status_code=200)
async def start_task(task_name: str, annotator_name: str):
    """
    Get the Prodigy server URL for an annotation task, either newly created or
    already running
    """
    global active_tasks
    try:
        assert annotator_name in known_annotators
    except AssertionError:
        raise HTTPException(
            detail=f"Annotator '{annotator_name}' is not registered",
            status_code=404
        )
    try:
        if task_name not in active_tasks:
            active_tasks[task_name] = create_server(task_name)
    except ValueError:
        raise HTTPException(
            detail=f"No space left: {len(active_tasks)} tasks running already",
            status_code=404
        )

    # Adding an annotator to a task is idempotent
    add_annotator_to_task(annotator_name, task_name)

    task_url = active_tasks[task_name].url
    # Log the newly created task info in our SQL table
    logger.info(f"ADDING '{annotator_name}','{task_name}' to state table")
    sql_connector.insert_task((task_url, annotator_name, task_name))
    active_tasks[task_name].start()
    return {"URL": f"{task_url}/?session={annotator_name}"}


@app.delete("/api/v1/tasks/{task_name}", status_code=200)
async def terminate(task_name: str):
    global active_tasks
    try:
        assert task_name in active_tasks
    except AssertionError:
        raise HTTPException(
            detail=f"No task running for '{task_name}'", status_code=404
        )
    proc = active_tasks.pop(task_name)
    proc.terminate()
    logger.info(f"TERMINATED server for '{task_name}'")
    sql_connector.delete_task((task_name,))
    logger.info(f"DELETED entries for '{task_name}' from state table")


@app.get("/api/v1/load_tasks", status_code=200)
async def load_tasks():
    """
    Get new and updated task definitions from remote storage
    """
    gcs_client = storage.Client()

    for blob in gcs_client.list_blobs(GCS_ROOT, prefix=GCS_DEFINITIONS_FOLDER):
        # The method includes objects from the root and all its subdirectories,
        # recursively; a size of 0 means the blob is a directory rather than a
        # file
        if blob.size > 0:
            content = blob.download_as_string()
            try:
                config = yaml.load(content, Loader=yaml.SafeLoader)
                name = config.pop("name")
                task_definitions[name] = TaskDefinition(**config)
            except yaml.parser.ParserError:
                # Not a valid task configuration, so likely a data file; print
                # diagnostic information to the log
                lines = content.split(b'\n')
                log(f"{blob.name} contains {len(lines)} lines", level="WARNING")
            except KeyError:
                # Not a task-definition file; move on to the next one
                pass

    return task_definitions

server.py

import logging
import multiprocessing as mp
import os
import time
from datetime import datetime
from threading import Lock
from typing import Dict, List

import prodigy
from google.cloud import storage

# The import is used implicitly by the "run_server" method below
from annotator import GCS_FILES_FOLDER, GCS_ROOT, recipes  # noqa
from annotator.utils import timestamp

logger = logging.getLogger("hypercorn.access")
gcs_client = storage.Client()


class TaskDefinition(dict):
    """
    An annotation task consists of a named recipe and a dataset
    """
    _required_keys = ("recipe", "dataset", "filepath")
    _optional_keys = ("spacy_model", "labels", "label_field", "choice_field")

    def __init__(self, **kwargs):
        """
        Make sure that all the required key/values are present and that all the
        keys are known (either required or optional)
        """
        for key in self._required_keys:
            assert key in kwargs
        for key in list(kwargs):
            assert key in self._required_keys + self._optional_keys, \
                f"{key} is not a recognized attribute"
        self.update({k: v for k, v in kwargs.items()})

    def convert_to_args(self) -> str:
        """
        Convert the task's attributes to a string of arguments that can be
        passed to the prodigy serve command

        :return: a string with the command-line arguments
        """
        # The filepath attribute is the GCS location, which consists of two
        # parts: the first two characters identify the subfolder, the remaining
        # 30 are the filename. We discard the subfolder and prepend "data" to
        # the remainder to obtain the target path (i.e., where the GCS download
        # will store the file), then use it (rather than the original filepath)
        # as the source specification (third argument) for the prodigy command
        bucket = gcs_client.bucket(GCS_ROOT)
        source_path = os.path.join(
            GCS_FILES_FOLDER, self["filepath"][:2], self['filepath'][2:]
        )
        target_path = f"data/{self['filepath'][2:]}.jsonl"
        blob = bucket.blob(source_path)
        blob.download_to_filename(target_path)
        logger.info(f"DOWNLOADED [GCS] '{source_path}' to '{target_path}'")

        command_args = [self["recipe"], self["dataset"], target_path]
        if "spacy_model" in self:
            command_args.insert(2, self["spacy_model"])
        if "labels" in self:
            command_args.append(f"--label {','.join(self['labels'])}")
        if "label_field" in self:
            command_args.append(f"-l {self['label_field']}")
        if "choice_field" in self:
            command_args.append(f"-c {self['choice_field']}")
        logger.info(f"CREATING task with '{command_args}")
        return " ".join(command_args)


class ProdigyServer:
    """
    Stores information about the Prodigy web server (such as the port number)
    and manages the start and termination of the actual subprocess.
    """
    @classmethod
    def set_url_prefix(cls, prefix: str) -> None:
        """
        Set the URL prefix to be used by all servers

        :param prefix: a string, obtained from the config file, and dependent
        on the environment where the app is running
        """
        cls.prefix = prefix

    def __init__(self, port_num: int):
        """
        Create a new server with the given port number
        """
        self.port_num = port_num
        self._proc = None
        self.start_time = None

    @property
    def url(self):
        return self.prefix + str(self.port_num)

    def is_available(self):
        return self._proc is None or not self._proc.is_alive()

    def is_running(self):
        return self._proc.is_alive()

    def start(
        self, taskdef: TaskDefinition, start_time: datetime = None,
        wait_time: int = 10
    ):
        """
        Start the server with the attributes from the task definition and give
        it a bit of time to settle down

        :param taskdef: the TaskDefinition (with the recipe name, filepath, and
        other attributes required for the server command)
        :param start_time: date and time the task first started; will not be
        None if the task was recreated from the active_tasks table at startup
        :param wait_time: the number of seconds to wait to give the server a
        chance to initialize properly (default: 10)
        """
        # Start the Prodigy webserver and give it 10 seconds before returning
        # (the .is_alive() method returns True immediately, so we cannot wait
        # for that)
        command_args = taskdef.convert_to_args()
        self._proc = mp.Process(
            target=run_server, args=(command_args, self.port_num,)
        )
        self._proc.daemon = False
        self._proc.start()
        time.sleep(wait_time)
        self.start_time = start_time or timestamp()

    def terminate(self):
        """
        If the server process is currently running, terminate it
        """
        if self._proc.is_alive():
            self._proc.terminate()
            while self._proc.is_alive():
                time.sleep(.1)
        self._proc = None
        self.start_time = None


class AnnotationTask:
    """
    Stores the task definition and manages the multiprocessing.Process for an
    annotation task. Also keeps a list of annotators working on the task.
    """
    # A finite list of available port numbers; the Helm chart assigns explicit
    # addresses to each of them, so they cannot be random
    _reserved_ports: List[int] = [port_num for port_num in range(9091, 9101)]

    # For use as a context manager so only one thread can obtain/return a port
    # number at any one time
    _lock = Lock()

    @classmethod
    def _claim_port(cls, port_num: int = None) -> int:
        """
        Claim a port by number, or get the next available one; raises an error
        if we've run out of port numbers

        :param port_num: the port number to assign to the server; if not given,
        select the next available one from the list of reserved port numbers
        :return: an integer value between 9091 and 9100 (inclusive)
        :raises: ValueError if no port numbers are available
        """
        with cls._lock:
            if cls._reserved_ports:
                if port_num is not None:
                    cls._reserved_ports.remove(port_num)
                else:
                    port_num = cls._reserved_ports.pop(0)
                return port_num
        raise ValueError("All reserved port numbers are taken")

    @classmethod
    def available_ports(cls) -> List[int]:
        return cls._reserved_ports

    def __init__(self, taskdef: TaskDefinition, port_num: int = None):
        """
        Start a new Prodigy server with the task described in the definition

        :param taskdef: an object with all the attributes needed to start the
        process
        :param port_num: the port number for the task; if not specified, select
        the next available one
        """
        port_num = AnnotationTask._claim_port(port_num=port_num)
        self._task_def = taskdef
        self._annotators = set()
        self._server = ProdigyServer(port_num)

    def start(self, start_time: datetime = None, wait_time: int = 10):
        """
        :param start_time: date and time the task first started; will not be
        None if the task was recreated from the active_tasks table at startup
        :param wait_time: the number of seconds to wait to give the server a
        chance to initialize properly (default: 10)
        """
        self._server.start(
            self._task_def, start_time=start_time, wait_time=wait_time
        )

    @property
    def url(self) -> str:
        """
        Return the externally accessible address for the server handling the
        current task

        :return: a URL (string)
        """
        return self._server.url

    def is_running(self):
        return self._server.is_running()

    def add_annotator(self, annotator_name: str) -> None:
        self._annotators.add(annotator_name)

    def terminate(self):
        """
        Terminate the Prodigy server process; as a side effect, the port number
        assigned to the server is now available for another task
        """
        with self._lock:
            self._reserved_ports.append(self._server.port_num)
            self._server.terminate()

    def summary(self) -> Dict:
        """
        Return useful information about the task
        """
        return {
            "task": self._task_def,
            "url": self._server.url,
            "annotators": list(self._annotators),
            "started_at": self._server.start_time
        }


def run_server(command_args: str, port: int):
    prodigy.serve(command_args, port=port, host="0.0.0.0")

This is a detail, but I noticed you're using app.on_event("startup"). While I think this should be fine, it is something that the FastAPI docs no longer recommend. Figured I'd at least mention it.
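For reference, the pattern the docs recommend nowadays is a lifespan handler. A minimal sketch of what that migration could look like (assuming a recent FastAPI version; the comments mark where your existing code would go):

from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Everything currently in initialize() (steps 1-10) would run here
    yield
    # Optional teardown after the yield, e.g. terminating the prodigy servers


app = FastAPI(lifespan=lifespan)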

That said, my initial gut feeling is to add some more logging. I'm not seeing anything obviously strange in this code, so more telemetry might give us a next hint, especially because we don't have a locally reproducible case yet. The prodigy.log function can be used for this (as described here). Is this what you're using under the hood in from annotator.utils import log?

In particular, it might be good to log the port number in ProdigyServer.start, just to be able to confirm which ports actually end up getting used. It'd be especially useful if some user information were logged as well. I also think it can't hurt to log the command_args that have been generated.
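Concretely, something like this sketch against the ProdigyServer.start method you shared (the log messages are only suggestions, and it assumes the logger, mp, time, run_server and timestamp names already defined in your server.py):

    def start(
        self, taskdef: TaskDefinition, start_time: datetime = None,
        wait_time: int = 10
    ):
        command_args = taskdef.convert_to_args()
        # Log the port and generated arguments before launching the subprocess
        logger.info(
            f"STARTING prodigy server on port {self.port_num} "
            f"with args '{command_args}'"
        )
        self._proc = mp.Process(
            target=run_server, args=(command_args, self.port_num,)
        )
        self._proc.daemon = False
        self._proc.start()
        time.sleep(wait_time)
        self.start_time = start_time or timestamp()
        # Confirm the subprocess survived the warm-up period
        logger.info(
            f"STARTED prodigy server on port {self.port_num}: "
            f"alive={self._proc.is_alive()}"
        )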

One concern that comes to mind is related to the load_tasks endpoint. I see that it's defined as an async function, but isn't blob.download_as_string a blocking call? I could be wrong, but I recall many cloud providers having Python APIs that are not async, and I can certainly imagine a big file download blocking other tasks. It might be good to log the start and end times of that function if large files are being pulled in.
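If that download does turn out to block things, one low-effort mitigation is FastAPI's run_in_threadpool helper. A minimal sketch against the load_tasks body you shared:

from fastapi.concurrency import run_in_threadpool

# Inside load_tasks(), replacing the direct call
#     content = blob.download_as_string()
# with a thread-pool call keeps the event loop responsive:
content = await run_in_threadpool(blob.download_as_string)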

Another detail: if you're worried about the code exiting and breaking, you can also use Python's native atexit module to log things just before the process fully quits. Demo:

import atexit 

a = 1
b = 2

atexit.register(print, "This is a log printed at exit", locals())

In your code I would replace the print with your preferred logger, but this will also allow you to log any relevant locals on exit. As always, be mindful that you may be printing secrets too if you're not careful, and you probably don't want those to appear in your logs.
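For example, with the hypercorn logger you're already using (just a sketch; the message text is a placeholder):

import atexit
import logging

logger = logging.getLogger("hypercorn.access")

# atexit calls the registered function with these arguments at shutdown
atexit.register(logger.info, "EXITING: annotation service is shutting down")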

Finally, if this doesn't yield anything useful, might it be possible to run an experiment on dummy data? Something like a JSONL file that really just has text like "this is example #{n}", just so we might lure out the issue. I'm certainly willing to jump on a call to dive into the logs some more, but it would help very much if we could see the error happening live. That way, we might get to something that is reproducible.
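Something along these lines would generate such a file (the path and example count are placeholders):

import json

# Write one trivial example per line in Prodigy's JSONL input format
with open("data/dummy.jsonl", "w", encoding="utf8") as f:
    for n in range(100):
        f.write(json.dumps({"text": f"this is example #{n}"}) + "\n")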

Then again, we have refactored a large chunk of the codebase that handles task routing in v1.12, so the solution might also be to wait for that version.

Thanks for investigating and providing feedback. Here is more information:

  • We are logging the following: port number, command args, annotator info
  • We have an endpoint "/running_tasks" in the FastAPI app, backed by a database table, that shows the information in real time.
  • We have always found it to be accurate in showing port numbers, annotation tasks and annotators subscribed to the tasks.
  • The utils.log() method is simply a print statement with a timestamp that we use during initialization of the app, when the default logger is not yet available.
  • The blob.download_as_string() function is certainly blocking, but that is not a problem inside an async function.
  • We also log all the information and have not observed any issues with downloading the data from GCS.
  • Here's a sample from the log: it shows what happens when a new annotation task is created, and all seems to be working as expected:
2023-04-19T19:23:27.275855280Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19T19:23:27] [INFO] CREATING new server for 'eup_low_scoring_data_v1'
2023-04-19T19:23:27.275911207Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19T19:23:27] [INFO] ADDING 'cheyanne' to 'eup_low_scoring_data_v1'
2023-04-19T19:23:27.275979916Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19 19:23:27 +0000] [1] [INFO] ADDING 'cheyanne','eup_low_scoring_data_v1' to state table
2023-04-19T19:23:27.478630563Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19 19:23:27 +0000] [1] [INFO] DOWNLOADED [GCS] 'raw/3a/9533791731a61a22a40493ff1f9072' to 'data/9533791731a61a22a40493ff1f9072.jsonl'
2023-04-19T19:23:27.478691658Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19 19:23:27 +0000] [1] [INFO] CREATING task with '['eup-corpus-validation-with-options', 'eup_low_scoring_data_v1', 'data/9533791731a61a22a40493ff1f9072.jsonl']
2023-04-19T19:23:27.492342154Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: RECIPE: Calling recipe 'eup-corpus-validation-with-options'
2023-04-19T19:23:27.547289563Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: CONFIG: Using config from PRODIGY_CONFIG env var
2023-04-19T19:23:27.548088398Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: VALIDATE: Validating components returned by recipe
2023-04-19T19:23:27.549925541Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: CONTROLLER: Initialising from recipe
2023-04-19T19:23:27.550221218Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: VALIDATE: Creating validator for view ID 'blocks'
2023-04-19T19:23:27.550642601Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: VALIDATE: Validating Prodigy and recipe config
2023-04-19T19:23:27.552024010Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: CONFIG: Using config from PRODIGY_CONFIG env var
2023-04-19T19:23:27.553338541Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: DB: Initializing database PostgreSQL
2023-04-19T19:23:27.554768238Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:27 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T19:23:27.582554270Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: DB: Connecting to database PostgreSQL
2023-04-19T19:23:27.588312571Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: DB: Creating dataset 'eup_low_scoring_data_v1'
2023-04-19T19:23:27.593729469Z [resource.labels.containerName: nsgi-annotation-service] Added dataset eup_low_scoring_data_v1 to database PostgreSQL.
2023-04-19T19:23:27.599291256Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: DB: Creating dataset '2023-04-19_19-23-27'
2023-04-19T19:23:27.633258290Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: FEED: Initializing from controller
2023-04-19T19:23:27.634140523Z [resource.labels.containerName: nsgi-annotation-service] 19:23:27: CORS: initialized with wildcard "*" CORS origins
2023-04-19T19:23:27.634800494Z [resource.labels.containerName: nsgi-annotation-service] {}
2023-04-19T19:23:27.634852116Z [resource.labels.containerName: nsgi-annotation-service] ✨ Starting the web server at http://0.0.0.0:9091 ...
2023-04-19T19:23:27.634863474Z [resource.labels.containerName: nsgi-annotation-service] Open the app in your browser and start annotating!
2023-04-19T19:23:27.634869634Z [resource.labels.containerName: nsgi-annotation-service] {}
2023-04-19T19:23:27.647545281Z [resource.labels.containerName: nsgi-annotation-service] INFO: Started server process [163]
2023-04-19T19:23:27.647593665Z [resource.labels.containerName: nsgi-annotation-service] INFO: Waiting for application startup.
2023-04-19T19:23:27.648017755Z [resource.labels.containerName: nsgi-annotation-service] INFO: Application startup complete.
2023-04-19T19:23:27.648582794Z [resource.labels.containerName: nsgi-annotation-service] INFO: Uvicorn running on http://0.0.0.0:9091 (Press CTRL+C to quit)
[...]
2023-04-19T19:23:46.890964092Z [resource.labels.containerName: nsgi-annotation-service] 19:23:46: POST: /get_session_questions
2023-04-19T19:23:46.891658628Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T19:23:46.913629979Z [resource.labels.containerName: nsgi-annotation-service] 19:23:46: CONTROLLER: Getting batch of questions for session: eup_low_scoring_data_v1-cheyanne
2023-04-19T19:23:46.914257752Z [resource.labels.containerName: nsgi-annotation-service] 19:23:46: FEED: Finding next batch of questions in stream
2023-04-19T19:23:46.920680018Z [resource.labels.containerName: nsgi-annotation-service] 19:23:46: FEED: re-adding open tasks to stream
2023-04-19T19:23:46.922544217Z [resource.labels.containerName: nsgi-annotation-service] 19:23:46: FEED: batch of questions requested for session eup_low_scoring_data_v1-cheyanne: 10
2023-04-19T19:23:46.923044861Z [resource.labels.containerName: nsgi-annotation-service] 19:23:46: RESPONSE: /get_session_questions (10 examples)
2023-04-19T19:23:46.923221252Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 Client closed local connection on 127.0.0.1:5432
2023-04-19T19:23:46.927698778Z [resource.labels.containerName: nsgi-annotation-service] INFO: 10.56.1.24:53074 - "POST /get_session_questions HTTP/1.1" 200 OK

The two entries of concern are possibly those coming from the cloudsql-proxy:

2023-04-19T19:23:46.891658628Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T19:23:46.923221252Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 Client closed local connection on 127.0.0.1:5432

We assume that the "Client" in the entry above is the prodigy web server; our app is not opening or closing connections, or using a connection pool. We see these entries on every interaction between the prodigy server handling an annotation task and the database: getting a fresh batch of data for a task, or recording (saving) an answer to a question to the database, as in this example:

2023-04-19T22:37:59.182586922Z [resource.labels.containerName: nsgi-annotation-service] 22:37:59: CONTROLLER: Added 1 answers to dataset 'stt_error_validation' in database PostgreSQL
2023-04-19T22:37:59.183282062Z [resource.labels.containerName: nsgi-annotation-service] 22:37:59: RESPONSE: /give_answers
2023-04-19T22:37:59.183473098Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 22:37:59 Client closed local connection on 127.0.0.1:5432
2023-04-19T22:37:59.184950692Z [resource.labels.containerName: nsgi-annotation-service] INFO: 10.56.5.11:33424 - "POST /give_answers HTTP/1.1" 200 OK
2023-04-19T22:38:37.377031033Z [resource.labels.containerName: nsgi-annotation-service] 22:38:37: POST: /give_answers (received 1, session ID 'stt_error_validation-joel')
2023-04-19T22:38:37.377516031Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 22:38:37 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T22:38:37.397329881Z [resource.labels.containerName: nsgi-annotation-service] 22:38:37: CONTROLLER: Receiving 1 answers
2023-04-19T22:38:37.397445116Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 22:38:37 Client closed local connection on 127.0.0.1:5432

Are you sure that the blocking isn't a problem there?

Here's a demo app, in a file called app.py that I just ran locally.

import time
import asyncio
from fastapi import FastAPI

app = FastAPI()


@app.get("/sleep_slow")
async def sleep_slow():
    time.sleep(5)
    return {"status": "done"}

@app.get("/sleep_fast")
async def sleep_fast():
    await asyncio.sleep(1)
    return {"status": "done"}

Note that one async function uses the blocking time.sleep while the other uses the non-blocking asyncio.sleep. You can now run this locally via:

uvicorn app:app

You can now point the browser to /sleep_slow and, in another tab, to /sleep_fast. Notice that while /sleep_slow is still waiting, /sleep_fast doesn't return either. That's because time.sleep is blocking, despite being inside an async function.

Is this not a concern for the blob.download_as_string?

Thanks for investigating! Some feedback from our team. Please let me know if you have any questions!

  • It is true that each of the blob.download_as_string() calls is a blocking call, but eventually each call finishes (typically in less than a second) and when they are all finished, the load_tasks() method itself finishes and returns.
  • In your example, if we call the sleep_fast and sleep_slow endpoints simultaneously, sleep_fast will return after sleep_slow, even though it was asked to sleep 4 fewer seconds than sleep_slow, because the call to time.sleep() in sleep_slow blocks sleep_fast from returning.
  • The same is true of our annotation service: until load_tasks() finishes, nothing else can happen. However, because we call load_tasks() just once at initialization time, it does not interfere with anything else, because nothing else is happening -- nothing else can happen until initialization is complete.
  • The initialization function, even though declared async, is by design a sequence of blocking calls -- such as obtaining the database password in step 1 and loading the prodigy.json file in step 3 -- that must succeed in order for us to have a functioning service.
  • When load_tasks() returns in step 5, we have all our task definitions in memory.
  • In step 8, we use the definitions -- each of which contains the path to a data file stored in GCS -- to recreate the individual prodigy webservers for annotation tasks that were running before we restarted the service; note that we do not start any of these prodigy webservers as we recreate them; that would indeed block the whole initialization routine from finishing. Instead, we restart each server as a separate multiprocessing.Process in step 10.
  • When all prodigy servers have started successfully -- and our logs show that they do -- the service is ready. The annotators then connect to the prodigy webserver for their task via a dedicated port number without interacting with the FastAPI app at all. Only when they have finished one task do they use the /api/v1/tasks endpoint to select a new task, which will run its own prodigy webserver in a separate process associated with its own port number.
  • Our problem is not that we can't load the datasets or create servers for each task, but that, on occasion, as annotators attempt to save their work to the database, the process fails and some annotations are lost. Possibly this happens because the prodigy webserver for the task cannot connect to the PostgreSQL database.
  • We never have more than 10 prodigy servers running simultaneously and our max_connections setting is 100, so it's a bit of a mystery as to why that would happen.
  • We have so far not seen any errors in the CloudSQL logs. Possibly it's just a temporary glitch in the internal GKE network.

I have one additional update: we have a scenario where two annotators are annotating the same set, and they each got the save error on the same document while annotating days apart. This has happened before, so it's not new, but I cannot identify anything unique about the documents this error occurs on. Also, pressing ignore or reject doesn't work to move past the document with the error. The annotator will successfully save a document, and then the document with the error will pop up again.

I've noticed one trend in the content: tasks where we have a link to full conversations do not encounter this error, but single utterances do, and they seem to contain SQL protected keywords: union, select, having, etc.
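For reference, this is roughly how I scanned an exported dataset for those keywords (the keyword list and file path here are just illustrative):

import json

# Illustrative subset of SQL reserved words seen in the failing utterances
SQL_KEYWORDS = {"union", "select", "having", "insert", "update", "delete"}

with open("data/export.jsonl", encoding="utf8") as f:
    for line in f:
        text = json.loads(line)["text"]
        hits = SQL_KEYWORDS & set(text.lower().split())
        if hits:
            print(f"{sorted(hits)} -> {text}")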

Do you have an example JSON representation of such a task?

Also, can you confirm that this never happens when a SQL protected keyword is not around?

I cannot definitively confirm that this has never happened when a SQL protected keyword is absent, but I collected sample utterances that have caused errors and they all contain SQL protected keywords. I'm currently testing this dataset with our annotators and will post the outcome. I was still not able to reproduce the error locally.


Just a quick note that we resolved this issue! It was a WAF issue: documents containing SQL protected keywords were tripping the firewall's rules, so they caused save errors and would not save to the database.


Good to know! Thanks for reporting back!
