Passing large binary files and directories between tasks

I’ve known about Ray for a long time but have never used it before. I’m coming from a different orchestration system (KFP), and I wanted to know whether something I’m used to there is possible with Ray.

KFP orchestrates arbitrary containerized command-line programs and helps pass arbitrary data from outputs to inputs (often by mounting the specific data files/directories).

Ray does great work serializing data and working with many dataset formats. But what if my data is different and I need something lower-level?

Imagine that I have one function that converts a bunch of video files, and another function that trains an ML model on the converted videos.

So I need a way for a task to produce a big (say 100GB) binary multi-file dataset that cannot fit into memory and is in a format Ray does not understand, and then pass this dataset to another task.
What would be the best way to do this?
Should I use Ray’s data storage or should I avoid it for such cases?

P.S. With KFP I just write a function and mark certain parameters as input/output paths, and the system handles everything related to passing the raw data for me (mounting the input data, storing the output data) during distributed execution:

from kfp.components import InputPath, OutputPath

def filter_text(
    text_path: InputPath(),
    filtered_text_path: OutputPath(),
    pattern: str,
):
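    # Note: imports used by the body live inside the function so the
    # lightweight component stays self-contained when KFP serializes it.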
    import re
    with open(text_path, 'r') as reader:
        with open(filtered_text_path, 'w') as writer:
            for line in reader:
                if re.search(pattern, line):
                    writer.write(line)
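
The function above uses the KFP v1 lightweight-component style, where it would then be wrapped into a reusable component along the lines of:

filter_text_op = kfp.components.create_component_from_func(filter_text)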

Hi @Ark-kun, if I understand you correctly, you have two tasks chained together, with the output of the first feeding the input of the second. A few points:

  • Ray Datasets is not a storage system: it’s a library for loading data from storage systems (like S3, HDFS, or databases) into memory and processing that in-memory data in a distributed fashion.
  • Processing in a cluster: 100GB might be large for a single machine; have you considered running the workload on a cluster? In practice, we usually see use cases with data volumes much larger than this.
  • Avoiding materializing the intermediate data: For the intermediate data produced by the first task, do you need to persist it, or are you interested only in the final output of the second task? If the intermediate data doesn’t need to be kept, it’s better to avoid materializing it into a storage system. With Ray Datasets you can exchange the data between the two tasks via memory and potentially achieve much better performance (see the sketch after this list).
  • Using DatasetPipeline: If the in-memory data footprint is a concern, another option is DatasetPipeline (Pipelining Compute — Ray 3.0.0.dev0). For example, if you have 100 video files, you can load and transform (the first task) and train the ML model (the second task) on 10 files at a time; this means memory only needs to be provisioned for 10% of the original size. This is the large-scale ML ingest pattern: Example: Large-scale ML Ingest — Ray 3.0.0.dev0. The windowed variant is also included in the sketch below.
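
Here is a minimal sketch of what this could look like, assuming a Ray version whose Datasets API still ships DatasetPipeline (roughly Ray 1.x / early 2.x). The bucket path and the transcode()/train_on() helpers are hypothetical placeholders for your own conversion and training logic:

import ray

ray.init()

# Load the raw video bytes directly from external storage (S3, HDFS, NFS, ...).
# Each file becomes one record of raw bytes; Ray itself does not persist anything.
ds = ray.data.read_binary_files("s3://my-bucket/raw-videos/")  # hypothetical path

# "Task 1": convert the videos. The converted bytes stay in the Ray object store
# and are handed to the next stage without being written back to storage.
def convert_videos(batch):
    # Here batch is a list of raw byte strings (the exact batch format depends
    # on the Ray version); transcode() is a hypothetical placeholder.
    return [transcode(video_bytes) for video_bytes in batch]

converted = ds.map_batches(convert_videos)

# "Task 2": consume the converted data, e.g. to feed a training loop.
for batch in converted.iter_batches(batch_size=8):
    train_on(batch)  # hypothetical training step

# Windowed (pipelined) variant: only ~10 files' worth of data is in memory at
# any time, so the full 100GB never has to fit at once.
pipe = ds.window(blocks_per_window=10)
for batch in pipe.map_batches(convert_videos).iter_batches(batch_size=8):
    train_on(batch)

If the converted videos do need to outlive the job, you can still write them back to external storage at the end; for purely intermediate data, the in-memory exchange above avoids that round trip.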