Running Ray Datasets on a large dataset

I’m currently evaluating the Ray framework (v1.12.0) for an ML pipeline solution.

As with most ML pipelines, we start by loading and transforming our dataset. For now, the dataset is composed of a set of compressed files with a total size of ~500GB uncompressed (~30GB compressed).
The files follow the JSON Lines format, where each line of a file is a valid JSON document.
Each of these lines is then fed to the ML pipeline.
Please note that each uncompressed file can be up to 12GB and each line in a file can be up to 1GB. Also, we plan to scale to TBs or even PBs of data.
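
For illustration, here is a minimal sketch of what reading one of those files looks like; the file name and the record shape are hypothetical, not our real schema:

import gzip
import json

# Hypothetical example: stream a gzipped JSON Lines file and parse each line.
# "part-0001.jsonl.gz" and the record fields are made up for illustration.
with gzip.open("part-0001.jsonl.gz", "rt") as fin:
    for line in fin:
        record = json.loads(line)  # each line is a complete JSON document
        # a record could look like {"id": "...", "payload": {...}} and is fed
        # to the ML pipeline one line at a time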

We started with the following pipeline. We need to apply a filter to the JSON in each line and then count a property.

import gzip
import tempfile

import ray


def get_aws_filenames():
    return ["file1", ....., "fileN"]


def filter_line(line):
    # here we transform the line and load some objects in memory
    filtered_line = foo(line)
    return filtered_line


def count_stuff(filtered_line) -> int:
    # count a property on the already-filtered line
    return len(filtered_line)


def do_everything(s3_file_filename: str):
    with tempfile.NamedTemporaryFile() as tmp:
        # .... download file to tmp.name
        with gzip.open(tmp.name, 'r') as fin:
            counter = 0
            for line in fin:
                counter += count_stuff(filter_line(line))
            return counter


# Init Ray
ray.init()

results = ray.data.from_items(get_aws_filenames()) \
                  .map(do_everything) \
                  .sum()

As you can imagine, this flow does not really make sense; what we would like to have is something like:

# All previous functions


def download_unzip_and_yield(s3_file_filename: str):
    with tempfile.NamedTemporaryFile() as tmp:
        # .... download file to tmp.name
        with gzip.open(tmp.name, 'r') as fin:
            for line in fin:
                yield line


results = ray.data.from_items(get_aws_filenames()) \
                  .flat_map(download_unzip_and_yield) \
                  .map(filter_line) \
                  .map(count_stuff) \
                  .sum()

It happens that in this case Ray does not fuse the stages and, eventually, all workers die from OOM errors (I’m using a K8s setup with a head node with 15GB RAM and multiple worker nodes with 10GB RAM each).
How can we solve this issue?

Thanks in advance!

Please note that each uncompressed file can be up to 12GB and each line in a file can be up to 1GB

This is a pretty challenging case that, unfortunately, not even stage fusion would necessarily help with right now. (You can experiment with fusion by using ray.data.from_items().experimental_lazy().flat_map()...; it might work even now if you increase the node memory.) However, this will work in the future once we have block splitting enabled as well, which will reduce the memory usage for large files in conjunction with fusion.
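
For reference, a minimal sketch of that fusion experiment, reusing the helper functions from the question and assuming the hook is spelled _experimental_lazy() on 1.12 (as noted in the follow-up below):

import ray

ray.init()

# Sketch only: _experimental_lazy() is the private hook mentioned in this thread.
# With lazy execution, Ray Data can attempt to fuse the flat_map/map stages so
# the per-file blocks are not all materialized eagerly before filtering.
result = (
    ray.data.from_items(get_aws_filenames())
        ._experimental_lazy()
        .flat_map(download_unzip_and_yield)
        .map(filter_line)
        .map(count_stuff)
        .sum()
)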

The key issue is that, without block splitting, Ray Data will load each file as a single block (presumably using ~12GB of memory, maybe less in Arrow format) prior to applying the filter.

For now I’d recommend the manually fused approach you have with do_everything().
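
For example, a memory-conscious sketch of that manually fused pipeline; the per-task memory request is an assumption added here to keep the scheduler from packing too many file tasks onto a 10GB node, not something this thread prescribes:

import ray

ray.init()

# Each task downloads one file, streams it line by line through gzip, and returns
# a single integer, so at most one decompressed line (<=1GB) plus the gzip buffer
# is held in memory at a time instead of a whole ~12GB block.
results = (
    ray.data.from_items(get_aws_filenames())
        .map(do_everything, memory=4 * 1024 ** 3)  # assumed per-task memory hint
        .sum()
)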

cc @Stephanie_Wang and @Clark_Zinzow

Thanks for the fast response @ericl!
We already tried .experimental_lazy() (I think in the version we are using it is ._experimental_lazy()) but with no success.

Will keep an eye on future developments!