Hi. Could you please suggest a good code architecture for solving my task?
I am new to Ray, but I have a lot of general experience, so I mostly need direction rather than specific code examples, though code is welcome too.
How severely does this issue affect your experience of using Ray?
- None: Just asking a question out of curiosity
Description of task
I have 1 TB of gzipped JSON (FYI: it is actually Jaeger traces), split into groups of gzip files 7-10 GB in size. Every few hours the next group of 5 files is produced.
I need to parse all of this and write it to Parquet files, with some preprocessing. The files in one group of 5 should be processed together, because the data is sharded across them.
Parsing should be streamed so it can run directly on the backup node (it is not that weak: 20 CPUs and 128 GB of RAM, but there is no way everything could be loaded into memory).
Naive approach
If I try to parse the JSON directly, each file is processed in only one thread, which, as you know, is very slow.
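(For illustration, the naive single-threaded version looks roughly like this; parse_file_naive is just a placeholder name and the actual preprocessing is omitted:)

import gzip
import json

def parse_file_naive(filename):
    # Decompress and parse line-delimited JSON in a single thread.
    with gzip.open(filename, "rt") as f:
        for line in f:
            yield json.loads(line)  # one CPU core does all of the work here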
Better approach
My approach is the following: I make a gzip reader that works in one thread and simply cuts the gzip file into chunks of approximately 512 MB. Unpacking gzip in one thread is reasonably fast, and these chunks can then be parsed/processed/saved in parallel.
import os
from gzip import GzipFile

import pandas as pd
import ray

def load_bin_stream(epoch, filename, chunk=512 * 2**20):  # ~512 MB of decompressed data per chunk
    rest = b''
    with GzipFile(filename, "r") as f:
        size = os.fstat(f.fileno()).st_size
        while True:
            buffer = f.read(chunk)
            if not buffer:
                break
            # Cut at the last newline so a chunk only contains whole JSON lines.
            afternewline = buffer.rfind(b'\n') + 1
            retval = pd.DataFrame({'epoch': [epoch], 'filename': [filename], 'pos': [f.tell()],
                                   'size': [size], 'data': [rest + buffer[:afternewline]]})
            yield ray.data.from_pandas(retval)
            rest = buffer[afternewline:]
        if rest:
            retval = pd.DataFrame({'epoch': [epoch], 'filename': [filename], 'pos': [f.tell()],
                                   'size': [size], 'data': [rest]})
            yield ray.data.from_pandas(retval)
And now I can process one file the following way:
# Note: the default argument (ds=ds) binds each dataset eagerly; a bare (lambda: x)
# would late-bind and make every window return the last chunk.
pipeline = ray.data.DatasetPipeline.from_iterable(
    [(lambda ds=ds: ds) for ds in load_bin_stream(epoch, filename, chunk=512 * 2**20)])
pipeline
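For completeness, this is roughly how I then consume the pipeline. parse_chunk here is a placeholder for my real preprocessing, and I am assuming the DatasetPipeline in my Ray version supports map_batches and write_parquet per window:

import json
import pandas as pd

def parse_chunk(batch: pd.DataFrame) -> pd.DataFrame:
    # Placeholder: split the raw chunk bytes into lines and parse each JSON record.
    records = []
    for blob in batch['data']:
        for line in blob.splitlines():
            records.append(json.loads(line))
    return pd.DataFrame(records)

pipeline.map_batches(parse_chunk, batch_format="pandas").write_parquet("/tmp/out")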
The problems with this design appear when we remember that we need to parse 5 files:
pipe1 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename1, chunk=512 * 2**20)])
pipe2 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename2, chunk=512 * 2**20)])
pipe3 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename3, chunk=512 * 2**20)])
pipe4 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename4, chunk=512 * 2**20)])
pipe5 = ray.data.DatasetPipeline.from_iterable([(lambda ds=ds: ds) for ds in load_bin_stream(1, filename5, chunk=512 * 2**20)])
At first glance we have two problems:
- We have ray.data.DatasetPipeline.split, but we don't have a ray.data.DatasetPipeline.join. How could one be implemented? (See the sketch below for a possible workaround.)
- All 5 of these pipelines are built from iterators that run in the main thread, so they will execute one after another, i.e. slowly. They should probably run as remote tasks instead, but I noticed that it is impossible to return a pipeline from a remote task, since it is not serializable.
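For problem 1, the closest thing to a join I can think of is a workaround: build one pipeline whose windows round-robin over the per-file generators. A minimal sketch (it still decompresses in the driver thread, so it does not solve problem 2):

def joined_windows(epoch, filenames, chunk=512 * 2**20):
    # Round-robin over the per-file chunk generators so a single pipeline
    # interleaves chunks from all files of the group.
    gens = [load_bin_stream(epoch, fn, chunk) for fn in filenames]
    while gens:
        for gen in list(gens):
            try:
                ds = next(gen)
            except StopIteration:
                gens.remove(gen)
                continue
            yield (lambda ds=ds: ds)  # bind ds now, not at call time

pipeline = ray.data.DatasetPipeline.from_iterable(
    joined_windows(1, [filename1, filename2, filename3, filename4, filename5]))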
Probably a good approach
It looks like I need to look in the direction of ray.util.Queue: start 5 remote tasks, unpack the binary there, then parse the JSON / postprocess in, for example, 20 workers. A rough sketch of what I mean is below.
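In this sketch, load_bin_chunks (a variant of load_bin_stream that yields plain pandas DataFrames instead of Datasets) and parse_and_write are placeholders for my own code:

import ray
from ray.util.queue import Queue

@ray.remote
def reader(epoch, filename, queue):
    # One task per gzip file: single-threaded decompression,
    # pushing ~512 MB chunks of raw bytes into the shared queue.
    for chunk_df in load_bin_chunks(epoch, filename):
        queue.put(chunk_df)

@ray.remote
def worker(queue, out_dir):
    # Pull chunks until a sentinel arrives; parse the JSON and write Parquet.
    while True:
        chunk_df = queue.get(block=True)
        if chunk_df is None:
            break
        parse_and_write(chunk_df, out_dir)

filenames = [filename1, filename2, filename3, filename4, filename5]
queue = Queue(maxsize=40)   # bounded, so the readers cannot outrun the workers
readers = [reader.remote(1, fn, queue) for fn in filenames]
workers = [worker.remote(queue, "/backup/parquet") for _ in range(20)]
ray.get(readers)            # wait until all files are fully read
for _ in workers:
    queue.put(None)         # one sentinel per worker
ray.get(workers)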
I could probably get a working solution this way, but my question is: how is this problem best solved within the Ray design? I have found zero examples of people combining Queue and DatasetPipeline, so maybe I am doing something wrong.
Could someone recommend the best way to architect this in Ray?