Please suggest a good pipeline architecture

Hi. Could you please suggest a good code architecture for my task?
I am new to Ray but have broad overall experience, so I mostly need a direction rather than a specific code example, though code is welcome too.

How severely does this issue affect your experience of using Ray?

  • None: Just asking a question out of curiosity

Description of task
I have 1 TB of gzipped JSON (FYI: it is actually Jaeger traces), split into groups of gzip files 7-10 GB in size. Every few hours the next 5 files are produced.

I need to parse all of this and write it to Parquet files, with some preprocessing. The files in one group of 5 should be processed together, because the data is sharded across them.

Parsing should be streamed so that it can run directly on the backup node (it is not that weak: 20 CPUs and 128 GB of RAM, but there is no way everything can be loaded into memory).

Naive approach
If I try to parse the JSON directly, each file is processed in only one thread, which, as you know, is very slow.
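(For reference, the naive version is roughly a single-process pandas read over the compressed file; the file name below is just a placeholder.)

import pandas as pd

# One process does both the gzip decompression and the JSON parsing,
# so each 7-10 GB file is limited to a single CPU.
for chunk in pd.read_json("traces-000.json.gz", lines=True,
                          compression="gzip", chunksize=100_000):
    ...  # preprocess and append to a Parquet file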

Better approach
My approach is the following: I write a gzip reader that works in one thread and simply cuts the gzip file into chunks of approximately 512 MB. Decompressing gzip in one thread is reasonably fast, and these chunks can be parsed/processed/saved in parallel.

import os
import pandas as pd
import ray
from gzip import GzipFile

def load_bin_stream(epoch, filename, chunk=512 * 1024**2):
    # Decompress one gzip file in a single thread; yield ~512 MB newline-aligned chunks as single-block Ray Datasets.
    rest = b''
    with GzipFile(filename, "r") as f:
        while True:
            buffer = f.read(chunk)
            if not buffer:
                break

            # Cut after the last complete line; the tail carries over into the next chunk.
            afternewline = buffer.rfind(b'\n') + 1

            retval = pd.DataFrame({'epoch': [epoch], 'filename': [filename],
                                   'pos': [f.tell()], 'size': [os.fstat(f.fileno()).st_size],
                                   'data': [rest + buffer[:afternewline]]})
            yield ray.data.from_pandas(retval)
            rest = buffer[afternewline:]

        # Emit whatever remains after the final read.
        if len(rest) != 0:
            retval = pd.DataFrame({'epoch': [epoch], 'filename': [filename],
                                   'pos': [f.tell()], 'size': [os.fstat(f.fileno()).st_size],
                                   'data': [rest]})
            yield ray.data.from_pandas(retval)

and now I can process one file in the following way:

# Bind each dataset as a default argument so every lambda returns its own chunk.
pipeline = ray.data.DatasetPipeline.from_iterable(
    [(lambda ds=ds: ds) for ds in load_bin_stream(epoch, filename, chunk=512 * 1024**2)])
pipeline
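With that single-file pipeline, the per-chunk work can then be spread over the 20 CPUs. A minimal sketch, assuming DatasetPipeline supports map_batches and write_parquet the same way Dataset does, and assuming newline-delimited JSON; parse_batch and the output path are placeholder names:

import io
import pandas as pd

def parse_batch(batch: pd.DataFrame) -> pd.DataFrame:
    # Each row carries one ~512 MB newline-aligned blob of JSON lines.
    frames = [pd.read_json(io.BytesIO(data), lines=True) for data in batch["data"]]
    return pd.concat(frames, ignore_index=True)

parsed = pipeline.map_batches(parse_batch, batch_format="pandas")
parsed.write_parquet("/data/parquet")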

The problems with this design appear when we remember that we need to parse 5 files:

pipe1 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename1, chunk=512 * 1024**2)])
pipe2 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename2, chunk=512 * 1024**2)])
pipe3 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename3, chunk=512 * 1024**2)])
pipe4 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename4, chunk=512 * 1024**2)])
pipe5 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename5, chunk=512 * 1024**2)])

At first glance we have two problems.

  1. We have ray.data.DatasetPipeline.split, but we don't have ray.data.DatasetPipeline.join. How could it be implemented? (A possible workaround is sketched after this list.)

  2. All 5 of these pipelines are built from iterators that run in the main thread, so they execute one after another, i.e. slowly. They should probably run as remote tasks, but I noticed it is impossible to return a pipeline from a remote task, since it is not serializable.
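For the first problem, one workaround is to interleave the five chunk generators into a single pipeline rather than joining five separate pipelines. A minimal sketch, assuming DatasetPipeline.from_iterable accepts any iterable of zero-argument callables (not only a list) and reusing load_bin_stream from above:

import ray

filenames = [filename1, filename2, filename3, filename4, filename5]

def interleaved(epoch, filenames, chunk=512 * 1024**2):
    # Round-robin over the per-file generators so one pipeline sees
    # chunks from every file of the group.
    gens = [load_bin_stream(epoch, fn, chunk=chunk) for fn in filenames]
    while gens:
        for g in list(gens):
            try:
                ds = next(g)
            except StopIteration:
                gens.remove(g)
                continue
            # Bind ds as a default argument; from_iterable expects
            # zero-argument callables.
            yield (lambda ds=ds: ds)

# Passing the generator (not a list) keeps chunk production lazy.
pipeline = ray.data.DatasetPipeline.from_iterable(interleaved(1, filenames))

This sidesteps the missing join API, but decompression still happens sequentially in the driver process, which is exactly the second problem.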

Probably a good approach
It looks like I should look in the direction of ray.util.Queue: start 5 remote tasks, decompress the binaries there, then parse the JSON and postprocess in, for example, 20 workers.
I could probably build a working solution, but my question is: how is this problem best solved within Ray's design? I have found zero examples of people combining Queue and DatasetPipeline, so maybe I am doing something wrong.
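For reference, a minimal sketch of that queue-based layout. This is an illustration under assumptions, not an established Ray recipe: newline-delimited JSON, filenames being the group's five files, 20 consumers, and the /data/parquet output path are all placeholders.

import io
import os
import uuid
import pandas as pd
import ray
from gzip import GzipFile
from ray.util.queue import Queue

@ray.remote
def producer(queue, filename, chunk=512 * 1024**2):
    # One producer per gzip file: single-threaded decompression that
    # pushes newline-aligned byte chunks onto the shared queue.
    rest = b''
    with GzipFile(filename, "r") as f:
        while True:
            buffer = f.read(chunk)
            if not buffer:
                break
            cut = buffer.rfind(b'\n') + 1
            queue.put(rest + buffer[:cut])   # blocks when the queue is full
            rest = buffer[cut:]
    if rest:
        queue.put(rest)

@ray.remote
def consumer(queue, output_dir):
    # Parse the JSON lines of each chunk and write one Parquet file per chunk.
    while True:
        data = queue.get()
        if data is None:                     # poison pill: no more chunks
            break
        df = pd.read_json(io.BytesIO(data), lines=True)
        # ... preprocessing would go here ...
        df.to_parquet(os.path.join(output_dir, f"{uuid.uuid4().hex}.parquet"))

queue = Queue(maxsize=40)                    # bound the number of in-flight chunks
producers = [producer.remote(queue, fn) for fn in filenames]
consumers = [consumer.remote(queue, "/data/parquet") for _ in range(20)]
ray.get(producers)                           # wait until every file is decompressed
for _ in range(20):
    queue.put(None)                          # one poison pill per consumer
ray.get(consumers)

The bounded queue gives backpressure, so the producers never run far ahead of what the 20 consumers (and the node's memory) can absorb.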

Could someone recommend the best way to do this within Ray's architecture?

We don't have a join API yet, but it has been brought up a few times by external users.

I think the closest example we have that is similar to your workload is https://sourcegraph.com/github.com/ray-project/ray/-/blob/release/nightly_tests/dataset/pipelined_training.py?L221, where we have a Windower that dynamically groups file splits for the downstream consumer (the Trainer in this example).

But I think overall your workload might be better off written with plain Ray tasks/actors and a queue to handle the file streaming, as you mentioned. Ray Dataset focuses more on preprocessing + ingest after the data has been loaded, to facilitate the downstream consumer; that is more about what happens after you have written all the Parquet files than about how to produce them from the gzip files.
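(For example, once the Parquet files exist, the Dataset side would be something along the lines of the following; the path is a placeholder.)

import ray

# Downstream ingest starts from the Parquet output written by the tasks above.
ds = ray.data.read_parquet("/data/parquet")
ds.show(5)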

Let me know if anything is unclear, or if there is more context we should know to help you better – I can loop in folks from the Ray Dataset team if needed.