Is there a way for Prodigy to react to changes in the data source?

Say I am using command below to start prodigy.

prodigy pdf.spans.manual papers blank:en ./pdfs --label EVENT,PLACE --focus text,list_item

Once Prodigy is running, if I add a file to the source directory, Prodigy does not add it to the session. Is there a way to achieve this, or are the tasks in a session fixed at the time Prodigy starts?

Thank you.

Hi @paradm,

By default, stream creation is done just once per Prodigy recipe runtime, so yes, the tasks are fixed by default - at least in the current implementation of the pdf.spans.manual recipe and other built-in recipes.

However, a Prodigy loader is just a Python function, and you should be able to customize it so that it checks at regular intervals whether new files are present in the source directory (or calls an API, or queries a database - whatever your method of streaming data is).
Normally, you can just plug in the custom loader to built-in recipes, but with pdf.spans.manual it is a little bit different because the loader is actually an integral part of the recipe, so you'd need to customize the recipe code instead.

Here's what it could look like:
The LayoutStream runs an infinite loop: every time the current stream has finished processing all the files in the directory, it checks the directory again for new files:

[...]

class LayoutStream:
    """Stream annotation tasks from a directory of PDFs and keep polling it.

    Every PDF found in the source directory is parsed with spacy-layout and
    turned into one or more Prodigy tasks. Once all known files have been
    handled, the directory is re-scanned every ``check_interval`` seconds so
    that files added while the server is running are picked up as well.
    """

    def __init__(
        self,
        f: PathInputType,
        nlp: Language,
        file_ext: Optional[List[str]] = None,
        view_id: ViewId = "spans_manual",
        split_pages: bool = False,
        hide_preview: bool = False,
        focus: Optional[List[str]] = None,
        check_interval: int = 5,  # add configurable frequency of the checks
    ) -> None:
        """Set up the stream for the directory ``f``.

        f: Directory to load (and keep loading) PDFs from.
        nlp: spaCy pipeline handed to spacy-layout.
        file_ext: Accepted file extensions without the dot. Defaults to
            ``["pdf"]`` when omitted.
        view_id: Annotation interface used for the text block.
        split_pages: Emit one task per page instead of one per document.
        hide_preview: Skip rendering page images for the preview.
        focus: Layout span labels to annotate one by one (focus mode).
            Defaults to no focus when omitted.
        check_interval: Seconds to wait between directory scans when no new
            files were found.

        Raises RecipeError if ``f`` does not exist or is not a directory.
        """
        self.dir_path = ensure_path(f)
        if not self.dir_path.exists() or not self.dir_path.is_dir():
            raise RecipeError(f"Can't load from directory {f}", self.dir_path.resolve())

        # None stands in for the defaults so that no list object is shared
        # between instances through the function signature.
        self.file_ext = file_ext if file_ext is not None else ["pdf"]
        self.view_id = view_id
        self.split_pages = split_pages
        self.hide_preview = hide_preview
        self.focus = focus if focus is not None else []
        self.nlp = nlp
        self.check_interval = check_interval
        # Resolved paths (as strings) of files that must not be picked up
        # again: fully processed ones and ones that failed.
        self.processed_files = set()

        self.layout = spaCyLayout(nlp, separator=SEPARATOR)
        log("RECIPE: Initialized spacy-layout")

    def get_available_files(self) -> List[Path]:
        """Get the sorted list of all valid PDF files in the directory.

        Hidden files (leading dot), sub-directories and files whose
        lower-cased extension is not in ``self.file_ext`` are ignored.
        """
        return [
            path
            for path in sorted(self.dir_path.iterdir())
            if path.is_file()
            and not path.name.startswith(".")
            and (path.suffix.lower()[1:] in self.file_ext)
        ]

    def get_new_files(self) -> List[Path]:
        """Get the list of files that have not been handled yet."""
        new_files = [
            path
            for path in self.get_available_files()
            if str(path.resolve()) not in self.processed_files
        ]
        if new_files:
            msg.info("New files detected!")
        return new_files

    def process_file(self, file_path: Path) -> StreamType:
        """Yield all tasks for a single PDF file.

        The file is recorded in ``self.processed_files`` only after its last
        task has been yielded, i.e. once this generator is exhausted.
        """
        doc = self.layout(file_path)
        images = pdf_to_images(file_path) if not self.hide_preview else None

        if self.focus:
            yield from self._process_file_focus(doc, file_path, images)
        else:
            yield from self._process_file_full(doc, file_path, images)

        # Mark file as processed
        self.processed_files.add(str(file_path.resolve()))

    def _process_file_full(self, doc: Doc, file_path: Path, images: Optional[List[str]]) -> StreamType:
        """Yield whole-page tasks: one per page, or one for the full document.

        With ``split_pages`` each page is yielded as its own task, otherwise
        all pages are collected and yielded as a single "pages" task.
        """
        blocks = [{"view_id": self.view_id}]
        if not self.hide_preview:
            blocks.append({"view_id": "image", "spans": []})

        # Only depends on the document, so compute it once rather than once
        # per page.
        token_labels = get_token_labels(doc)
        pages = []
        for i, (page_layout, page_spans) in enumerate(doc._.get(self.layout.attrs.doc_pages)):
            tokens = []
            if page_spans:
                # Tokens from the first to the last layout span on this page.
                tokens = get_layout_tokens(
                    doc[page_spans[0].start : page_spans[-1].end],
                    token_labels,
                )
            page = {
                "text": SEPARATOR.join(span.text for span in page_spans),
                "tokens": tokens,
                "width": page_layout.width,
                "height": page_layout.height,
                "view_id": "blocks",
                "config": {"blocks": blocks},
            }
            if not self.hide_preview and images:
                page["image"] = images[i]
            if self.split_pages:
                meta = {"title": file_path.stem, "page": page_layout.page_no}
                yield set_hashes({**page, "meta": meta})
            else:
                # Pages only need to be kept around for the combined task.
                pages.append(page)

        if not self.split_pages:
            yield set_hashes({"pages": pages, "meta": {"title": file_path.stem}})

    def _process_file_focus(self, doc: Doc, file_path: Path, images: Optional[List[str]]) -> StreamType:
        """Yield one task per layout span whose label is in ``self.focus``."""
        # Only depends on the document, so compute it once rather than once
        # per page.
        token_labels = get_token_labels(doc)
        for i, (page_layout, page_spans) in enumerate(doc._.get(self.layout.attrs.doc_pages)):
            for span in page_spans:
                if span.label_ not in self.focus:
                    continue
                blocks = [{"view_id": self.view_id}]
                if not self.hide_preview:
                    span_layout = span._.get(self.layout.attrs.span_layout)
                    image_spans = []
                    if span_layout:
                        # Bounding box of the focused span for the preview.
                        image_spans.append({
                            "x": span_layout.x,
                            "y": span_layout.y,
                            "width": span_layout.width,
                            "height": span_layout.height,
                            "color": "magenta",
                            "id": span.id,
                        })
                    blocks.append({"view_id": "image", "spans": image_spans})
                eg = {
                    "text": span.text,
                    "tokens": get_layout_tokens(span, token_labels),
                    "width": page_layout.width,
                    "height": page_layout.height,
                    "view_id": "blocks",
                    "config": {"blocks": blocks},
                    "text_span": {
                        "token_start": span.start,
                        # span.end is exclusive; the task stores the index of
                        # the last token instead.
                        "token_end": span.end - 1,
                        "start": span.start_char,
                        "end": span.end_char,
                        "text": span.text,
                        "label": span.label_,
                    },
                    "meta": {"title": file_path.stem, "page": page_layout.page_no},
                }
                if not self.hide_preview and images:
                    eg["image"] = images[i]
                yield set_hashes(eg)

    def get_stream(self) -> StreamType:
        """Yield tasks forever, polling the directory for new files.

        This generator never finishes: when no unhandled files are left it
        sleeps for ``check_interval`` seconds and scans again. A file that
        raises during processing is logged and skipped for the rest of the
        run instead of being retried on every scan.
        """
        while True:
            new_files = self.get_new_files()

            # Nothing new: wait and check again
            if not new_files:
                time.sleep(self.check_interval)
                continue

            # Process all files in current batch
            for file_path in new_files:
                try:
                    yield from self.process_file(file_path)
                except Exception as e:
                    log(f"Error processing file {file_path}: {e}")
                    # Without this the file would be returned by every
                    # following scan, which skips the sleep above and turns
                    # into a tight retry loop that can also re-emit tasks
                    # yielded before the failure.
                    self.processed_files.add(str(file_path.resolve()))


@recipe(
    "pdf.spans.manual",
    # fmt: off
    dataset=Arg(help="Dataset to save annotations to"),
    nlp=Arg(help="Loadable spaCy pipeline"),
    source=Arg(help="Path to directory of PDFs or dataset/JSONL file created with pdf.layout.fetch"),
    labels=Arg("--label", "-l", help="Comma-separated label(s) to annotate or text file with one label per line"),
    add_ents=Arg("--add-ents", "-E", help="Add named enitites for the given labels via the spaCy model"),
    focus=Arg("--focus", "-f", help="Focus mode: annotate selected sections of a given type, e.g. 'text'"),
    disable=Arg("--disable", "-d", help="Labels of layout spans to disable, e.g. 'footnote'"),
    split_pages=Arg("--split-pages", "-S", help="View pages as separate tasks"),
    hide_preview=Arg("--hide-preview", "-HP", help="Hide side-by-side preview of layout"),
    check_interval=Arg("--check-interval", "-CI", help="Interval in seconds to check for new files"),
    # fmt: on
)
def pdf_spans_manual(
    dataset: str,
    nlp: Language,
    source: str,
    labels: Optional[List[str]] = None,
    add_ents: bool = False,
    focus: Optional[List[str]] = None,
    disable: Optional[List[str]] = None,
    hide_preview: bool = False,
    split_pages: bool = False,
    check_interval: int = 5,
) -> ControllerComponentsDict:
    """
    Apply span annotations to text-based document contents extracted with
    spacy-layout and Docling. For efficiency, the recipe can run with
    --focus text to walk through individual text blocks, which are highlighted
    in a visual preview of the document page.

    When the source is a directory of PDFs, it is re-checked for new files
    every --check-interval seconds.
    """
    log("RECIPE: Starting recipe pdf.spans.manual", locals())
    view_id = "spans_manual"

    # Pre-extracted data (JSONL file or dataset from pdf.layout.fetch) is
    # loaded as-is; anything else is treated as a directory of PDFs.
    is_preloaded = source.endswith(".jsonl") or _source_is_dataset(source, None)
    if not is_preloaded:
        pdf_watcher = LayoutStream(
            source,
            nlp=nlp,
            file_ext=["pdf"],
            view_id=view_id,
            split_pages=split_pages,
            hide_preview=hide_preview,
            focus=focus or [],
            check_interval=check_interval,
        )
        stream = Stream.from_iterable(pdf_watcher.get_stream())
    else:
        stream = get_stream(source)

    # Optional stream wrappers, applied in a fixed order.
    if add_ents:
        labels = resolve_labels(nlp, "ner", recipe_labels=labels)
        stream.apply(preprocess_ner_stream, nlp, labels=labels, unsegmented=True)
    if disable:
        stream.apply(disable_tokens, disabled=disable)

    # The preview stylesheet is only needed while the preview is shown.
    stylesheet = CSS
    if not hide_preview:
        stylesheet += CSS_PREVIEW
    else:
        stream.apply(remove_preview, view_id=view_id)

    # Whole documents use the "pages" interface; per-page or focus tasks are
    # rendered as plain "blocks".
    whole_document = not (split_pages or focus)
    theme = {
        "cardMaxWidth": "95%",
        "smallText": FONT_SIZE_TEXT,
        "tokenHeight": 25,
    }
    ui_config = {
        "labels": labels,
        "global_css": stylesheet,
        "shade_bounding_boxes": True,
        "custom_theme": theme,
    }
    return {
        "dataset": dataset,
        "stream": stream,
        "view_id": "pages" if whole_document else "blocks",
        "config": ui_config,
    }

As you can see in get_stream, the stream waits for a while before re-checking the directory, and this waiting period (check_interval) is configurable at the recipe level.

As a reminder, the pdf.spans.manual recipe and the entire PDF plugin are open source, so you can fork the plugin on GitHub, modify the source code and reinstall it from your local copy.