Prodigy loads the whole stream before annotation starts

I have built my own custom Elasticsearch loader and a custom recipe wrapper on top of ner.make_gold. Both of them return the stream as a generator. However, when I start annotating, I see that Prodigy iterates over the stream generator until it is exhausted before presenting any samples for annotation.

Here is my wrapper code:

@prodigy.recipe('elastic.ner.make-gold',
                dataset=recipe_args['dataset'],
                spacy_model=recipe_args['spacy_model'],
                source=recipe_args['source'],
                label=recipe_args['label_set'],
                exclude=recipe_args['exclude'],
                unsegmented=recipe_args['unsegmented'],
                anonymize=('anonymize content of the samples', 'flag', 'a'))
def elastic_make_gold(dataset, spacy_model, source=None, api=None, loader=None,
                      label=None, exclude=None, unsegmented=False, anonymize=False):
    stream = prodigy.get_stream(source, loader='elastic_loader')
    stream = transform_stream(stream, spacy_model, anonymize)
    components = make_gold(dataset=dataset, spacy_model=spacy_model,
                           source=stream, label=label, exclude=exclude,
                           unsegmented=unsegmented)

    print('Components:', components)
    return components

Here is the full log:

prodigy elastic.ner.make-gold ner-test en_core_web_sm --label ORG,PRODUCT --exclude ner-test --unsegmented
10:06:41 - No API key or APP key was provided for Datadog
10:06:41 - CLI: Added 2 recipe(s) via entry points
10:06:41 - RECIPE: Calling recipe 'elastic.ner.make-gold'
Using 2 labels: ORG, PRODUCT
10:06:41 - LOADER: Added 1 file loader(s) via entry points
10:06:41 - LOADER: Loading stream from elastic_loader
10:06:41 - LOADER: Reading stream from sys.stdin
10:06:41 - RECIPE: Starting recipe ner.make-gold
10:06:41 - RECIPE: Loaded model en_core_web_sm
10:06:41 - RECIPE: Annotating with 2 labels
10:06:41 - LOADER: Using supplied iterable source as stream
10:06:41 - LOADER: Rehashing stream
Components: {'view_id': 'ner_manual', 'dataset': 'ner-test', 'stream': <generator object make_gold.<locals>.make_tasks at 0x7f96dc3cd5c8>, 'exclude': ['ner-test'], 'update': None, 'config': {'lang': 'en', 'labels': ['ORG', 'PRODUCT']}}
10:06:41 - CONTROLLER: Initialising from recipe
10:06:41 - VALIDATE: Creating validator for view ID 'ner_manual'
10:06:41 - DB: Initialising database PostgreSQL
10:06:44 - DB: Connecting to database PostgreSQL
10:06:51 - DB: Loading dataset 'ner-test' (102 examples)
10:06:51 - DB: Creating dataset '2019-03-26_10-06-41'
10:06:52 - DatasetFilter: Getting hashes for excluded examples
10:06:52 - DatasetFilter: Excluding 98 tasks from datasets: ner-test
10:06:52 - CONTROLLER: Initialising from recipe

  ✨  Starting the web server at http://0.0.0.0:8080 ...
  Open the app in your browser and start annotating!

10:06:57 - GET: /project
10:06:58 - GET: /get_questions
10:06:58 - Task queue depth is 1
10:06:58 - Task queue depth is 2
10:06:58 - FEED: Finding next batch of questions in stream
10:06:58 - CONTROLLER: Validating the first batch for session: None
10:06:58 - PREPROCESS: Tokenizing examples
10:06:58 - FILTER: Filtering duplicates from stream
10:06:58 - FILTER: Filtering out empty examples for key 'text'
10:06:59 - GET https://xxxx.es.amazonaws.com:443/emails/_count [status:200 request:0.791s]
10:07:00 - GET https://xxxx.es.amazonaws.com:443/emails/_search?scroll=480m&size=3 [status:200 request:0.485s]
10:07:00 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.222s]
10:07:00 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.177s]
10:07:01 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.225s]
10:07:01 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.158s]
10:07:01 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.192s]
10:07:02 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.359s]
10:07:02 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.282s]
10:07:03 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.369s]
10:07:03 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.567s]
10:07:04 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.290s]
10:07:04 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.350s]
10:07:05 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.455s]
10:07:05 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.202s]
10:07:05 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.326s]
10:07:06 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.282s]
10:07:06 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.155s]
10:07:06 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.492s]
10:07:07 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.184s]
10:07:07 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.303s]
10:07:07 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.201s]
10:07:08 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.232s]
10:07:08 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.322s]
10:07:08 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.159s]
10:07:09 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.179s]
10:07:09 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.171s]
10:07:09 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.184s]
10:07:09 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.152s]
10:07:10 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.170s]
10:07:10 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.156s]
10:07:10 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.164s]
10:07:10 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.287s]
10:07:11 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.184s]
10:07:11 - GET https://xxxx.es.amazonaws.com:443/_search/scroll?scroll=480m [status:200 request:0.153s]
10:07:11 - DELETE https://xxxx.es.amazonaws.com:443/_search/scroll [status:200 request:0.205s]
10:07:17 - RESPONSE: /get_questions (3 examples)

I would like to be able to annotate a potentially infinite stream of documents. Is there a workaround? How can I fix this behavior?

Hi! A few things that aren’t very clear from your code:

What’s the source you’re passing in? It looks like that’s always None, so why are you calling Prodigy’s get_stream? Did you actually register elastic_loader via an entry point? And what does transform_stream do?

Thanks for the quick response. I only copy/pasted part of the command; here is the correct one:

tail -n +2 ids/ids.csv | prodigy elastic.ner.make-gold ner-test en_core_web_sm --label ORG,PRODUCT --exclude ner-test --unsegmented

Did you actually register elastic_loader via an entry point?

Yes. Here is my setup.py; I'm using "pip install -e ." to register it:

from setuptools import setup

setup(
    name='prodigy_utils',
    entry_points={
        'prodigy_recipes': [
            'elastic_teach = recipes:elastic_textcat_teach',
            'elastic_mark = recipes:elastic_mark'
        ],
        'prodigy_loaders': [
            'elastic_loader = loaders:elastic_api_loader'
        ],
    },
    install_requires=[
        'prodigy>=1.5.0'
    ]
)

And what does transform_stream do?

It also returns an Iterable (by the way, anonymize_doc is False in my test, so please ignore that part):

def transform_stream(stream: Iterable[Dict[str, Any]], spacy_model, anonymize_doc: bool) -> Iterable[Dict[str, Any]]:
    if anonymize_doc:
        nlp = spacy.load(spacy_model)
        anonymizer = Anonymizer(nlp, randomize=True)
        anonymize = partial(update_in, keys=['text'], func=anonymizer.anonymize)

    stream = map(process_document, stream)
    stream = filter(lambda x: 'skipme' not in x, stream)
    # stream = split_sentences(nlp, stream, min_length=config.get('split_sents_threshold', 0))
    stream = map(anonymize, stream) if anonymize_doc else stream
    stream = map(lambda x: dissoc(x, 'html'), stream)

    return stream

Thanks for providing more details – looks good! I was just looking at the log again and noticed this:

10:07:17 - RESPONSE: /get_questions (3 examples)

What batch size do you have configured in your recipe? Internally, Prodigy will partition the stream into batches, so it’ll be consumed until one full batch is available (default batch size is 10). So considering the response only returned 3 examples, is it possible that for whatever reason, your search query didn’t yield enough examples?
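Just to illustrate the batching (a simplified sketch, not Prodigy's actual implementation): the feed keeps pulling from your generator, through the dedup/empty-text filters, until one full batch is available.

from itertools import islice

def batch_stream(stream, batch_size=10):
    # Simplified sketch of the behaviour described above: keep consuming the
    # (already filtered) generator until a full batch can be sent out.
    while True:
        batch = list(islice(stream, batch_size))
        if not batch:
            return
        yield batch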

What batch size do you have configured in your recipe?

This behavior is expected: my batch_size in prodigy.json is 3. I'm also sure all the documents are in place, because in the command line above I'm piping a pre-sampled list of document IDs in via stdin (that is the actual query).

Here is the related loader code as well:

SCROLL_TIMEOUT = '480m'
SCROLL_SIZE = 3

def elastic_api_loader(source: Union[str, TextIOWrapper]) -> Iterable[dict]:
    es_config = get_config()['elastic_api']
    fields = es_config.get('fields_to_return', DEFAULT_FIELDS)
    text_field = es_config.get('text_field', DEFAULT_TEXT_FIELD)
    index = es_config.get('index')

    handler = ElasticQueryHandler(index)

    def run_query_by_id(ids: List[str]) -> Iterable[Dict[str, Any]]:
        search = handler.query_by_id(ids).source(fields)
        return process_response(search)

    def process_response(search: Search, keyword: Optional[str] = None) -> Iterable[Dict[str, Any]]:
        count = search.count()
        search = search.params(scroll=SCROLL_TIMEOUT, size=SCROLL_SIZE)

        # Stream documents using scroll API for pagination
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
        for hit in search.scan():
            meta = {'index': hit.meta.index, 'es_score': hit.meta.score,
                    'doc_count': count, 'query': keyword}
            task = hit.to_dict()
            task['text'] = task.get(text_field)
            task['meta'] = valfilter(lambda v: v is not None, meta)

            yield task

    if isinstance(source, TextIOWrapper):
        queries = map(lambda l: l.strip(), source.readlines())
        stream = run_query_by_id(list(queries))
    else:
        stream = run_query_by_id([source])

    return stream
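
To double-check that the loader itself is lazy, I can also consume just the first few tasks outside of Prodigy (a throwaway sanity-check script, not part of the recipe):

import sys
from itertools import islice

from loaders import elastic_api_loader

# Pipe the same ids in, e.g.: tail -n +2 ids/ids.csv | python check_loader.py
stream = elastic_api_loader(sys.stdin)
for task in islice(stream, 3):
    print(task['meta'], (task['text'] or '')[:80])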

Yeah, this really all looks reasonable :thinking: Could you test one thing for me and add "overlap": False to the "config" returned by the recipe?

(Internally, this will use a different method of putting together the stream for the annotation session and assume the stream will be shared by multiple sessions. If this doesn’t cause the same behaviour of the generator being consumed, it may indicate a problematic interaction within the feed in Prodigy. If nothing changes, the behaviour is likely not related to how Prodigy puts together the stream.)
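
In your wrapper, that could look roughly like this (keeping everything else as you posted it):

    components = make_gold(dataset=dataset, spacy_model=spacy_model,
                           source=stream, label=label, exclude=exclude,
                           unsegmented=unsegmented)
    # 'config' is the dict shown in your printed components above
    components['config']['overlap'] = False
    return components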

My components dict now looks like this:

Components: {'view_id': 'ner_manual', 'dataset': 'ner-test', 'stream': <generator object make_gold.<locals>.make_tasks at 0x7f76ccf50570>, 'exclude': ['ner-test'], 'update': None, 'config': {'lang': 'en', 'labels': ['ORG', 'PRODUCT'], 'overlap': False}}

However, it does not seem to have any effect. I see the same number of queries to the Elasticsearch cluster in the logs…
By the way, I'm using Prodigy 1.6.1.