"generator already executing" when operating on a stream

I am writing a custom ner.make-gold recipe. I am replacing the stream = split_sentences(nlp, stream) call with my own segment_documents() function that passes along some information about the original document when splitting it into sentences. The code is below.

I think I’m copying the logic for these stream functions correctly. In particular I’m doing my best to imitate the iteration logic in make_tasks(). But I get a “generator already executing” error when I try to enumerate the stream inside segment_documents(). What am I doing wrong here?

import copy

import spacy
from spacy.matcher import Matcher
from spacy.tokens import Span

from prodigy import recipe, set_hashes
from prodigy.components.loaders import get_stream
from prodigy.components.preprocess import add_tokens
from prodigy.core import recipe_args
from prodigy.util import INPUT_HASH_ATTR, log, read_jsonl

# Imports above reflect my Prodigy 1.x install. path_value_from_dict,
# disjoint_spans and get_labels_from_ner are my own helpers defined
# elsewhere in this module (omitted here).

@recipe('ner.make-gold',
        dataset=recipe_args['dataset'],
        spacy_model=recipe_args['spacy_model'],
        source=recipe_args['source'],
        api=recipe_args['api'],
        loader=recipe_args['loader'],
        id_path=("record path to document ID", "option", None, str),
        patterns=recipe_args['patterns'],
        labels=recipe_args['label_set'],
        exclude=recipe_args['exclude'],
        unsegmented=recipe_args['unsegmented'])
def make_gold(dataset, spacy_model, source=None, api=None, loader=None,
              id_path="_id",
              patterns=None, labels=None, exclude=None, unsegmented=False):
    """
    Create gold data for NER by correcting a model's suggestions.
    """

    def segment_documents():
        if unsegmented:
            for eg in stream:
                eg["document"]["segment"] = 1
                eg["document"]["start_char"] = 0
                yield eg
        else:
            for document, eg in nlp.pipe(((eg["text"], eg) for eg in stream), as_tuples=True):
                eg["document"] = {}
                eg["document"]["id"] = path_value_from_dict(eg, id_path)
                for i, sentence in enumerate(document.sents, 1):
                    segment_eg = copy.deepcopy(eg)
                    segment_eg["text"] = sentence.text_with_ws
                    segment_eg["document"]["segment"] = i
                    segment_eg["document"]["start_char"] = sentence.start_char
                    yield segment_eg

    def make_tasks():
        """Add a 'spans' key to each example, with predicted entities."""
        texts = ((eg['text'], eg) for eg in stream)
        for doc, eg in nlp.pipe(texts, as_tuples=True):
            task = copy.deepcopy(eg)
            pattern_matches = tuple(Span(doc, start, end, label) for label, start, end in matcher(doc))
            spans = disjoint_spans(span for span in doc.ents + pattern_matches if span.label_ in labels)
            # spans = disjoint_spans(spans)
            task["spans"] = [
                {
                    'token_start': span.start,
                    'token_end': span.end - 1,
                    'start': span.start_char,
                    'end': span.end_char,
                    'text': span.text,
                    'label': span.label_,
                    'source': spacy_model,
                    'input_hash': eg[INPUT_HASH_ATTR]
                }
                for span in spans
            ]
            yield set_hashes(task)

    log("RECIPE: Starting recipe ner.make-gold", locals())
    nlp = spacy.load(spacy_model)
    log("RECIPE: Loaded model {}".format(spacy_model))

    patterns_by_label = {}
    for entry in read_jsonl(patterns):
        patterns_by_label.setdefault(entry['label'], []).append(entry['pattern'])
    matcher = Matcher(nlp.vocab)
    for pattern_label, patterns in patterns_by_label.items():
        matcher.add(pattern_label, None, *patterns)

    # Get the label set from the `label` argument, which is either a
    # comma-separated list or a path to a text file. If labels is None, check
    # if labels are present in the model.
    if labels is None:
        labels = set(get_labels_from_ner(nlp)) | set(patterns_by_label)
        print("Using {} labels from model: {}".format(len(labels), ', '.join(labels)))
    log("RECIPE: Annotating with {} labels".format(len(labels)), labels)
    stream = get_stream(source, api=api, loader=loader, rehash=True, dedup=True, input_key='text')
    # Optionally split the stream into segments, keeping track of original document information.
    stream = segment_documents()
    # Tokenize the stream
    stream = add_tokens(nlp, stream)

    return {
        'view_id': 'ner_manual',
        'dataset': dataset,
        'stream': make_tasks(),
        'exclude': exclude,
        'update': None,
        'config': {'lang': nlp.lang, 'labels': labels}
    }

All the Googling I do about this error message turns up references to generators not being thread-safe. Is split_sentences(nlp, stream) adding some thread safety?

Hmm, there shouldn’t be any threading issues. Waitress launches multiple threads by default, but we disable that in app.py. You haven’t changed anything like that, have you? Also, I don’t suppose you’re testing this with pytest, which could be doing something sneaky?

We did have that error at various points. It’s very frustrating — let me dig into this a bit and see if I can figure it out.

Edit: This is likely beside the point, but try simplifying your logic a bit? Break the segment_documents() function into two, for each branch of the conditional – and then move the conditional out to where you call the function. Then also move the generator you have inside the call to nlp.pipe() out of the generator function. This should give you a better chance to localise the error.
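For instance, something along these lines (just a sketch with made-up function names, reusing your path_value_from_dict() helper):

import copy

def mark_unsegmented(examples):
    # Pass-through branch: treat each incoming example as a single segment.
    for eg in examples:
        eg.setdefault("document", {})
        eg["document"]["segment"] = 1
        eg["document"]["start_char"] = 0
        yield eg

def split_by_sentence(nlp, text_eg_tuples, id_path):
    # Sentence-splitting branch, fed (text, example) tuples built by the caller.
    for document, eg in nlp.pipe(text_eg_tuples, as_tuples=True):
        eg["document"] = {"id": path_value_from_dict(eg, id_path)}
        for i, sentence in enumerate(document.sents, 1):
            segment_eg = copy.deepcopy(eg)
            segment_eg["text"] = sentence.text_with_ws
            segment_eg["document"]["segment"] = i
            segment_eg["document"]["start_char"] = sentence.start_char
            yield segment_eg

# ...and at the call site, with the conditional moved out of the generators:
if unsegmented:
    stream = mark_unsegmented(stream)
else:
    text_eg_tuples = ((eg["text"], eg) for eg in stream)
    stream = split_by_sentence(nlp, text_eg_tuples, id_path)

That way each stage is small enough that the traceback tells you exactly which one is misbehaving.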

Remember that if you’re not sure which stage of the generator is messing you up, you can always call list() on it to exhaust it. This makes it easier to see where the problem is.
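For example, in a throwaway debugging run (not something to leave in the recipe):

stream = get_stream(source, api=api, loader=loader, rehash=True, dedup=True, input_key='text')
stream = segment_documents()
examples = list(stream)   # forces the stages so far to actually run
print(len(examples))      # if a stage is broken, the traceback points straight at it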

Finally, I think they made some fixes to this in Python 3.6. So as a long shot, if you’re on 3.5, maybe that matters? Try the other things first though, they’re more likely.

Running Python 3.6. I haven’t futzed with multithreading on my end. All I’m doing to run this recipe is passing it as an argument to prodigy -F. I also debug by running the custom recipe directly in PyCharm and see the same effects.

Breaking up segment_documents() may be a good idea, but I stepped through both paths in the debugger and saw the same error for both of the attempts to iterate over stream.

Calling list(stream) in the debugger at the top of segment_documents() gives the same “generator already executing” error.

I was just looking at your code, and it might just be a typo you’ve already corrected, but neither segment_documents() nor make_tasks() actually takes the stream as an argument. So if your code still looks like this, that might be the problem. Your filter functions aren’t wrapping the stream you loaded: the closed-over name stream is only looked up when the generator body runs, and by that point you’ve rebound stream to a generator that (via add_tokens()) wraps segment_documents() itself, so the stream ends up trying to pull items from itself while it’s already running, which is exactly the “generator already executing” error.
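Here’s a stripped-down version of the same situation, with made-up names and nothing Prodigy-specific, that raises the identical error:

def numbers():
    for i in range(3):
        yield i

def doubled():
    # `stream` is looked up in the surrounding scope when this body runs,
    # not when doubled() is called
    for x in stream:
        yield 2 * x

stream = numbers()
stream = doubled()   # rebinds the very name the closure will read
list(stream)         # ValueError: generator already executing

Passing the stream in explicitly, def doubled(stream): called as stream = doubled(stream), binds the parameter to the numbers() generator at call time, so the cycle never happens.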

So ideally, your stream composition should look something like this:

stream = get_stream(source, api=api, loader=loader, rehash=True, dedup=True, input_key='text')
stream = segment_documents(stream)
stream = add_tokens(nlp, stream)
stream = make_tasks(stream)
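(That also means the two inner functions need to accept the stream as a parameter, i.e. def segment_documents(stream): and def make_tasks(stream):, and iterate over that parameter rather than the enclosing variable. The return dict’s 'stream' entry then becomes just stream, since make_tasks() is already applied in the composition above.)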

That’s it!!!

Wow, that was subtle. My convention when nesting functions inside other functions is to not pass in-scope variables as parameters. I’ve never found this to make a difference: in Python you generally don’t have to be mindful of a pass-by-reference/pass-by-value distinction. But with an in-scope generator it clearly does matter: a closed-over name is looked up when the generator body actually runs, by which time I had rebound stream to a different generator, whereas a parameter is bound to whatever object exists at call time.

This code works now, and I learned something new about Python.
