hi @cheyanneb!
v1.11.8a4 is the latest. We're working hard to release v1.12 soon, so now could be a great time to prepare for it. It includes a migration script that will be required. Let us know of any issues you find!
Thanks! Is it possible to get the v1.11.8a4 wheels?
The wheels aren't available openly but we've sent you a follow-up message with them. Anyone else who has questions can send me a direct message and I can help them get the wheels if they're interested.
We are still experiencing this error with v1.11.11. Any updates on this? Thanks!
Bummer to hear this issue hasn't gone away.
I guess the best thing for me to do now is to see whether I can reproduce this locally. With that in mind, is it possible to share your Dockerfile? Also, is this running a custom recipe? If so, is it possible to share that as well?
I'm likely not going to be able to mock the proxy and the CloudSQL database, but I might be able to reproduce something if I know more about the recipe/docker setup.
Perhaps a final question: does this issue persist across all annotation tasks that you give it? As in, are there recipes/situations for which this issue does not occur? Does it also occur when a single person is annotating?
I will see if I can share those files. As for the final question:
ignore, I could set up a new task to handle these missing docs, but it's usually just 1 or 2 and we can do them manually.
Here is our Dockerfile.local. Its purpose is to be able to run a docker container locally, without relying on any Gitlab or Kubernetes dependencies. To make it work (since you do not have access to our base image), substitute FROM python:3.9-slim on the second line.
# Alternative Dockerfile that does not depend on AWS
FROM gcr.io/posh-containers/nsgi-base-image:latest
LABEL maintainer="Posh NLP Team <email.address>"
ENV LANG=C.UTF-8
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PRODIGY_WHEEL=prodigy-1.11.7-cp39-cp39-linux_x86_64.whl
WORKDIR /workspace
# This is where the raw data files (downloaded from s3) will be stored
RUN mkdir /workspace/data
# Install prodigy and its dependencies
COPY wheels/$PRODIGY_WHEEL /workspace
RUN pip install $PRODIGY_WHEEL psycopg2-binary hypercorn pyyaml \
    google-cloud-storage \
    && rm -rf /root/.cache/pip \
    && rm -f $PRODIGY_WHEEL
# Install a trained pipeline (spaCy language model) for NER recipes
RUN python -m spacy download en_core_web_md
COPY setup.py /workspace
COPY src/ /workspace/src
RUN pip install .
# The first directive exposes the API for "logging in" (so to speak) and
# "registering" as an annotator; the next directive exposes the ports for
# connecting to the annotator-specific web server running inside the container
EXPOSE 5000
EXPOSE 9091-9100
CMD ["hypercorn", \
     "--bind", "0.0.0.0:5000", \
     "--access-logfile", "-", \
     "--error-logfile", "-", \
     "annotator.api.app:app"]
Is it possible for you to share what you're doing in this file? I imagine you're doing some custom code there to deal with the different ports for different annotators.
I'm checking on what I am able to share. But yes, this is custom code to deal with different ports for different annotators.
Here is some additional code that may help you reproduce the issue:
app.py
import json
import logging
import os
from typing import Dict, List

import yaml
from fastapi import FastAPI, HTTPException
from google.cloud import storage

from annotator import (DESCRIPTION, GCS_DEFINITIONS_FOLDER, GCS_ROOT,
                       __version__)
from annotator.server import AnnotationTask, ProdigyServer, TaskDefinition
from annotator.sql_commands import SQLConnector
from annotator.utils import log, modify_config, timestamp

ENVIRONMENT = os.getenv("ENVIRONMENT", "local")
logger = logging.getLogger("hypercorn.access")
app = FastAPI(
    title="Posh multi-task annotation service",
    description=DESCRIPTION, version=__version__
)
known_annotators: List[str] = None
task_definitions: Dict[str, TaskDefinition] = {}
active_tasks: Dict[str, AnnotationTask] = {}
sql_connector = None


def create_server(task_name: str, port_num: int = None):
    """
    Get the task definition from the existing task definition list and create a
    Prodigy web server for it.
    """
    task_def = TaskDefinition(**task_definitions.get(task_name))
    verb = "RECREATING" if port_num is not None else "CREATING new"
    log(f"{verb} server for '{task_name}'")
    return AnnotationTask(task_def, port_num=port_num)


def add_annotator_to_task(annotator_name: str, task_name: str):
    """
    Add annotator to a task. Modifies global Dict "active_tasks".
    """
    global active_tasks
    log(f"ADDING '{annotator_name}' to '{task_name}'")
    active_tasks[task_name].add_annotator(annotator_name)


@app.on_event("startup")
async def initialize():
    """
    Perform the required initialization steps or raise a RuntimeError if any of
    the required resources (password, db host) cannot be obtained. Note that we
    use log() to report progress because the logger is not available until the
    app is properly initialized.
    """
    global active_tasks
    url_separator = "-"

    # 1. First, we must get the postgres password, or we're toast
    try:
        # In a dev environment, the Google Secrets Client is not available, so
        # we get the password directly from the environment
        from annotator.secrets_manager import SecretsManager
        log("DEPLOYING from Gitlab pipeline")
        pg_password = SecretsManager.get_secret("PRODIGY_PASS")
    except ImportError:
        log("DEPLOYING locally")
        pg_password = os.environ.get("PG_PASSWORD")
        url_separator = ":"
    if pg_password is None:
        raise RuntimeError("Could not obtain postgres password")
    log("OBTAINED postgres password")

    # 2. Without the CloudSQL host name we're toast, also
    db_host = os.environ.get("DB_HOST")
    if db_host is None:
        raise RuntimeError("Could not obtain CloudSQL host name")
    log(f"DB_HOST is {db_host}")

    # 3. Set the prefix for the prodigy servers' URLs
    config = yaml.safe_load(open("src/config.yaml"))
    ProdigyServer.set_url_prefix(config["prodigy"]["prefixes"][ENVIRONMENT])
    log(f"PRODIGY prefix is {ProdigyServer.prefix}")

    # 4. Get a list of registered annotator names so we can take advantage of
    # Prodigy's "/?session=" mechanism
    global known_annotators
    known_annotators = config["annotators"]

    # 5. Get the task definitions from GCS
    global task_definitions
    task_definitions = await load_tasks()
    log(f"TASK DEFINITIONS loaded for {list(task_definitions.keys())}")

    # 6. Prepare the Prodigy servers' environment
    modify_config("src/prodigy.json", db_host, pg_password, {})
    os.environ["PRODIGY_ALLOWED_SESSIONS"] = ",".join(known_annotators)
    os.environ["PRODIGY_CONFIG"] = f'{os.getcwd()}/src/prodigy.json'
    os.environ["PRODIGY_LOGGING"] = "basic"

    # 7. Initialize the DB monitoring connection
    global sql_connector
    prodigy_cfg = json.load(open(os.getenv("PRODIGY_CONFIG")))
    sql_connector = SQLConnector(prodigy_cfg["db_settings"]["postgresql"])

    # 8. Read current task info and add them to the active_tasks dict
    log(f"READING in existing tasks from {db_host} (if any)")
    # For storing the original start times of the saved tasks
    start_times = {}
    for record in sql_connector.query_tasks():
        # record is a (task_url, annotator, task_name, created) tuple
        task_name = record[2]
        if task_name not in active_tasks:
            # The task_url (first field in the returned record) looks like:
            # "https://ml-gcp-dev.poshdevelopment.com/prodigy-server-9093"; we
            # split the port number off the end and save it, so users can reuse
            # the address they were given when they first signed up for the task
            _, port_num = record[0].rsplit(url_separator, 1)
            start_times[task_name] = record[3]
            active_tasks[task_name] = create_server(
                task_name, port_num=int(port_num)
            )
        add_annotator_to_task(record[1], task_name)
    log(f"RECREATED {len(active_tasks)} prodigy servers")

    # 9. Record the starting date and time (purely diagnostic)
    global running_since
    running_since = timestamp()

    # 10. Start the tasks we collected above
    for task_name, task in active_tasks.items():
        task.start(start_time=start_times[task_name])
        log(f"RESTARTED server for {task_name}")


@app.get("/api/v1/info", status_code=200)
async def info():
    """
    Get information about the app
    """
    return {
        "API version": app.version,
        "running since": running_since,
        "known annotators": known_annotators,
        "active tasks": await running_tasks()
    }


@app.get("/api/v1/prodigy_config")
async def prodigy_config():
    return json.load(open(os.getenv("PRODIGY_CONFIG")))


@app.get("/api/v1/available_tasks", status_code=200)
async def avaliable_tasks():
    """
    Get a list of the annotation tasks currently running
    """
    return {key: value for key, value in task_definitions.items()}


@app.get("/api/v1/running_tasks", status_code=200)
async def running_tasks():
    """
    Get a summary of the running annotation tasks, mapped to their names
    """
    for task_name, task in active_tasks.items():
        print(f"RUNNING: {task_name} -> {task}")
    return {
        task_name: task.summary() for task_name, task in active_tasks.items()
        if task.is_running()
    }


@app.get("/api/v1/tasks/{task_name}/{annotator_name}", status_code=200)
async def start_task(task_name: str, annotator_name: str):
    """
    Get the Prodigy server URL for an annotation task, either newly created or
    already running
    """
    global active_tasks
    try:
        assert annotator_name in known_annotators
    except AssertionError:
        raise HTTPException(
            detail=f"Annotator '{annotator_name}' is not registered",
            status_code=404
        )
    try:
        if task_name not in active_tasks:
            active_tasks[task_name] = create_server(task_name)
    except ValueError:
        raise HTTPException(
            detail=f"No space left: {len(active_tasks)} tasks running already",
            status_code=404
        )
    # Adding an annotator to a task is idempotent
    add_annotator_to_task(annotator_name, task_name)
    task_url = active_tasks[task_name].url
    # Log the newly created task info in our SQL table
    logger.info(f"ADDING '{annotator_name}','{task_name}' to state table")
    sql_connector.insert_task((task_url, annotator_name, task_name))
    active_tasks[task_name].start()
    return {"URL": f"{task_url}/?session={annotator_name}"}


@app.delete("/api/v1/tasks/{task_name}", status_code=200)
async def terminate(task_name: str):
    global active_tasks
    try:
        assert task_name in active_tasks
    except AssertionError:
        raise HTTPException(
            detail=f"No task running for '{task_name}'", status_code=404
        )
    proc = active_tasks.pop(task_name)
    proc.terminate()
    logger.info(f"TERMINATED server for '{task_name}'")
    sql_connector.delete_task((task_name,))
    logger.info(f"DELETED entries for '{task_name}' from state table")


@app.get("/api/v1/load_tasks", status_code=200)
async def load_tasks():
    """
    Get new and updated task definitions from remote storage
    """
    gcs_client = storage.Client()
    for blob in gcs_client.list_blobs(GCS_ROOT, prefix=GCS_DEFINITIONS_FOLDER):
        # The method includes objects from the root and all its subdirectories,
        # recursively; a size of 0 means the blob is a directory rather than a
        # file
        if blob.size > 0:
            content = blob.download_as_string()
            try:
                config = yaml.load(content, Loader=yaml.SafeLoader)
                name = config.pop("name")
                task_definitions[name] = TaskDefinition(**config)
            except yaml.parser.ParserError:
                # Not a valid task configuration, so likely a data file; print
                # diagnostic information to the log
                lines = content.split(b'\n')
                log(f"{blob.name} contains {len(lines)} lines", level="WARNING")
            except KeyError:
                # Not a task-definition file; move on to the next one
                pass
    return task_definitions

server.py
import logging
import multiprocessing as mp
import os
import time
from datetime import datetime
from threading import Lock
from typing import Dict, List

import prodigy
from google.cloud import storage

# The import is used implicitly by the "run_server" method below
from annotator import GCS_FILES_FOLDER, GCS_ROOT, recipes  # noqa
from annotator.utils import timestamp

logger = logging.getLogger("hypercorn.access")
gcs_client = storage.Client()


class TaskDefinition(dict):
    """
    An annotation task consists of a named recipe and a dataset
    """
    _required_keys = ("recipe", "dataset", "filepath")
    _optional_keys = ("spacy_model", "labels", "label_field", "choice_field")

    def __init__(self, **kwargs):
        """
        Make sure that all the required key/values are present and that all the
        keys are known (either required or optional)
        """
        for key in self._required_keys:
            assert key in kwargs
        for key in list(kwargs):
            assert key in self._required_keys + self._optional_keys, \
                f"{key} is not a recognized attribute"
        self.update({k: v for k, v in kwargs.items()})

    def convert_to_args(self) -> str:
        """
        Convert the task's attributes to a string of arguments that can be
        passed to the prodigy serve command
        :return: a string with the command-line arguments
        """
        # The filepath attribute is the GCS location, which consists of two
        # parts: the first two characters identify the subfolder, the remaining
        # 30 are the filename. We discard the subfolder and prepend "data" to
        # the remainder to obtain the target path (i.e., where the GCS download
        # will store the file), then use it (rather than the original filepath)
        # as the source specification (third argument) for the prodigy command
        bucket = gcs_client.bucket(GCS_ROOT)
        source_path = os.path.join(
            GCS_FILES_FOLDER, self["filepath"][:2], self['filepath'][2:]
        )
        target_path = f"data/{self['filepath'][2:]}.jsonl"
        blob = bucket.blob(source_path)
        blob.download_to_filename(target_path)
        logger.info(f"DOWNLOADED [GCS] '{source_path}' to '{target_path}'")
        command_args = [self["recipe"], self["dataset"], target_path]
        if "spacy_model" in self:
            command_args.insert(2, self["spacy_model"])
        if "labels" in self:
            command_args.append(f"--label {','.join(self['labels'])}")
        if "label_field" in self:
            command_args.append(f"-l {self['label_field']}")
        if "choice_field" in self:
            command_args.append(f"-c {self['choice_field']}")
        logger.info(f"CREATING task with '{command_args}")
        return " ".join(command_args)


class ProdigyServer:
    """
    Stores information about the Prodigy web server (such as the port number)
    and manages the start and termination of the actual subprocess.
    """
    @classmethod
    def set_url_prefix(cls, prefix: str) -> None:
        """
        Set the URL prefix to be used by all servers
        :param prefix: a string, obtained from the config file, and dependent
        on the environment where the app is running
        """
        cls.prefix = prefix

    def __init__(self, port_num: int):
        """
        Create a new server with the given port number
        """
        self.port_num = port_num
        self._proc = None
        self.start_time = None

    @property
    def url(self):
        return self.prefix + str(self.port_num)

    def is_available(self):
        return self._proc is None or not self._proc.is_alive()

    def is_running(self):
        return self._proc.is_alive()

    def start(
        self, taskdef: TaskDefinition, start_time: datetime = None,
        wait_time: int = 10
    ):
        """
        Start the server with the attributes from the task definition and give
        it a bit of time to settle down
        :param taskdef: the TaskDefinition (with the recipe name, filepath, and
        other attributes required for the server command)
        :param start_time: date and time the task first started; will not be
        None if the task was recreated from the active_tasks table at startup
        :param wait_time: the number of seconds to wait to give the server a
        chance to initialize properly (default: 10)
        """
        # Start the Prodigy webserver and give it 10 seconds before returning
        # (the .is_alive() method returns True immediately, so we cannot wait
        # for that)
        command_args = taskdef.convert_to_args()
        self._proc = mp.Process(
            target=run_server, args=(command_args, self.port_num,)
        )
        self._proc.daemon = False
        self._proc.start()
        time.sleep(wait_time)
        self.start_time = start_time or timestamp()

    def terminate(self):
        """
        If the server process is currently running, terminate it
        """
        if self._proc.is_alive():
            self._proc.terminate()
            while self._proc.is_alive():
                time.sleep(.1)
        self._proc = None
        self.start_time = None


class AnnotationTask:
    """
    Stores the task definition and manages the multiprocessing.Process for an
    annotation task. Also keeps a list of annotators working on the task.
    """
    # A finite list of available port numbers; the Helm chart assigns explicit
    # addresses to each of them, so they cannot be random
    _reserved_ports: List[int] = [port_num for port_num in range(9091, 9101)]
    # For use as a context manager so only one thread can obtain/return a port
    # number at any one time
    _lock = Lock()

    @classmethod
    def _claim_port(cls, port_num: int = None) -> int:
        """
        Claim a port by number, or get the next available one; raises an error
        if we've run out of port numbers
        :param port_num: the port number to assign to the server; if not given,
        select the next available one from the list of reserved port numbers
        :return: an integer value between 9091 and 9100 (inclusive)
        :raises: ValueError if no port numbers are available
        """
        with cls._lock:
            if cls._reserved_ports:
                if port_num is not None:
                    cls._reserved_ports.remove(port_num)
                else:
                    port_num = cls._reserved_ports.pop(0)
                return port_num
            raise ValueError("All reserved port numbers are taken")

    @classmethod
    def available_ports(cls) -> List[int]:
        return cls._reserved_ports

    def __init__(self, taskdef: TaskDefinition, port_num: int = None):
        """
        Start a new Prodigy server with the task described in the definition
        :param taskdef: an object with all the attributes needed to start the
        process
        :param port_num: the port number for the task; if not specified, select
        the next available one
        """
        port_num = AnnotationTask._claim_port(port_num=port_num)
        self._task_def = taskdef
        self._annotators = set()
        self._server = ProdigyServer(port_num)

    def start(self, start_time: datetime = None, wait_time: int = 10):
        """
        :param start_time: date and time the task first started; will not be
        None if the task was recreated from the active_tasks table at startup
        :param wait_time: the number of seconds to wait to give the server a
        chance to initialize properly (default: 10)
        """
        self._server.start(
            self._task_def, start_time=start_time, wait_time=wait_time
        )

    @property
    def url(self) -> str:
        """
        Return the externally accessible address for the server handling the
        current task
        :return: a URL (string)
        """
        return self._server.url

    def is_running(self):
        return self._server.is_running()

    def add_annotator(self, annotator_name: str) -> None:
        self._annotators.add(annotator_name)

    def terminate(self):
        """
        Terminate the Prodigy server process; as a side effect, the port number
        assigned to the server is now available for another task
        """
        with self._lock:
            self._reserved_ports.append(self._server.port_num)
        self._server.terminate()

    def summary(self) -> Dict:
        """
        Return useful information about the task
        """
        return {
            "task": self._task_def,
            "url": self._server.url,
            "annotators": list(self._annotators),
            "started_at": self._server.start_time
        }


def run_server(command_args: str, port: int):
    prodigy.serve(command_args, port=port, host="0.0.0.0")

This is a detail, but I noticed you're using app.on_event("startup"). While I think this should be fine, it is something that the FastAPI docs no longer recommend. Figured I'd at least mention it.
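For reference, here is a minimal sketch of the lifespan pattern that the FastAPI docs describe as the replacement. The `lifespan` function name and the `state` dict are illustrative and not taken from your code, and the FastAPI wiring is only shown as a comment so that the snippet runs with the standard library alone:

```python
import asyncio
from contextlib import asynccontextmanager

# Hypothetical stand-in for module-level globals such as active_tasks
state = {}


@asynccontextmanager
async def lifespan(app):
    # Everything before `yield` runs once, at startup
    state["running_since"] = "startup done"
    yield
    # Everything after `yield` runs once, at shutdown
    state.clear()


# With FastAPI this would be wired up as: app = FastAPI(lifespan=lifespan)
async def demo():
    # Enter and leave the context manually to mimic what the server does
    async with lifespan(app=None):
        return dict(state)


during = asyncio.run(demo())
print(during, state)
```

The startup steps from initialize() would go before the `yield`, and any cleanup (such as terminating the Prodigy processes) after it.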
That said, my initial gut feeling is to add some more logging. I'm not seeing anything obviously strange in this code, so more telemetry might give us the next hint, especially because we don't have a locally reproducible case yet. The prodigy.log function can be used for this (as described here). Is this what you're using under the hood in from annotator.utils import log?
In particular, it might be good to log the port number in ProdigyServer.start, just to be able to confirm which ports actually end up getting used. It'd be especially useful if some user information was logged as well. I also think it cannot hurt to log the command_args that have been generated.
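As a rough sketch of what such a log line could contain: the helper name `describe_start`, the logger name, and the example values below are all made up for illustration, so adapt them to whatever ProdigyServer.start has access to.

```python
import logging

logger = logging.getLogger("annotation.debug")  # hypothetical logger name


def describe_start(task_name, port_num, command_args, annotators):
    """Build one log line with everything needed to trace a server start."""
    return (
        f"STARTING task={task_name!r} port={port_num} "
        f"args={command_args!r} annotators={sorted(annotators)}"
    )


# Example values only; none of these come from the real deployment
message = describe_start(
    "dummy_task", 9091, "my-recipe dummy_task data/dummy.jsonl", {"alice"}
)
logger.info(message)
print(message)
```

Keeping the task name, port, and annotators on a single line should make it easier to match a save error to the server that handled it.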
One concern that comes to mind is related to the load_tasks endpoint. I see that it's defined as an async function, but is blob.download_as_string not a blocking function? I could be wrong, but I recall many cloud providers having Python APIs that are not async. And I can certainly imagine a big file download blocking other tasks. It might be good to log the start/end times of that function if large files are being pulled in.
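One way to capture those timings is a small context manager. This is only a sketch: `timed`, `timings`, and the logger name are hypothetical, and time.sleep stands in for the actual download call.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("annotation.debug")  # hypothetical logger name
timings = {}  # label -> elapsed seconds, kept so the numbers can be inspected


@contextmanager
def timed(label):
    """Log how long the wrapped block took, even if it raises."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start
        logger.info("%s took %.3fs", label, timings[label])


with timed("fake_download"):
    time.sleep(0.05)  # stand-in for blob.download_as_string()
```

Wrapping each download in `with timed(blob.name):` would show whether any single file is slow enough to matter.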
Another detail: if you're worried about the code exiting and breaking, you can also use Python's native atexit module to log things just before it fully quits. Demo:
import atexit
a = 1
b = 2
atexit.register(print, "This is a log printed at exit", locals())
In your code I would replace the print with your preferred logger, but this will also allow you to log any relevant locals on exit. As always, be mindful that you may be printing secrets too if you're not careful, and you probably don't want these to appear in your logs.
Finally, if this doesn't yield anything useful, might it be possible to run an experiment on dummy data? Something like a JSONL file that really just has text like "this is example #{n}", just so we might lure out the issue. I'm certainly willing to jump on a call to dive into logs some more, but it would help very much if we could see the error happening live. That way, we might get to something that is reproducible.
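A dummy file like that can be generated with a few lines of Python. In this sketch the file name, the number of examples, and the use of the temp directory are arbitrary choices:

```python
import json
import os
import tempfile


def dummy_examples(n):
    # Each record only carries a "text" key, numbered from 1 to n
    return [{"text": f"this is example #{i}"} for i in range(1, n + 1)]


def write_jsonl(path, records):
    # One JSON object per line, which is the JSONL layout
    with open(path, "w", encoding="utf8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


examples = dummy_examples(100)
dummy_path = os.path.join(tempfile.gettempdir(), "dummy.jsonl")
write_jsonl(dummy_path, examples)
print(f"wrote {len(examples)} examples to {dummy_path}")
```

Because every example is numbered, it is easy to tell from the saved annotations which ones went missing.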
Then again, we have refactored a large chunk of the codebase that handles task routing in v1.12, so the solution might also be to wait for that version.
Thanks for investigating and providing feedback. Here is more information:
utils.log() method is simply a print statement with a timestamp that we use during initialization of the app, when the default logger is not yet available.
blob.download_as_string() function is certainly blocking, but that is not a problem inside an async function.
2023-04-19T19:23:27.275855280Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19T19:23:27] [INFO] CREATING new server for 'eup_low_scoring_data_v1'
2023-04-19T19:23:27.275911207Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19T19:23:27] [INFO] ADDING 'cheyanne' to 'eup_low_scoring_data_v1'
2023-04-19T19:23:27.275979916Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19 19:23:27 +0000] [1] [INFO] ADDING 'cheyanne','eup_low_scoring_data_v1' to state table
2023-04-19T19:23:27.478630563Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19 19:23:27 +0000] [1] [INFO] DOWNLOADED [GCS] 'raw/3a/9533791731a61a22a40493ff1f9072' to 'data/9533791731a61a22a40493ff1f9072.jsonl'
2023-04-19T19:23:27.478691658Z [resource.labels.containerName: nsgi-annotation-service] [2023-04-19 19:23:27 +0000] [1] [INFO] CREATING task with '['eup-corpus-validation-with-options', 'eup_low_scoring_data_v1', 'data/9533791731a61a22a40493ff1f9072.jsonl']
2023-04-19T19:23:27.492342154Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: RECIPE: Calling recipe 'eup-corpus-validation-with-options'
2023-04-19T19:23:27.547289563Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: CONFIG: Using config from PRODIGY_CONFIG env var
2023-04-19T19:23:27.548088398Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: VALIDATE: Validating components returned by recipe
2023-04-19T19:23:27.549925541Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: CONTROLLER: Initialising from recipe
2023-04-19T19:23:27.550221218Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: VALIDATE: Creating validator for view ID 'blocks'
2023-04-19T19:23:27.550642601Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: VALIDATE: Validating Prodigy and recipe config
2023-04-19T19:23:27.552024010Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: CONFIG: Using config from PRODIGY_CONFIG env var
2023-04-19T19:23:27.553338541Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: DB: Initializing database PostgreSQL
2023-04-19T19:23:27.554768238Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:27 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T19:23:27.582554270Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: DB: Connecting to database PostgreSQL
2023-04-19T19:23:27.588312571Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: DB: Creating dataset 'eup_low_scoring_data_v1'
2023-04-19T19:23:27.593729469Z [resource.labels.containerName: nsgi-annotation-service] Added dataset eup_low_scoring_data_v1 to database PostgreSQL.
2023-04-19T19:23:27.599291256Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: DB: Creating dataset '2023-04-19_19-23-27'
2023-04-19T19:23:27.633258290Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: FEED: Initializing from controller
2023-04-19T19:23:27.634140523Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:27[0m: CORS: initialized with wildcard "*" CORS origins
2023-04-19T19:23:27.634800494Z [resource.labels.containerName: nsgi-annotation-service] {}
2023-04-19T19:23:27.634852116Z [resource.labels.containerName: nsgi-annotation-service] ✨ Starting the web server at http://0.0.0.0:9091 ...
2023-04-19T19:23:27.634863474Z [resource.labels.containerName: nsgi-annotation-service] Open the app in your browser and start annotating!
2023-04-19T19:23:27.634869634Z [resource.labels.containerName: nsgi-annotation-service] {}
2023-04-19T19:23:27.647545281Z [resource.labels.containerName: nsgi-annotation-service] INFO: Started server process [163]
2023-04-19T19:23:27.647593665Z [resource.labels.containerName: nsgi-annotation-service] INFO: Waiting for application startup.
2023-04-19T19:23:27.648017755Z [resource.labels.containerName: nsgi-annotation-service] INFO: Application startup complete.
2023-04-19T19:23:27.648582794Z [resource.labels.containerName: nsgi-annotation-service] INFO: Uvicorn running on http://0.0.0.0:9091 (Press CTRL+C to quit)
[...]
2023-04-19T19:23:46.890964092Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:46[0m: POST: /get_session_questions
2023-04-19T19:23:46.891658628Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T19:23:46.913629979Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:46[0m: CONTROLLER: Getting batch of questions for session: eup_low_scoring_data_v1-cheyanne
2023-04-19T19:23:46.914257752Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:46[0m: FEED: Finding next batch of questions in stream
2023-04-19T19:23:46.920680018Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:46[0m: FEED: re-adding open tasks to stream
2023-04-19T19:23:46.922544217Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:46[0m: FEED: batch of questions requested for session eup_low_scoring_data_v1-cheyanne: 10
2023-04-19T19:23:46.923044861Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m19:23:46[0m: RESPONSE: /get_session_questions (10 examples)
2023-04-19T19:23:46.923221252Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 Client closed local connection on 127.0.0.1:5432
2023-04-19T19:23:46.927698778Z [resource.labels.containerName: nsgi-annotation-service] INFO: 10.56.1.24:53074 - "POST /get_session_questions HTTP/1.1" 200 OK
The two entries of concern are possibly those coming from the cloudsql-proxy:
2023-04-19T19:23:46.891658628Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T19:23:46.923221252Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 19:23:46 Client closed local connection on 127.0.0.1:5432
We assume that the "Client" in the entry above is the prodigy web server; our app is not opening or closing connections, or using a connection pool. We see these entries on every interaction between the prodigy server handling an annotation task and the database: getting a fresh batch of data for a task, or recording (saving) an answer to a question to the database, as in this example:
2023-04-19T22:37:59.182586922Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m22:37:59[0m: CONTROLLER: Added 1 answers to dataset 'stt_error_validation' in database PostgreSQL
2023-04-19T22:37:59.183282062Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m22:37:59[0m: RESPONSE: /give_answers
2023-04-19T22:37:59.183473098Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 22:37:59 Client closed local connection on 127.0.0.1:5432
2023-04-19T22:37:59.184950692Z [resource.labels.containerName: nsgi-annotation-service] INFO: 10.56.5.11:33424 - "POST /give_answers HTTP/1.1" 200 OK
2023-04-19T22:38:37.377031033Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m22:38:37[0m: POST: /give_answers (received 1, session ID 'stt_error_validation-joel')
2023-04-19T22:38:37.377516031Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 22:38:37 New connection for "ml-dev-307721:us-east1:prodigy-annotation-db"
2023-04-19T22:38:37.397329881Z [resource.labels.containerName: nsgi-annotation-service] [1;38;5;135m22:38:37[0m: CONTROLLER: Receiving 1 answers
2023-04-19T22:38:37.397445116Z [resource.labels.containerName: cloudsql-proxy] 2023/04/19 22:38:37 Client closed local connection on 127.0.0.1:5432
Are you sure about the blocking there?
Here's a demo app, in a file called app.py, that I just ran locally.
import time
import asyncio

from fastapi import FastAPI

app = FastAPI()


@app.get("/sleep_slow")
async def sleep_slow():
    time.sleep(5)
    return {"status": "done"}


@app.get("/sleep_fast")
async def sleep_fast():
    await asyncio.sleep(1)
    return {"status": "done"}

Note that one async function uses the blocking time.sleep while the other uses the non-blocking asyncio.sleep. You can now run this locally via:
uvicorn app:app
You can now point the browser to /sleep_slow and, in another tab, to /sleep_fast. Notice that while /sleep_slow is still waiting, /sleep_fast doesn't return. That's because time.sleep is blocking, despite being in an async function.
Is this not a concern for the blob.download_as_string call?
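To make the difference measurable without a browser, here is a minimal standard-library sketch of the same effect. No FastAPI is involved and the function names are made up for illustration. It also shows one possible way around the problem, asyncio.to_thread, which needs Python 3.9+:

```python
import asyncio
import time


async def blocking_sleep():
    time.sleep(0.3)  # blocks the event loop, like time.sleep in /sleep_slow


async def offloaded_sleep():
    await asyncio.to_thread(time.sleep, 0.3)  # same sleep, in a worker thread


async def quick():
    await asyncio.sleep(0.05)
    return time.perf_counter()


async def measure(slow):
    """Return how long the quick coroutine took while `slow` was running."""
    start = time.perf_counter()
    slow_task = asyncio.create_task(slow())
    finished_at = await quick()
    await slow_task
    return finished_at - start


blocked = asyncio.run(measure(blocking_sleep))
offloaded = asyncio.run(measure(offloaded_sleep))
print(f"quick finished after {blocked:.2f}s (blocked) vs {offloaded:.2f}s (offloaded)")
```

In FastAPI itself, declaring an endpoint with a plain def instead of async def has a similar effect, since such endpoints are run in a threadpool.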
Thanks for investigating! Some feedback from our team. Please let me know if you have any questions!
blob.download_as_string() calls is a blocking call, but eventually each call finishes (typically in less than a second) and when they are all finished, the load_tasks() method itself finishes and returns.
sleep_fast and sleep_slow endpoints simultaneously, sleep_fast will return after sleep_slow, even though it was asked to sleep 4 fewer seconds than sleep_slow, because the call to time.sleep() in sleep_slow blocks sleep_fast from returning.
load_tasks() finishes, nothing else can happen. However, because we call load_tasks() just once at initialization time, it does not interfere with anything else, because nothing else is happening -- nothing else can happen until initialization is complete.
async, is by design a sequence of blocking calls -- such as obtaining the database password in step 1 and loading the prodigy.json file in step 3 -- that must succeed in order for us to have a functioning service.
load_tasks() returns in step 5, we have all our task definitions in memory.
multiprocessing.Process in step 10.
/api/v1/tasks endpoint to select a new task, which will run its own prodigy webserver in a separate process associated with its own port number.
max_connections setting is 100, so it's a bit of a mystery as to why that would happen.
I have one additional update: we have a scenario where two annotators are annotating the same set, and they each got the save error on the same document while annotating days apart. This has happened before, so it's not new, but I cannot identify anything unique about the documents that this error occurs on. Also, pressing ignore or reject doesn't work to move past the document with the error. The annotator will successfully save a document, and the document with the error will pop up again.
I've noticed one trend in the content. Tasks where we have a link to full conversations do not encounter this error, but single utterances do, and they seem to contain SQL protected keywords: union, select, having, etc.
Do you have an example JSON representation of such a task?
Also, can you confirm that this never happens when a SQL protected keyword is not around?
I cannot definitively confirm that this has never happened when a SQL protected keyword is absent, but I collected sample utterances that have caused errors and they all contain SQL protected keywords. I'm currently testing this dataset with our annotators and will post the outcome. I was still not able to reproduce the error locally.
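A quick way to screen a dataset for this pattern could look like the sketch below. The keyword list is a small illustrative subset rather than a complete list of SQL keywords, and the utterances are made-up examples, not documents from our dataset:

```python
import re

# A small, illustrative subset of SQL keywords; not a complete list
SQL_KEYWORDS = ("union", "select", "having", "insert", "delete", "drop")
PATTERN = re.compile(r"\b(" + "|".join(SQL_KEYWORDS) + r")\b", re.IGNORECASE)


def sql_keywords_in(text):
    """Return the lowercased SQL keywords found in text, without repeats."""
    found = []
    for match in PATTERN.finditer(text):
        word = match.group(1).lower()
        if word not in found:
            found.append(word)
    return found


utterances = [  # made-up examples
    "I want to select a different plan",
    "my credit union account is locked",
    "what is my balance",
]
flagged = {u: sql_keywords_in(u) for u in utterances}
for utterance, words in flagged.items():
    print(f"{utterance!r}: {words}")
```

Comparing the flagged utterances against the ones that failed to save should show whether the keywords line up with the errors.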
Just a quick note that we resolved this issue! It was a WAF issue where documents containing SQL protected keywords caused save errors and would not save to the database.
Good to know! Thanks for reporting back!