Batch size ignored for custom loader?

I am having an issue where Prodigy pulls all the data from my data source before starting, not just a small batch. Please help me figure out what's going on.

I have written a custom function that loads data from a SQL Server database and does some custom segmentation:

def db_loader(model_id, server_pa=<SERVER>, database_pa=<DB>, driver=<DRIVER>, batch_size=100, db_cursor_size=100, config_file='config.txt', source_id=None):
    segmentation_loaded = None
    predana_con = loader_utilities.make_sql_connection(server_pa, database_pa, driver, config_file=config_file)
    source = loader_utilities.get_source_data(predana_con, model_id, source_id)
    
    for ix, source_row in source.iterrows():
        txt_src_con = loader_utilities.make_sql_connection(source_row.source_server,source_row.source_database, driver, config_file=config_file) 
        # unlabeled_record_metadata = loader_utilities.get_unlabeled_record_metadata(predana_con, model_id, source_row.source_id, BATCH_SIZE)
        
        con = pyodbc.connect(predana_con)
        cursor=con.cursor()
        cursor.execute(f'EXECUTE dbo.spcGetModelData {model_id}, {source_row.source_id}, {batch_size}')
        for row in loader_utilities.get_db_chunks(cursor, db_cursor_size):
        # for row in unlabeled_record_metadata:
            #log record as pending
            loader_utilities.log_record_status(predana_con, model_id, source_row.source_id, row[0], 1)
            #load segmentation function (only load if new or changed)
            segmentation_scheme = loader_utilities.get_segmentation_scheme(predana_con, row[0])
            if segmentation_scheme!=segmentation_loaded:
                segmentation_function = loader_utilities.load_segmentation_function(segmentation_scheme)
                segmentation_loaded=segmentation_scheme
                
            #get query and run on text server                    
            text_qry = loader_utilities.get_text_query(predana_con, row[0])
            text = loader_utilities.get_text_to_label(txt_src_con, text_qry)
            
            #iterate over text segments
            for i, txt in enumerate(segmentation_function(text)): 
                task = {'meta':{'model_id':model_id, 'source_id':source_row.source_id, 'model_data_id':row[0], 'segment_id': i}, 'text': txt}
                yield(task)
        con.close()

This works as a generator. I can use next() and get one record at a time, etc. However, no matter how I try to use it in Prodigy, it runs through the entire stream of available data. I have tried setting the batch_size in prodigy.json to small numbers (5, 10), but it will always stream all the available data. So for large batches of data, there is a very long startup time while the annotation tasks are prepared. I was expecting my code to pull a batch from SQL, for Prodigy to pull from that until it runs out, and for my process to then query another batch.
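The behaviour expected above (fetch a small chunk, hand the rows out one by one, and only fetch the next chunk when needed) can be sketched with the DB-API fetchmany call. This is only an illustration under assumptions: it uses an in-memory sqlite3 database instead of SQL Server, and iter_rows is a hypothetical stand-in for loader_utilities.get_db_chunks, whose real implementation isn't shown in this thread.

```python
import sqlite3
from itertools import islice


def iter_rows(cursor, chunk_size):
    """Lazily yield rows, fetching at most chunk_size rows per fetchmany call."""
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            return
        for row in rows:
            yield row


# Hypothetical stand-in table; the loader in this thread queries SQL Server.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE records (id INTEGER, text TEXT)")
con.executemany(
    "INSERT INTO records VALUES (?, ?)",
    [(i, f"text {i}") for i in range(20)],
)

cursor = con.cursor()
cursor.execute("SELECT id, text FROM records ORDER BY id")
rows = iter_rows(cursor, chunk_size=5)

# Consuming three rows triggers a single fetchmany(5) call, not a full read.
first_three = list(islice(rows, 3))
print(first_three)
```

pyodbc cursors also expose fetchmany, so the same pattern should carry over, but whether the stored procedure itself returns rows lazily is a separate question.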

I have written a custom recipe to use this data that looks like

@recipe(
    'ner.correct_custom',
    dataset=("Dataset to save annotations to", "positional", None, str),
    spacy_model=("Loadable spaCy model with an entity recognizer", "positional", None, str),
    label=("Comma-separated label(s) to annotate or text file with one label per line", "option", "l", get_labels),
    server=("Data management server ip", "option", "s", str),
    database=("Data management database", "option", "db", str),
    driver=("ODBC driver name", "option", "d", str),
    model_id=("ID of model to label records for", "option", "mdl", int),
    sql_batch_size=("Number of records to stage for labeling", "option", "b", int),
    db_cursor_size=("Number of records to pull from source at a time", "option", "cur", int),
    config_file=("Location of config file", "option", "cnf", str),
)
def correct_custom(
    dataset: str,
    spacy_model: str,
    label: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    server: str = <SERVER>,
    database: str = <DATABASE>,
    driver: str = <DRIVER>,
    model_id: int = 1,
    sql_batch_size: int = 10, 
    db_cursor_size: int = 5,
    config_file: str = 'config.txt',
):
    print("Loading SpaCy Model")
    nlp = spacy.load(spacy_model, disable=['parser', 'tagger', 'textcat'])
    print("Finished Loading SpaCy Model")
    loader = db_loader(model_id=model_id, server_pa=server, database_pa=database, driver=driver, 
                       batch_size=sql_batch_size, db_cursor_size=db_cursor_size, config_file=config_file)
    stream = get_stream(loader, rehash=True, dedup=False, input_key="text")
    stream = add_tokens(nlp, stream)
    stream = make_tasks(nlp, stream, label)

    return {
        "view_id": "ner_manual",  # Annotation interface to use
        "dataset": dataset,  # Name of dataset to save annotations
        "stream": stream,  # Incoming stream of examples
        "exclude": exclude,  # List of dataset names to exclude
        "config": {  # Additional config settings, mostly for app UI
            "lang": nlp.lang,
            "labels": label,  # Selectable label options
        },
    
    }

I am calling Prodigy via

> prodigy ner.correct_custom test_data <MODEL> -l <LABEL> -F <RECIPE FILE>

What am I missing? I have been struggling to figure this out for a few days.

Hi! The most likely explanation here is that something somewhere is consuming the whole generator :thinking: Did you try adding some print statements in your recipe to check where it hangs, and is there anything in your stream wrapper functions that could be doing this? Prodigy expects streams to be potentially infinite, so once the "stream" is returned by the recipe, it will only ever process it in batches.
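One way to add such print statements without touching the stream logic is a small pass-through generator placed between the steps. This is just a sketch: trace and fake_loader are hypothetical names, and fake_loader stands in for db_loader.

```python
from itertools import islice


def trace(stream, name, log):
    """Pass examples through unchanged and record each one as it is pulled."""
    for i, eg in enumerate(stream):
        log.append((name, i))
        print(f"[{name}] pulled example {i}")
        yield eg


def fake_loader(n):
    # Hypothetical stand-in for db_loader: yields n simple tasks lazily.
    for i in range(n):
        yield {"text": f"record {i}"}


log = []
stream = trace(fake_loader(1000), "loader", log)
stream = (dict(eg, seen=True) for eg in stream)  # a lazy wrapper step
stream = trace(stream, "after_wrapper", log)

# Roughly what pulling one small batch from the stream looks like.
batch = list(islice(stream, 3))
```

If every step is lazy, the "loader" and "after_wrapper" messages alternate. If a step consumes the whole stream, all "loader" messages are printed before the first message of the step after it, which points at the culprit.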

Thank you @ines ! I know you didn't exactly find the bug, but I think it nudged me in the right direction. I was shamelessly stealing the make_tasks code from the ner.py recipe. I think the first line there consumes the entire stream, correct?

    def make_tasks(nlp, stream: Iterable[dict]) -> Iterable[dict]:
        """Add a 'spans' key to each example, with predicted entities."""
        texts = ((eg["text"], eg) for eg in stream)  # <-- this one
        for doc, eg in nlp.pipe(texts, as_tuples=True):
            task = copy.deepcopy(eg)
            spans = []
            for ent in doc.ents:
                if labels and ent.label_ not in labels:
                    continue
                spans.append(
                    {
                        "token_start": ent.start,
                        "token_end": ent.end - 1,
                        "start": ent.start_char,
                        "end": ent.end_char,
                        "text": ent.text,
                        "label": ent.label_,
                        "source": spacy_model,
                        "input_hash": eg[INPUT_HASH_ATTR],
                    }
                )
            task["spans"] = spans
            task = set_hashes(task)
            yield task

I changed the function to iterate through the stream as well. That seems to be doing what I want it to do. Does this make sense? I'm not the Python wiz I'd like to be.

def make_tasks(nlp, stream, labels):
    """Add a 'spans' key to each example, with predicted entities."""
    # Process the stream using spaCy's nlp.pipe, which yields doc objects.
    # If as_tuples=True is set, you can pass in (text, context) tuples.
    #texts = ((eg["text"], eg) for eg in stream)
    for eg in stream:
        texts = [(eg["text"], eg)]  # one-item list of (text, context) tuples
        for doc, eg in nlp.pipe(texts, as_tuples=True):
            task = copy.deepcopy(eg)
            spans = []
            for ent in doc.ents:
                # Continue if predicted entity is not selected in labels
                if labels and ent.label_ not in labels:
                    continue
                # Create span dict for the predicted entity
                spans.append(
                    {
                        "token_start": ent.start,
                        "token_end": ent.end - 1,
                        "start": ent.start_char,
                        "end": ent.end_char,
                        "text": ent.text,
                        "label": ent.label_,
                    }
                )
            task["spans"] = spans
            # Rehash the newly created task so that hashes reflect added data
            task = set_hashes(task)
            yield task

Thanks again for your help. It helped me hit a tight deadline on getting this up and running.

Hi @ines, one more quick question please. Does the PatternMatcher always process the entire stream? If so, is there a way to turn this off? I am trying to use the PatternMatcher to pre-tag in an ner.manual situation, but it seems to exhaust all the available data. If I wanted to iterate through my stream, how do I need to format the data?

I tried writing something like this, but I can't make it work. Obviously example is in the wrong format to be consumed by the matcher.

def apply_pattern(stream, nlp, patterns):
    print("Using patterns file")
    pattern_matcher = PatternMatcher(nlp, all_examples=True, combine_matches=True)
    pattern_matcher = pattern_matcher.from_disk(patterns)
    for example in stream:
        for  _, eg in pattern_matcher((example)):
            yield((eg))

I'm following this example - https://github.com/explosion/prodigy-recipes/blob/master/ner/ner_match.py and having similar issues.

First, the example calls matcher.from_disk(patterns) which crashes if patterns is None - but the typing of patterns is patterns: Optional[str] = None.

But then later on, (ex for score, ex in matcher(stream)) returns an empty stream, specifically when I debug by calling list(matcher(stream)) it returns [].

My understanding was matcher will update the tokens of any example which has matches - but I assumed it would yield every example - match or not?

Also, the documentation is a little confusing: for PatternMatcher.__call__ it states that the return value is a bool, yet the note says it yields (score, example) tuples.

@bburch Glad you got it working! The ((eg["text"], eg) for eg in stream) line shouldn't consume the generator, though: the round brackets make it a generator expression. If it used a list comprehension instead, then it would definitely consume the whole stream.
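The difference between the two kinds of brackets can be checked in isolation. In this minimal sketch, numbers is a hypothetical source that records every value that actually gets requested from it.

```python
def numbers(n, pulled):
    """Yield 0..n-1 and record every value that is actually requested."""
    for i in range(n):
        pulled.append(i)
        yield i


# Round brackets: nothing is pulled from the source until we iterate.
pulled_lazy = []
lazy = ((i, i * 2) for i in numbers(5, pulled_lazy))
assert pulled_lazy == []
first = next(lazy)
print(first, pulled_lazy)  # (0, 0) [0]

# Square brackets: the whole source is consumed immediately.
pulled_eager = []
eager = [(i, i * 2) for i in numbers(5, pulled_eager)]
print(pulled_eager)  # [0, 1, 2, 3, 4]
```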

The pattern matcher shouldn't be consuming the stream, though, so that's definitely confusing. I'll look into that! If you just want to match stuff in text, all you really need is a Matcher, so it might be easier to just use spaCy's matcher directly: https://spacy.io/usage/rule-based-matching

That's toggled by the all_examples flag on PatternMatcher.__init__. This lets you either use the matcher to select examples for annotation and skip everything with no matches, or just add matches to examples where available and yield every single example.
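To make the two modes concrete, here is a rough sketch of the idea using plain substring matching. match_stream is a hypothetical helper written for illustration and is not how PatternMatcher is implemented; only the meaning of the all_examples flag is taken from the description above.

```python
def match_stream(stream, terms, all_examples=False):
    """Yield (matched, example) pairs; a simplified, hypothetical stand-in."""
    for eg in stream:
        text = eg["text"].lower()
        spans = []
        for term, label in terms.items():
            start = text.find(term)
            if start != -1:
                spans.append(
                    {"start": start, "end": start + len(term), "label": label}
                )
        # Without all_examples, examples that have no matches are skipped.
        if spans or all_examples:
            yield bool(spans), dict(eg, spans=spans)


examples = [{"text": "Flights to Berlin"}, {"text": "Nothing to see here"}]
terms = {"berlin": "GPE"}

only_matches = [eg for _, eg in match_stream(examples, terms)]
everything = [eg for _, eg in match_stream(examples, terms, all_examples=True)]
print(len(only_matches), len(everything))  # 1 2
```

If the flag is off and none of the patterns match, the resulting stream is empty, which may be what is behind list(matcher(stream)) returning [].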

The API here evolved a lot over time, so it's a lot less clean than I'd like it to be. The PatternMatcher is mostly a wrapper around spaCy's Matcher and PhraseMatcher with some extra logic to update its state with information on how often a pattern was accepted or rejected. If you're mostly looking to match patterns in your examples, you might actually prefer to just use spaCy's Matcher directly: https://spacy.io/usage/rule-based-matching


You were right to be suspicious, and I was premature in my celebration. I thought I had made the change and tested the ner.correct_custom recipe, but I was testing my custom manual recipe without the pattern matcher instead, which works just fine (embarrassing!). So right now, make_tasks and PatternMatcher both eat my stream.

Could it be nlp.pipe that is doing it in make_tasks? That's what I am going to look into next. Just in case it isn't clear, I was using make_tasks as defined in ner.py in the prodigy package directory as is. I have made no modifications other than trying to troubleshoot the generator issue.

I will look into using spaCy's Matcher and see if that fixes my issue. It sounds like it should. I just want to pre-tag entities with a patterns file.

Thanks!

EDIT: It does appear to be nlp.pipe() that is eating the stream in make_tasks. If I instead process the records with nlp(), I get the behavior I want: one record is cached before connecting to the server, and then batch_size tasks are processed once connected.

BEFORE

    def make_tasks(nlp, stream: Iterable[dict]) -> Iterable[dict]:
        """Add a 'spans' key to each example, with predicted entities."""
        texts = ((eg["text"], eg) for eg in stream)
        for doc, eg in nlp.pipe(texts, as_tuples=True):
            task = copy.deepcopy(eg)
            spans = []
            for ent in doc.ents:
                if labels and ent.label_ not in labels:
                    continue
                spans.append(
                    {
                        "token_start": ent.start,
                        "token_end": ent.end - 1,
                        "start": ent.start_char,
                        "end": ent.end_char,
                        "text": ent.text,
                        "label": ent.label_,
                        "source": spacy_model,
                        "input_hash": eg[INPUT_HASH_ATTR],
                    }
                )
            task["spans"] = spans
            task = set_hashes(task)
            yield task

AFTER

def make_tasks(nlp, stream, labels):
    """Add a 'spans' key to each example, with predicted entities."""
    # Process each text on its own with nlp() instead of nlp.pipe, so only
    # one example is pulled from the stream at a time.
    texts = ((eg["text"], eg) for eg in stream)
    for text, eg in texts:
        doc = nlp(text)
        task = copy.deepcopy(eg)
        spans = []
        for ent in doc.ents:
            # Continue if predicted entity is not selected in labels
            if labels and ent.label_ not in labels:
                continue
            # Create span dict for the predicted entity
            spans.append(
                {
                    "token_start": ent.start,
                    "token_end": ent.end - 1,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "text": ent.text,
                    "label": ent.label_,
                }
            )
        task["spans"] = spans
        # Rehash the newly created task so that hashes reflect added data
        task = set_hashes(task)
        yield task

Thanks for this! I was able to use the examples in the spaCy docs and put something together that does what I want and does not consume the stream. In case someone else is interested, here is what I did.

I wrote a little matcher function in my recipe called add_matches, which gets called as follows:

    stream = get_stream(loader, rehash=True, dedup=False, input_key="text")
    if patterns is not None:
        stream=add_matches(nlp, stream, patterns, labels)
    stream = add_tokens(nlp, stream, use_chars=highlight_chars)

Here is the code for the add_matches function:

import copy
import json

from prodigy import set_hashes
from spacy.matcher import Matcher
from spacy.tokens import Span


def add_matches(nlp, stream, patterns, labels):
    # Mostly adapted from https://spacy.io/usage/rule-based-matching#matcher and make_tasks
    def add_event_ent(matcher, doc, i, matches):
        # Get the current match and create a Span with the pattern's label.
        # Append the entity to the doc's entities. (Don't overwrite doc.ents!)
        match_id, start, end = matches[i]
        entity = Span(doc, start, end, label=nlp.vocab.strings[match_id])
        doc.ents += (entity,)

    matcher = Matcher(nlp.vocab)
    # The patterns file is JSONL: one {"label": ..., "pattern": [...]} object per line.
    with open(patterns, 'r') as json_file:
        for line in json_file:
            entry = json.loads(line)
            # spaCy v2 signature: matcher.add(key, on_match, *patterns)
            matcher.add(entry['label'], add_event_ent, entry['pattern'])

    texts = ((eg["text"], eg) for eg in stream)
    
    for text, eg in texts:
        doc = nlp(text)
        matcher(doc)
        task = copy.deepcopy(eg)
        spans = []
        for ent in doc.ents:
            # Continue if predicted entity is not selected in labels
            if labels and ent.label_ not in labels:
                continue
            # Create span dict for the predicted entity
            spans.append(
                {
                    "token_start": ent.start,
                    "token_end": ent.end - 1,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "text": ent.text,
                    "label": ent.label_,
                }
            )
        task["spans"] = spans
        # Rehash the newly created task so that hashes reflect added data
        task = set_hashes(task)
        yield task

I have my two stream eaters mitigated (and tested correctly this time!). Thanks again for the help. Much appreciated!

Thanks for sharing and glad you got it working!

One small suggestion to make things faster: instead of iterating over your text and eg and then processing each text, you can also use nlp.pipe with as_tuples=True to process the texts in batches:

texts = ((eg["text"], eg) for eg in stream)
for doc, eg in nlp.pipe(texts, as_tuples=True):
    ...

nlp.pipe is what was exhausting my stream in the phrase_match function.

@bburch I've been doing something similar. I adapted your code as it was a bit tidier than mine, but I ran into one issue: the spaCy Matcher can return duplicate or overlapping matches. spaCy provides a utility, spacy.util.filter_spans, to create a list of non-overlapping spans. So I ended up replacing the callback with this code:

    matches = matcher(doc)
    spans = list(doc.ents)
    for match_id, start, end in matches:
        span = spacy.tokens.Span(doc, start, end, label=nlp.vocab.strings[match_id])
        spans.append(span)
    doc.ents = spacy.util.filter_spans(spans)

You could possibly do this incrementally in the callback instead, although it may produce different results because filter_spans prefers the longest non-overlapping spans, i.e.

def add_event_handler(matcher, doc, i, matches):
    match_id, start, end = matches[i]
    spans = list(doc.ents)
    span = spacy.tokens.Span(doc, start, end, label=nlp.vocab.strings[match_id])
    spans.append(span)
    doc.ents = spacy.util.filter_spans(spans)
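To see why the two variants can disagree, here is a plain-Python sketch of the "prefer the longest span" rule applied to (start, end, label) tuples. keep_longest and incremental are hypothetical helpers that only follow the behaviour described above; they are not spaCy's filter_spans code.

```python
def keep_longest(spans):
    """Greedily keep the longest non-overlapping (start, end, label) spans."""
    # Longest first; ties go to the span that starts earlier.
    ordered = sorted(spans, key=lambda s: (s[1] - s[0], -s[0]), reverse=True)
    kept, used = [], set()
    for start, end, label in ordered:
        tokens = set(range(start, end))
        if not tokens & used:
            kept.append((start, end, label))
            used |= tokens
    return sorted(kept)


def incremental(spans):
    """Re-filter after every new span, as a per-match callback would."""
    kept = []
    for span in spans:
        kept = keep_longest(kept + [span])
    return kept


# Token offsets with an exclusive end, as in Span(doc, start, end).
chain = [(0, 2, "A"), (1, 4, "B"), (3, 8, "C")]
print(keep_longest(chain))  # [(0, 2, 'A'), (3, 8, 'C')]
print(incremental(chain))   # [(3, 8, 'C')]
```

Filtering once over all matches keeps A because B loses to the longer C. Filtering after every match has already dropped A in favour of B by the time C arrives.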

Also shouldn't add_tokens be before add_matches in the pipeline?

Ah, that's strange, because nlp.pipe specifically returns a generator. You can set a batch_size argument on it, though, so I wonder if setting that to 10 (or whatever your batch size is) will do the trick and prevent it from buffering more examples :thinking: There's nothing wrong with the other solution, though; it's just that it might be slightly slower.
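A possible explanation, sketched in plain Python rather than with spaCy itself: a function can be a generator and still read a whole batch from its input before it yields the first result. If the batch size is larger than the number of available records, that looks exactly like the stream being consumed up front. batched_pipe and counting_source are hypothetical names, and this mimics batching in general rather than spaCy's actual implementation.

```python
from itertools import islice


def counting_source(n, pulled):
    """Yield n items lazily and record each one that is requested."""
    for i in range(n):
        pulled.append(i)
        yield i


def batched_pipe(items, batch_size):
    """Process items in batches: a whole batch is read before any result."""
    items = iter(items)
    while True:
        batch = list(islice(items, batch_size))
        if not batch:
            return
        for item in batch:
            yield item * 2  # placeholder for the real per-item work


pulled_big, pulled_small = [], []
big = batched_pipe(counting_source(50, pulled_big), batch_size=1000)
small = batched_pipe(counting_source(50, pulled_small), batch_size=10)

# Ask each pipeline for a single result and compare how much input was read.
next(big)
next(small)
print(len(pulled_big), len(pulled_small))  # 50 10
```

With spaCy, the corresponding change would be passing the argument explicitly, e.g. nlp.pipe(texts, as_tuples=True, batch_size=10).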

:+1: To add to this, you can find the implementation of filter_spans here in case you want to customise it (pretty straightforward, it's just the index calculation that's slightly abstract):

Btw, in spaCy v3, the Matcher will support a greedy argument when you add patterns, so you'll be able to make it prefer the first or longest match out-of-the-box :slightly_smiling_face:

If I read the code correctly, it shouldn't matter in this case – add_tokens adds the "tokens" property to the JSON data for Prodigy and ensures that existing "spans" map to tokens and include the token start/end references. That's needed to render the data in the UI, but not by the function adding the matches (which tokenizes itself).

That said, I'd typically recommend calling add_tokens last or as late as possible, since it also updates the "spans" with the required token information. So if you have logic in your recipe that only adds spans with character offsets, they'll be updated with the token_start and token_end automatically, so you don't have to worry about that. (And if something doesn't match, you'll see an error immediately.)
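The character-to-token mapping described here can be sketched roughly as follows. add_token_offsets is a hypothetical helper, not Prodigy's add_tokens, and the token dict keys (text, start, end, id) are an assumption about the task format. token_end is inclusive, matching the ent.end - 1 used in the make_tasks code earlier in the thread.

```python
def add_token_offsets(tokens, spans):
    """Add token_start/token_end to spans that only have character offsets."""
    starts = {tok["start"]: tok["id"] for tok in tokens}
    ends = {tok["end"]: tok["id"] for tok in tokens}
    updated = []
    for span in spans:
        if span["start"] not in starts or span["end"] not in ends:
            # Fail loudly when a span doesn't line up with the tokenization.
            raise ValueError(f"Span {span} does not align with token boundaries")
        updated.append(
            dict(span, token_start=starts[span["start"]], token_end=ends[span["end"]])
        )
    return updated


text = "Visit New York soon"
tokens = [
    {"text": "Visit", "start": 0, "end": 5, "id": 0},
    {"text": "New", "start": 6, "end": 9, "id": 1},
    {"text": "York", "start": 10, "end": 14, "id": 2},
    {"text": "soon", "start": 15, "end": 19, "id": 3},
]
spans = [{"start": 6, "end": 14, "label": "GPE"}]
print(add_token_offsets(tokens, spans))
```

A span starting at character 7 (in the middle of "New") would raise the ValueError, which is the kind of immediate feedback mentioned above.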

Thanks @ines. The other problem I ran into was that, as far as I understand, spacy.matcher.Matcher matches tokens, not characters? If you're passing a list of single words and your tokeniser creates word tokens, then I think it works, but in my case my patterns spanned multiple tokens. So I had to tokenise my patterns as well, i.e.

matcher = spacy.matcher.Matcher(nlp.vocab)
with open(patterns, 'r') as fp:
    terms = json.load(fp)
    for term, label in terms.items():
        tokens = nlp(term)
        matcher.add(label, [[{"LOWER": token.lower_} for token in tokens]], on_match=add_event_handler)

Yes, the Matcher lets you find spans based on token descriptions. In the simplest case, your pattern might just be [{"LOWER": "word"}], but the pattern syntax also lets you express more complex patterns like "case-insensitive token X followed by verb with lemma Y" etc. Alternatively, there's also PhraseMatcher that lets you match exact occurrences of words and phrases using Doc objects as patterns.
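The reason a multi-word term has to be tokenized first can be shown with a toy token-sequence matcher. find_token_matches is a hypothetical helper using whitespace splitting and lowercase comparison. It only illustrates the "one pattern entry per token" idea and is not how spaCy's Matcher works internally.

```python
def find_token_matches(tokens, pattern):
    """Return (start, end) token offsets where pattern matches, end exclusive."""
    lowered = [tok.lower() for tok in tokens]
    target = [tok.lower() for tok in pattern]
    width = len(target)
    return [
        (i, i + width)
        for i in range(len(lowered) - width + 1)
        if lowered[i : i + width] == target
    ]


# Whitespace splitting stands in for a real tokenizer here.
tokens = "She moved to New York from new york state".split()

# A single pattern entry containing a space can never equal one token.
print(find_token_matches(tokens, ["new york"]))     # []
# Splitting the term into one entry per token matches both occurrences.
print(find_token_matches(tokens, ["new", "york"]))  # [(3, 5), (6, 8)]
```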

Thanks for pointing this out. :slight_smile: Another bug to squash.

That is interesting. I'll give it a shot. I had several thousand examples in my generator. Since I'm working with sensitive data, I have to decrypt records one at a time. I think my bottleneck is there more than anywhere else. It's fast enough to label the data. :slight_smile:

This is my next step as well, trying to get pattern matcher, character highlighting, and the model predictions (in ner.correct mode) to play nicely together. I have a lot of OCR data where words are concatenated.