Hello Ines,
I was trying to implement the S3 loader function in our custom recipe but didn't quite succeed; maybe I'm doing something wrong. We're also running into a "No tasks available" problem in our stream (the command we use to start the recipe is included after the code below).
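For context, this is roughly the direction I was attempting for the S3 part. It's just a sketch rather than our actual code, and the bucket name, prefix, and the iter_s3_videos helper are placeholders I made up for illustration:

import base64

import boto3


def iter_s3_videos(bucket="my-bucket", prefix="videos/"):
    """List .mp4 objects under a prefix and yield (key, base64 data URI)."""
    # Placeholder bucket/prefix; in practice these would come from our config
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith(".mp4"):
                continue
            # Download the object and encode it the same way the local loader does
            body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
            encoded = "data:video/mp4;base64," + base64.b64encode(body).decode()
            yield key, encoded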
Here's a segment of our custom recipe; I'd appreciate it if you could give me a hand with it:
import base64
import json
import os
import random
from datetime import datetime as dt

import prodigy
from prodigy.util import split_string
from prodigy.components.db import connect

total_annotations_since_server_start = 0
total_files = 0
def get_priority_videos():
    curr_path = os.path.dirname(__file__)
    with open(os.path.join(curr_path, "priority_videos.txt"), "r") as pv:
        p_videos = pv.readlines()
    return p_videos


def split_csv(_in_string):
    return _in_string.split(",")
def get_deny_list(source):
    # Collect paths that were already annotated (accepted or ignored) so they
    # can be excluded from the stream
    deny_list = []
    for answer in ["accept", "ignore"]:
        answer_file_name = os.path.join(source, "annotations",
                                        f"{answer}.jsonl")
        if not os.path.exists(answer_file_name):
            continue
        with open(answer_file_name) as fp:
            for line in fp:
                annotation = json.loads(line)
                file_path = os.path.normpath(annotation["meta"]["path"])
                deny_list.append(file_path)
    return deny_list
def get_all_files(source, pv_flag):
    all_files = []
    deny_list = get_deny_list(source)
    pv = None
    if pv_flag:
        try:
            pv = get_priority_videos()
            pv = [n.split("/")[-1].strip() for n in pv]
            print(f"Found list of {len(pv)} priority videos")
        except Exception as e:
            print(f"Priority flag ON but no priority videos found... ({e})")
    filtered_videos = []
    for path, dirs, files in os.walk(source):
        for _file in files:
            if _file.endswith(".mp4"):
                # If priority videos exist, add only those
                if pv is not None:
                    if "_".join(_file.split("_")[:-1]).strip() not in pv:
                        filtered_videos.append(_file)
                        continue
                file_path = os.path.join(path, _file)
                if file_path not in deny_list:
                    all_files.append(file_path)
    random.shuffle(all_files)
    print(f"Filtered {len(filtered_videos)} videos and added {len(all_files)} videos to queue for labeling...")
    return all_files
def load_filenames(all_files, review_ds):
    """
    Args:
        all_files: A list of all pathnames to the data files
        review_ds: None in normal mode; in review mode, a dict mapping file name to spans
    Yields:
        Task dicts with the base64-encoded video and its metadata
    """
    for _file in all_files:
        if _file.endswith(".mp4"):
            with open(_file, "rb") as video_file:
                encoded_string = 'data:video/mp4;base64,' + base64.b64encode(
                    video_file.read()).decode()
            # Path without the .mp4 extension; the earlier [1] index of this
            # split always produced an empty string
            stem = _file.split(".mp4")[0]
            if review_ds is not None:
                spans = review_ds[_file]
                yield dict(video=encoded_string, text=stem,
                           meta=dict(path=stem), file=_file, audio_spans=spans)
            else:
                yield dict(video=encoded_string, text=stem,
                           meta=dict(path=stem), file=_file)
def get_stream(all_files, review_ds=None):
    # Note: this returns on the first pass through the loop, so the generator
    # is only created once and is not restarted when it is exhausted
    while True:
        stream = load_filenames(all_files, review_ds)
        return stream
@prodigy.recipe(
    "audio_custom.manual",
    dataset=("The dataset to use", "positional", None, str),
    source=("Path to a directory of videos", "positional", None, str),
    pv_flag=("0 if priority videos are OFF, 1 if ON", "option", "pv_flag", int),
    label=("One or more comma-separated labels", "option", "l", split_string),
    review=("JSONL file to review", "option", "r", str),
)
def audio_recipe(dataset, source, label=None, pv_flag=0, review=None):
    if review is None:
        all_files = get_all_files(source, pv_flag)
        all_spans = None
    else:
        all_files = []
        all_spans = dict()
        try:
            with open(review, "r") as jsonl_file:
                jsonl_data = jsonl_file.readlines()
            for line in jsonl_data:
                json_data = json.loads(line)
                file_name = json_data["file"]
                audio_spans = json_data["audio_spans"]
                all_files.append(file_name)
                all_spans[file_name] = audio_spans
        except Exception as e:
            print(f"Error opening JSONL file: {e}")
    global total_files
    total_files = len(all_files)
    def before_db(examples):
        for eg in examples:
            # Remove the base64-encoded video before writing to the database
            if "video" in eg.keys():
                del eg["video"]
            # Timestamp to keep track of when annotations were done
            eg["ts"] = dt.now().strftime("%Y-%m-%d-%H")
        return examples
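For reference, we start the recipe roughly like this (the dataset name, directory path, labels, and recipe file name are placeholders, not our real ones):

prodigy audio_custom.manual my_dataset ./videos -l LABEL1,LABEL2 -F recipe.py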
Thank you