Hi there,
We are using Prodigy to accept/reject pre-annotated videos with a custom recipe.
These videos are pulled from S3 and are typically 1-3 seconds long (under 2 MB).
Given a set of 50 videos to annotate and a batch size of 10, we are seeing around 20% duplicates: the Prodigy interface shows ~60 videos annotated, whereas there are only 50 in the stream we provide, and the Prodigy DB also shows that only 50 examples have been annotated.
We've debugged the stream itself and can confirm that it returns only 50 items. We've even wrapped the stream with filter_duplicates, as suggested in another support topic, but that has had no effect. It feels like there is a bug within the Prodigy app that is leaking items into the next batch of tasks fetched from the stream, possibly due to the size of our tasks, since each one carries base64-encoded video. Here's the code that we're using for our custom recipe. Let me know if there are any additional logs or metrics that would be useful for you.
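(For context, we launch the recipe with something like the command below; the bucket name and filename are placeholders for our real values, and PRODIGY_SAMPLES_PREFIX is set in the environment.)

prodigy ar_recipe our-bucket -F ar_recipe.py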
import json
import os

import boto3
import prodigy
from prodigy.components.filters import filter_duplicates
from prodigy.util import bytes_to_b64


@prodigy.recipe("ar_recipe")
def ar_recipe(bucket, prefix=None, vid_format="video/mp4", include="_annotated.mp4"):
    prefix = os.environ.get("PRODIGY_SAMPLES_PREFIX", prefix)
    blocks = [
        {"view_id": "audio_manual"},
        {"view_id": "html"},
    ]
    title_suffix = " action"
    s3 = boto3.client("s3")

    def get_stream():
        paginator = s3.get_paginator("list_objects")
        paginate_params = {"Bucket": bucket}
        if prefix is not None:
            paginate_params["Prefix"] = prefix
        page_iterator = paginator.paginate(**paginate_params)
        item_num = 0
        # Iterate through the pages.
        for page in page_iterator:
            # Iterate through items on the page.
            for obj in page["Contents"]:
                vid_key = obj["Key"]
                if vid_key.endswith(include):
                    # Read the video.
                    highlighted_video = s3.get_object(Bucket=bucket, Key=vid_key).get("Body").read()
                    # Read the event metadata JSON that sits next to the video.
                    events_path = vid_key.replace(include, "")
                    event = (
                        s3.get_object(Bucket=bucket, Key=f"{events_path}.json")
                        .get("Body")
                        .read()
                        .decode("utf-8")
                    )
                    event = json.loads(event)
                    event_type = event.get("action_type").lower()
                    html = (
                        f"<h2>{event_type}{title_suffix}</h2>"
                        f"<div style='display:none;'>{vid_key.replace('/', '_')}</div>"
                    )
                    task = {
                        "video": bytes_to_b64(highlighted_video, vid_format),
                        "path": f"s3://{bucket}/{vid_key}",
                        "html": html,
                        "action_event": event_type,
                        "text": vid_key.replace("/", "_"),
                        # Manually assign an incrementing task hash.
                        "_task_hash": item_num,
                    }
                    item_num += 1
                    yield task

    # De-duplicate the stream, as suggested in the other support topic.
    stream = filter_duplicates(get_stream())
    config = {
        "dataset": prefix.replace("/", "_"),
        "view_id": "blocks",
        "stream": stream,
        "config": {
            "labels": ["RELEVANT"],
            "blocks": blocks,
            "choice_auto_accept": True,
            "audio_autoplay": True,
            "global_css": ".c0177 { display: none; } .prodigy-title { display: none; }",
        },
    }
    return config
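For reference, this is roughly the sanity check we ran outside the Prodigy server to confirm the stream only yields 50 unique items (the bucket/prefix values here are placeholders, and PRODIGY_SAMPLES_PREFIX was unset so the prefix argument is used):

# Exhaust the filtered stream and count what it yields.
components = ar_recipe("our-bucket", prefix="our-prefix")
tasks = list(components["stream"])
print(len(tasks))                             # -> 50
print(len({t["path"] for t in tasks}))        # -> 50, so no duplicate S3 keys
print(len({t["_task_hash"] for t in tasks}))  # -> 50, so no duplicate hashes

So from our side the stream and the hashes look clean, which is why we suspect the duplication is happening somewhere in the app/batching layer.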
Appreciate the help, thanks!