Hi @paradm,
By default, stream
creation is done just once per Prodigy recipe runtime, so yes, the tasks are fixed by default - at least in the current implementation of the pdf.spans.manual
recipe and other built-in recipes.
However, a Prodigy loader is just a Python function, and you should be able to customize it so that it checks at regular intervals whether new files are present in the source directory (or calls an API, or queries a database — whatever your method of streaming data is).
Normally, you can just plug in the custom loader to built-in recipes, but with pdf.spans.manual
it is a little bit different because the loader is actually an integral part of the recipe, so you'd need to customize the recipe code instead.
Here's what it could look like:
The LayoutStream would check for new files every time the current stream has finished processing all the files in the directory, in an infinite loop:
[...]
class LayoutStream:
    """Stream of annotation tasks built from PDF files in a directory.

    Unlike a one-shot loader, this stream keeps polling the source
    directory (every ``check_interval`` seconds) and emits tasks for any
    files that appear after the recipe has started.
    """

    def __init__(
        self,
        f: PathInputType,
        nlp: Language,
        file_ext: Optional[List[str]] = None,
        view_id: ViewId = "spans_manual",
        split_pages: bool = False,
        hide_preview: bool = False,
        focus: Optional[List[str]] = None,
        check_interval: int = 5,  # add configurable frequency of the checks
    ) -> None:
        """Validate the source directory and set up the layout parser.

        Raises RecipeError if ``f`` does not exist or is not a directory.
        """
        self.dir_path = ensure_path(f)
        if not self.dir_path.exists() or not self.dir_path.is_dir():
            raise RecipeError(f"Can't load from directory {f}", self.dir_path.resolve())
        # Avoid mutable default arguments: create fresh lists per instance.
        self.file_ext = ["pdf"] if file_ext is None else file_ext
        self.view_id = view_id
        self.split_pages = split_pages
        self.hide_preview = hide_preview
        self.focus = [] if focus is None else focus
        self.nlp = nlp
        self.check_interval = check_interval
        # Resolved absolute paths of files already emitted (or skipped after
        # a processing error) so they are not picked up again.
        self.processed_files: set = set()
        self.layout = spaCyLayout(nlp, separator=SEPARATOR)
        log("RECIPE: Initialized spacy-layout")

    def get_available_files(self) -> List[Path]:
        """Get list of all valid PDF files in the directory."""
        return [
            path
            for path in sorted(self.dir_path.iterdir())
            if path.is_file()
            and not path.name.startswith(".")
            and (path.suffix.lower()[1:] in self.file_ext)
        ]

    def get_new_files(self) -> List[Path]:
        """Get list of new unprocessed PDF files."""
        current_files = self.get_available_files()
        new_files = [f for f in current_files if str(f.resolve()) not in self.processed_files]
        if new_files:
            msg.info("New files detected!")
        return new_files

    def process_file(self, file_path: Path) -> StreamType:
        """Process a single PDF file and yield its annotation tasks."""
        doc = self.layout(file_path)
        images = pdf_to_images(file_path) if not self.hide_preview else None
        if self.focus:
            yield from self._process_file_focus(doc, file_path, images)
        else:
            yield from self._process_file_full(doc, file_path, images)
        # Mark file as processed only after all its tasks were yielded
        self.processed_files.add(str(file_path.resolve()))

    def _process_file_full(self, doc: Doc, file_path: Path, images: Optional[List[str]]) -> StreamType:
        """Yield one task per page (or one multi-page task) for the document."""
        blocks = [{"view_id": self.view_id}]
        if not self.hide_preview:
            blocks.append({"view_id": "image", "spans": []})
        pages = []
        # Token labels depend only on the doc, so compute them once up front
        # instead of once per page.
        token_labels = get_token_labels(doc)
        for i, (page_layout, page_spans) in enumerate(doc._.get(self.layout.attrs.doc_pages)):
            tokens = []
            if page_spans:
                tokens = get_layout_tokens(
                    doc[page_spans[0].start : page_spans[-1].end],
                    token_labels,
                )
            page = {
                "text": SEPARATOR.join(span.text for span in page_spans),
                "tokens": tokens,
                "width": page_layout.width,
                "height": page_layout.height,
                "view_id": "blocks",
                "config": {"blocks": blocks},
            }
            if not self.hide_preview and images:
                # assumes images[i] lines up with doc page order — TODO confirm
                page["image"] = images[i]
            pages.append(page)
            if self.split_pages:
                meta = {"title": file_path.stem, "page": page_layout.page_no}
                yield set_hashes({**page, "meta": meta})
        if not self.split_pages:
            yield set_hashes({"pages": pages, "meta": {"title": file_path.stem}})

    def _process_file_focus(self, doc: Doc, file_path: Path, images: Optional[List[str]]) -> StreamType:
        """Yield one task per layout span whose label is in ``self.focus``."""
        # Loop-invariant: compute once for the whole doc, not per page.
        token_labels = get_token_labels(doc)
        for i, (page_layout, page_spans) in enumerate(doc._.get(self.layout.attrs.doc_pages)):
            for span in page_spans:
                if span.label_ not in self.focus:
                    continue
                blocks = [{"view_id": self.view_id}]
                if not self.hide_preview:
                    span_layout = span._.get(self.layout.attrs.span_layout)
                    image_spans = []
                    if span_layout:
                        # Highlight the focused block in the page preview
                        image_spans.append({
                            "x": span_layout.x,
                            "y": span_layout.y,
                            "width": span_layout.width,
                            "height": span_layout.height,
                            "color": "magenta",
                            "id": span.id,
                        })
                    blocks.append({"view_id": "image", "spans": image_spans})
                eg = {
                    "text": span.text,
                    "tokens": get_layout_tokens(span, token_labels),
                    "width": page_layout.width,
                    "height": page_layout.height,
                    "view_id": "blocks",
                    "config": {"blocks": blocks},
                    "text_span": {
                        "token_start": span.start,
                        "token_end": span.end - 1,
                        "start": span.start_char,
                        "end": span.end_char,
                        "text": span.text,
                        "label": span.label_,
                    },
                    "meta": {"title": file_path.stem, "page": page_layout.page_no},
                }
                if not self.hide_preview and images:
                    eg["image"] = images[i]
                yield set_hashes(eg)

    def get_stream(self) -> StreamType:
        """Endless task stream: process new files, then poll for more."""
        while True:
            new_files = self.get_new_files()
            # If no new files, wait and check again
            if not new_files:
                time.sleep(self.check_interval)
                continue
            # Process all files in current batch
            for file_path in new_files:
                try:
                    yield from self.process_file(file_path)
                except Exception as e:
                    log(f"Error processing file {file_path}: {str(e)}")
                    # Mark the broken file as processed anyway; otherwise it
                    # would be retried (and partially re-emitted) forever.
                    self.processed_files.add(str(file_path.resolve()))
@recipe(
    "pdf.spans.manual",
    # fmt: off
    dataset=Arg(help="Dataset to save annotations to"),
    nlp=Arg(help="Loadable spaCy pipeline"),
    source=Arg(help="Path to directory of PDFs or dataset/JSONL file created with pdf.layout.fetch"),
    labels=Arg("--label", "-l", help="Comma-separated label(s) to annotate or text file with one label per line"),
    add_ents=Arg("--add-ents", "-E", help="Add named entities for the given labels via the spaCy model"),
    focus=Arg("--focus", "-f", help="Focus mode: annotate selected sections of a given type, e.g. 'text'"),
    disable=Arg("--disable", "-d", help="Labels of layout spans to disable, e.g. 'footnote'"),
    split_pages=Arg("--split-pages", "-S", help="View pages as separate tasks"),
    hide_preview=Arg("--hide-preview", "-HP", help="Hide side-by-side preview of layout"),
    check_interval=Arg("--check-interval", "-CI", help="Interval in seconds to check for new files"),
    # fmt: on
)
def pdf_spans_manual(
    dataset: str,
    nlp: Language,
    source: str,
    labels: Optional[List[str]] = None,
    add_ents: bool = False,
    focus: Optional[List[str]] = None,
    disable: Optional[List[str]] = None,
    hide_preview: bool = False,
    split_pages: bool = False,
    check_interval: int = 5,
) -> ControllerComponentsDict:
    """
    Apply span annotations to text-based document contents extracted with
    spacy-layout and Docling. For efficiency, the recipe can run with
    --focus text to walk through individual text blocks, which are highlighted
    in a visual preview of the document page.

    When ``source`` is a directory of PDFs, the stream polls it every
    ``check_interval`` seconds so files added at runtime are picked up.
    """
    log("RECIPE: Starting recipe pdf.spans.manual", locals())
    view_id = "spans_manual"
    if source.endswith(".jsonl") or _source_is_dataset(source, None):
        # Load from existing data created with pdf.layout.fetch
        stream = get_stream(source)
    else:
        # Directory of PDFs: use the polling loader defined above
        layout_stream = LayoutStream(
            source,
            nlp=nlp,
            file_ext=["pdf"],
            view_id=view_id,
            split_pages=split_pages,
            hide_preview=hide_preview,
            focus=focus or [],
            check_interval=check_interval,
        )
        stream = Stream.from_iterable(layout_stream.get_stream())
    if add_ents:
        # Pre-annotate named entities with the spaCy model's NER labels
        labels = resolve_labels(nlp, "ner", recipe_labels=labels)
        stream.apply(preprocess_ner_stream, nlp, labels=labels, unsegmented=True)
    if disable:
        stream.apply(disable_tokens, disabled=disable)
    css = CSS
    if hide_preview:
        stream.apply(remove_preview, view_id=view_id)
    else:
        css += CSS_PREVIEW
    return {
        "dataset": dataset,
        "stream": stream,
        "view_id": "pages" if not split_pages and not focus else "blocks",
        "config": {
            "labels": labels,
            "global_css": css,
            "shade_bounding_boxes": True,
            "custom_theme": {
                "cardMaxWidth": "95%",
                "smallText": FONT_SIZE_TEXT,
                "tokenHeight": 25,
            },
        },
    }
As you can see in get_stream, it waits some time before re-checking, and this waiting period (check_interval) is configurable at the recipe level.
As a reminder, the pdf.spans.manual recipe
and the entire PDF plugin are open source, so you can fork it from GitHub, modify the source code, and reinstall from your local copy.