I'm currently evaluating the Ray framework (v1.12.0) for an ML pipeline solution.
As with most ML pipelines, we start by loading and transforming our dataset. For now, the dataset is a set of compressed files with a total size of ~500GB uncompressed (~30GB compressed).
The files follow the JSON Lines format, where each line of a file is a valid JSON document.
Each of these lines is then fed to the ML Pipeline.
Please note that each uncompressed file can be up to 12GB and each line in a file can be up to 1GB. Also, we plan to scale to TBs or even PBs of data.
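For reference, here is a minimal sketch of reading one such file lazily, so that only a single line is held in memory at a time. It uses only the standard library; `iter_json_lines`, `write_sample_file` and the sample records are hypothetical names for illustration and are not part of our actual pipeline.

```python
import gzip
import json
import os
import tempfile
from typing import Any, Iterator, List


def iter_json_lines(path: str) -> Iterator[Any]:
    """Yield one parsed JSON document per non-empty line of a gzip file."""
    # gzip.open in binary mode decompresses incrementally, so the whole
    # file is never loaded into memory at once.
    with gzip.open(path, "rb") as fin:
        for raw_line in fin:
            if raw_line.strip():
                yield json.loads(raw_line)


def write_sample_file(records: List[Any]) -> str:
    """Write hypothetical sample records to a temporary .jsonl.gz file."""
    fd, path = tempfile.mkstemp(suffix=".jsonl.gz")
    os.close(fd)
    with gzip.open(path, "wb") as fout:
        for record in records:
            fout.write(json.dumps(record).encode("utf-8") + b"\n")
    return path


if __name__ == "__main__":
    sample_path = write_sample_file([{"id": 1}, {"id": 2}])
    try:
        for doc in iter_json_lines(sample_path):
            print(doc)
    finally:
        os.remove(sample_path)
```

With lines of up to 1GB, even this approach needs at least that much memory per reader, plus whatever the parsed object takes.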
We started with the following pipeline, which applies a filter to the JSON in each line and then counts a property.
    import gzip
    import tempfile

    import ray

    def get_aws_filenames():
        return ["file1", ....., "fileN"]

    def filter_line(line):
        # here we transform the line and load some objects in memory
        filtered_line = foo(line)
        return filtered_line

    def count_stuff(line: bytes):
        filtered_line = foo(line)
        return len(filtered_line)

    def do_everything(s3_file_filename: str):
        with tempfile.NamedTemporaryFile() as tmp:
            # .... download file to tmp.name
            with gzip.open(tmp.name, 'r') as fin:
                counter = 0
                for line in fin:
                    counter = count_stuff(line) + counter
        return counter

    # Init ray
    ray.init()

    # One task per file: download, decompress, filter and count in one go
    results = ray.data.from_items(get_aws_filenames()) \
        .map(do_everything) \
        .sum()
As you can imagine, this flow does not make sense. What we would like to have is something like:
    # All previous functions

    def download_unzip_and_yield(s3_file_filename: str):
        with tempfile.NamedTemporaryFile() as tmp:
            # .... download file to tmp.name
            with gzip.open(tmp.name, 'r') as fin:
                for line in fin:
                    yield line

    results = ray.data.from_items(get_aws_filenames()) \
        .flat_map(download_unzip_and_yield) \
        .map(filter_line) \
        .map(count_stuff) \
        .sum()
However, in this case Ray does not fuse the stages and, eventually, all workers die from OOM errors (I'm using a K8s setup with a head node with 15GB RAM and multiple worker nodes with 10GB RAM).
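To make explicit what I mean by fusing the stages: within a single task, the steps could be chained as generators so that each line is read, filtered, counted and discarded before the next one is touched. The sketch below is plain Python with no Ray calls; `foo` here is only a stand-in for our real transformation (I assume it strips whitespace), and `read_lines`, `filter_lines`, `count_file` and the in-memory sample are hypothetical names used for illustration.

```python
import gzip
import io
from typing import Callable, Iterable, Iterator


def foo(line: bytes) -> bytes:
    """Stand-in for the real transformation (assumption: strips whitespace)."""
    return line.strip()


def read_lines(compressed: bytes) -> Iterator[bytes]:
    """Lazily yield lines from gzip-compressed bytes, one at a time."""
    with gzip.open(io.BytesIO(compressed), "rb") as fin:
        for line in fin:
            yield line


def filter_lines(
    lines: Iterable[bytes], transform: Callable[[bytes], bytes]
) -> Iterator[bytes]:
    """Apply the transformation lazily; nothing is materialised as a list."""
    for line in lines:
        yield transform(line)


def count_file(compressed: bytes) -> int:
    """Chain the stages so only one line is alive at any moment."""
    filtered = filter_lines(read_lines(compressed), foo)
    return sum(len(item) for item in filtered)


if __name__ == "__main__":
    sample = gzip.compress(b"abc\nde\n")
    print(count_file(sample))
```

This is effectively what `do_everything` does per file. What we are hoping for is the same memory behaviour with the stages expressed as separate Dataset operations.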
How can we solve the issue?
Thanks in advance!