I am writing a custom `ner.make-gold` recipe. I am replacing the `stream = split_sentences(nlp, stream)` call with my own `segment_documents()` function, which passes along some information about the original document when splitting it into sentences. The code is below.

I think I'm copying the logic of these stream functions correctly. In particular, I'm doing my best to imitate the iteration logic in `make_tasks()`. But I get a "generator already executing" error when I try to enumerate the stream inside `segment_documents()`. What am I doing wrong here?
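For context, this is roughly the shape I want each segmented example to have when it reaches the annotator. The `document` key and its fields are my own addition, and the values here are made up:

```python
# One segmented task as I intend segment_documents() to emit it
# (illustrative values only):
segment_task = {
    "text": "This is the second sentence of the document. ",
    "document": {
        "id": "abc123",     # looked up in the original record via id_path
        "segment": 2,       # 1-based sentence index within the document
        "start_char": 46,   # offset of this sentence in the original text
    },
}
```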
```python
import copy

import spacy
from spacy.matcher import Matcher
from spacy.tokens import Span

# Import paths as in my Prodigy 1.x install; path_value_from_dict,
# get_labels_from_ner, and disjoint_spans are my own helpers (omitted here).
from prodigy.core import recipe, recipe_args
from prodigy.components.loaders import get_stream
from prodigy.components.preprocess import add_tokens
from prodigy.util import log, set_hashes, read_jsonl, INPUT_HASH_ATTR


@recipe('ner.make-gold',
        dataset=recipe_args['dataset'],
        spacy_model=recipe_args['spacy_model'],
        source=recipe_args['source'],
        api=recipe_args['api'],
        loader=recipe_args['loader'],
        id_path=("record path to document ID", "option", None, str),
        patterns=recipe_args['patterns'],
        labels=recipe_args['label_set'],
        exclude=recipe_args['exclude'],
        unsegmented=recipe_args['unsegmented'])
def make_gold(dataset, spacy_model, source=None, api=None, loader=None,
              id_path="_id", patterns=None, labels=None, exclude=None,
              unsegmented=False):
    """Create gold data for NER by correcting a model's suggestions."""
    def segment_documents():
        """Split examples into sentences, carrying along document info."""
        if unsegmented:
            # Pass documents through whole, but still attach the metadata.
            for eg in stream:
                eg["document"] = {"id": path_value_from_dict(eg, id_path),
                                  "segment": 1,
                                  "start_char": 0}
                yield eg
        else:
            texts = ((eg["text"], eg) for eg in stream)
            for document, eg in nlp.pipe(texts, as_tuples=True):
                eg["document"] = {"id": path_value_from_dict(eg, id_path)}
                for i, sentence in enumerate(document.sents, 1):
                    segment_eg = copy.deepcopy(eg)
                    segment_eg["text"] = sentence.text_with_ws
                    segment_eg["document"]["segment"] = i
                    segment_eg["document"]["start_char"] = sentence.start_char
                    yield segment_eg

    def make_tasks():
        """Add a 'spans' key to each example, with predicted entities."""
        texts = ((eg['text'], eg) for eg in stream)
        for doc, eg in nlp.pipe(texts, as_tuples=True):
            task = copy.deepcopy(eg)
            pattern_matches = tuple(Span(doc, start, end, label)
                                    for label, start, end in matcher(doc))
            spans = disjoint_spans(span for span in doc.ents + pattern_matches
                                   if span.label_ in labels)
            task["spans"] = [
                {
                    'token_start': span.start,
                    'token_end': span.end - 1,
                    'start': span.start_char,
                    'end': span.end_char,
                    'text': span.text,
                    'label': span.label_,
                    'source': spacy_model,
                    'input_hash': eg[INPUT_HASH_ATTR]
                }
                for span in spans
            ]
            yield set_hashes(task)

    log("RECIPE: Starting recipe ner.make-gold", locals())
    nlp = spacy.load(spacy_model)
    log("RECIPE: Loaded model {}".format(spacy_model))
    patterns_by_label = {}
    if patterns is not None:
        for entry in read_jsonl(patterns):
            patterns_by_label.setdefault(entry['label'], []).append(entry['pattern'])
    matcher = Matcher(nlp.vocab)
    for pattern_label, label_patterns in patterns_by_label.items():
        matcher.add(pattern_label, None, *label_patterns)
    # Get the label set from the `labels` argument, which is either a
    # comma-separated list or a path to a text file. If labels is None, check
    # if labels are present in the model (and in the patterns).
    if labels is None:
        labels = sorted(set(get_labels_from_ner(nlp)) | set(patterns_by_label))
        print("Using {} labels from model: {}".format(len(labels), ', '.join(labels)))
    log("RECIPE: Annotating with {} labels".format(len(labels)), labels)
    stream = get_stream(source, api=api, loader=loader, rehash=True,
                        dedup=True, input_key='text')
    # Optionally split the stream into segments, keeping track of original
    # document information.
    stream = segment_documents()
    # Tokenize the stream.
    stream = add_tokens(nlp, stream)
    return {
        'view_id': 'ner_manual',
        'dataset': dataset,
        'stream': make_tasks(),
        'exclude': exclude,
        'update': None,
        'config': {'lang': nlp.lang, 'labels': labels}
    }
```
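One thing I notice while writing this up: `segment_documents()` closes over the name `stream`, which I rebind immediately afterwards (first to `segment_documents()` itself, then to `add_tokens(nlp, stream)`). A variant that binds the input stream at call time would look like this (hypothetical, untested; the `unsegmented` branch is dropped for brevity):

```python
# Inside make_gold(), instead of the closure version above: take the input
# stream as an explicit argument so it's bound when the function is called.
def segment_documents(in_stream):
    texts = ((eg["text"], eg) for eg in in_stream)
    for document, eg in nlp.pipe(texts, as_tuples=True):
        eg["document"] = {"id": path_value_from_dict(eg, id_path)}
        for i, sentence in enumerate(document.sents, 1):
            segment_eg = copy.deepcopy(eg)
            segment_eg["text"] = sentence.text_with_ws
            segment_eg["document"]["segment"] = i
            segment_eg["document"]["start_char"] = sentence.start_char
            yield segment_eg

# ...and later in make_gold():
stream = segment_documents(stream)
```

Should that make a difference, given that `make_tasks()` also just closes over `stream`?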
All the Googling I do about this error message turns up references to generators not being thread-safe. Is `split_sentences(nlp, stream)` adding some thread safety?
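For what it's worth, I can reproduce the same error in plain Python with no threads (and no Prodigy) involved, which makes me wonder whether the problem is just the name `stream` being rebound between defining the generator and running it. A minimal sketch:

```python
def build_stream():
    def segment():
        # 'stream' is resolved when the generator body actually runs,
        # not when segment() is called...
        for eg in stream:
            yield eg

    stream = iter(["a", "b", "c"])
    stream = segment()  # ...and by now 'stream' is this very generator
    return stream

list(build_stream())  # ValueError: generator already executing
```

Is that what's happening in my recipe, i.e. after the rebinding, `segment_documents()` ends up (indirectly, via `add_tokens()`) iterating over itself?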