Multiple S3 buckets to stream in a custom recipe

Hi,

Can Prodigy stream images from separate S3 buckets?

Our unlabelled images are stored in two S3 buckets, and Prodigy needs to be able to read from each of them (order does not matter).

Thanks for the help,
Felix

hi @SiteAssist!

Could you take this Prodigy S3 loading example and generalize it to handle multiple buckets? Something like this:

import boto3
import json
from prodigy.util import img_to_b64_uri

def stream_from_s3(buckets, prefix=None):
    # Loader script: prints one JSONL task per image, which can be piped
    # into a recipe, e.g.:
    #   python stream_from_s3.py | prodigy image.manual my_dataset - --loader jsonl
    s3 = boto3.client('s3')

    for bucket in buckets:
        # Build a paginator for when there are a lot of objects.
        paginator = s3.get_paginator('list_objects')
        paginate_params = {
            'Bucket': bucket
        }

        # Check if only certain images from S3 should be loaded.
        if prefix is not None:
            paginate_params['Prefix'] = prefix

        page_iterator = paginator.paginate(**paginate_params)

        # Iterate through the pages.
        for page in page_iterator:
            # Empty buckets or prefixes return pages without a 'Contents' key.
            for obj in page.get('Contents', []):
                img_key = obj['Key']

                # Read the image.
                img = s3.get_object(Bucket=bucket, Key=img_key)['Body'].read()

                # Print the task in the format Prodigy expects.
                print(json.dumps({'image': img_to_b64_uri(img, 'image/jpeg')}))

# Example usage
buckets_to_process = ['your_bucket_1', 'your_bucket_2']
stream_from_s3(buckets_to_process, prefix='optional_prefix')

I haven't tried it out, but could you see if it works?

Thank you for that!

We have already implemented a custom get_stream function that's a bit different from the one you shared. Our buckets contain a lot of images, so keeping everything in memory would be an issue; instead, we generate presigned URLs. It works well with one stream (it also uses the DB to skip images that have already been processed).
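
The idea is roughly this (a simplified sketch, not our actual code):

import boto3
from prodigy.components.db import connect

def get_stream(bucket, dataset, prefix=None):
    s3 = boto3.client('s3')
    db = connect()
    # The presigned URL changes on every run, so we skip already-annotated
    # images by their S3 key, which we store in each task's meta.
    examples = db.get_dataset(dataset) or []
    done = {eg['meta']['key'] for eg in examples if 'key' in eg.get('meta', {})}
    paginator = s3.get_paginator('list_objects')
    params = {'Bucket': bucket}
    if prefix is not None:
        params['Prefix'] = prefix
    for page in paginator.paginate(**params):
        for obj in page.get('Contents', []):
            if obj['Key'] in done:
                continue
            # A presigned URL instead of loading the image bytes into memory.
            url = s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': obj['Key']},
                ExpiresIn=3600,
            )
            yield {'image': url, 'meta': {'key': obj['Key']}}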

What if we wanted to switch buckets every other image, so that we don't wait for one bucket to finish before processing the other?

So if you're okay with sequential processing, you could modify the script to use an iterator that alternates between the specified buckets. For example (an untested sketch, assuming get_stream(bucket) is your existing presigned-URL generator):
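
def roundrobin(*streams):
    # Take one task from each stream in turn, dropping streams as they
    # run out, so neither bucket has to finish before the other starts.
    iterators = [iter(s) for s in streams]
    while iterators:
        for it in list(iterators):
            try:
                yield next(it)
            except StopIteration:
                iterators.remove(it)

stream = roundrobin(get_stream('your_bucket_1'), get_stream('your_bucket_2'))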

But I assume you're trying to parallelize the processing of images from different buckets? That'll require you to explore concurrent or parallel processing techniques like Python's threading or multiprocessing modules.
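
For example, here's a rough (untested) sketch that runs each bucket's generator in its own thread and merges the tasks through a queue, again assuming your existing get_stream generators:

import queue
import threading

def merged_stream(streams):
    q = queue.Queue(maxsize=100)
    sentinel = object()

    def worker(stream):
        # Push tasks from one bucket's generator onto the shared queue.
        for task in stream:
            q.put(task)
        q.put(sentinel)

    for stream in streams:
        threading.Thread(target=worker, args=(stream,), daemon=True).start()

    # Yield tasks as they arrive, until every worker has finished.
    finished = 0
    while finished < len(streams):
        item = q.get()
        if item is sentinel:
            finished += 1
        else:
            yield item

# Usage: stream = merged_stream([get_stream('bucket_1'), get_stream('bucket_2')])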

I don't have a ton of experience with that approach, but it could be worth exploring. Hope this helps!

Great, thank you for that.

A few more questions:

1. Can you refresh the stream without restarting Prodigy?

This is in line with this other question: Encrypt session ID

We would like to implement a filter on which images each annotator can see.

For example, after decrypting the session ID, we know that annotator_1 belongs to org_1, so they should only see org_1 images.

Take a look at custom event hooks. Custom events are registered functions that can be returned from a recipe and called via the Prodigy frontend.

There are examples in the docs where the user can update the stream from the front-end (e.g., change the model in the loop or the options), including sample code for modifying the model-in-the-loop (LLM) in the stream.
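
Adapted to your org filter, the pattern could look roughly like this. This is an untested sketch: load_tasks is a hypothetical loader, and the exact event hook signature is worth double-checking against the custom events docs.

import prodigy

def load_tasks():
    # Hypothetical loader: in practice this would yield your presigned-URL
    # tasks, each tagged with the org it belongs to.
    yield {'image': 'https://example.com/org_1/img.jpg', 'meta': {'org': 'org_1'}}
    yield {'image': 'https://example.com/org_2/img.jpg', 'meta': {'org': 'org_2'}}

@prodigy.recipe('org-filtered-images')
def org_filtered_images(dataset):
    # Mutable state shared between the stream and the event hook.
    state = {'org_id': None}

    def stream():
        # Tasks already queued in the front-end won't be re-filtered;
        # only future batches pick up a changed org_id.
        for task in load_tasks():
            if state['org_id'] is None or task['meta']['org'] == state['org_id']:
                yield task

    def set_org(controller, *, org_id):
        # Called from the front-end, e.g.:
        #   window.prodigy.event('set_org', {'org_id': 'org_1'})
        state['org_id'] = org_id

    return {
        'dataset': dataset,
        'stream': stream(),
        'view_id': 'image_manual',
        'event_hooks': {'set_org': set_org},
    }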