Recipe to process a bunch of files

Hello,

Given scenario: I have about 3000 video files of a few megabytes each in an S3 bucket, and I need to process each of them with a Python function that takes about 10 minutes to complete and generates a result file that needs to be uploaded back into an S3 bucket. Since processing them sequentially would take 3000 files * 10 minutes ≈ 20 days, I’m looking for a way to process them in a distributed manner.
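
For reference, the per-file step is essentially the following (a simplified sketch; the bucket names and run_processing are placeholders for my actual processing function):

import boto3

s3 = boto3.client("s3")

def process_key(key: str) -> None:
    # download one video, run the ~10-minute processing function,
    # and upload the generated result file back to S3
    local_path = f"/tmp/{key.replace('/', '_')}"
    s3.download_file("input-bucket", key, local_path)
    result_path = run_processing(local_path)  # placeholder for the actual function
    s3.upload_file(result_path, "output-bucket", f"results/{key}.out")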

I tried several approaches using Ray on an AWS EKS cluster. The most promising one was:

import ray
from ray.util.queue import Queue

queue = Queue(actor_options={"name": "queue", "num_cpus": 0.1, "lifetime": "detached", "get_if_exists": True})

queue.put_nowait_batch(keys)  # keys is the list of all S3 object keys to be processed

@ray.remote(num_cpus=4, num_gpus=1, max_restarts=5, max_task_retries=5)
class ProcessorActor(object):
    def __init__(self, queue):
        self.queue = queue

    def process(self, key):
        # some processing goes here
        ...

    # start a worker that takes keys from the queue and processes them sequentially
    def start(self):
        while not self.queue.empty():
            key = self.queue.get(block=True)
            self.process(key)
        print("No more files to process. Exiting...")
        ray.actor.exit_actor()

# define the number of workers required
WORKERS_NUM = 5
actors = [
    ProcessorActor.options(name=f"actor{i}", lifetime="detached", get_if_exists=True).remote(queue)
    for i in range(WORKERS_NUM)
]

# start workers
[actor.start.remote() for actor in actors]

While this approach is a good starting point, I’ve noticed two drawbacks:

  1. If the ProcessorActor#process() method crashes, the key it was working on is not returned to the queue, so processing of that file is never retried after a single failure (see the sketch after this list).
  2. If the ProcessorActor#start() method crashes for some reason, it is not retried, and the actor sits idle, wasting node time. This requires periodic monitoring and manual restarts.
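
For illustration, drawback (1) could in principle be softened by putting the key back into the queue when process() raises, along these lines (an untested sketch; without an attempt counter, a permanently failing key would be retried forever):

    def start(self):
        while not self.queue.empty():
            key = self.queue.get(block=True)
            try:
                self.process(key)
            except Exception:
                # return the key to the queue so another worker (or a later pass) can retry it
                self.queue.put(key)
        print("No more files to process. Exiting...")
        ray.actor.exit_actor()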

Since this scenario seems pretty common, I’m wondering whether there is a robust and mature solution to this problem. Thank you!

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi @plashchynski, it looks like Ray Datasets (Distributed Data Preprocessing — Ray 2.2.0) could potentially be a better fit than using the Ray Core API directly.
We have an example, Scaling OCR using Ray Datasets — Ray 2.2.0, that runs a similar image-processing workload which you might find interesting.
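
Roughly along these lines (a sketch only; the bucket path, run_processing, and upload_result are placeholders, and details may vary between Ray versions):

import ray

ds = ray.data.read_binary_files(
    "s3://input-bucket/videos/",  # placeholder input location
    include_paths=True,           # keep the S3 key alongside the file bytes
)

def process_file(record):
    path, data = record                 # (path, bytes) tuple when include_paths=True
    result_path = run_processing(data)  # the ~10-minute processing function
    upload_result(path, result_path)    # e.g. boto3 upload to the output bucket
    return path

# Datasets schedules the map tasks across the cluster and retries failed tasks
processed = ds.map(process_file, num_gpus=1)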
