Recommended way to parallelize ray.get() calls to the driver (to pipeline Dataloader)

Hello, thanks for building a great library!

I am not sure if this is possible, but basically I would like to have concurrent ray.get() calls running in a Dataloader so that multiple processes are bringing data from the object store onto the driver process at once while doing other work in a pipelined manner. The code looks a bit like this:

# Somwhere in driver code
refs = ray.put([my_big_data_A, my_big_data_B, my_big_data_C])

def ray_get_worker(ref):
    return ray.get(ref)

# Later on in Dataloader
def __next__():
    # I want to fetch all handles concurrently while doing other work is ongoing
    ray.get(refs) # too slow AND blocks

    p = mp.Pool(4)
    parallel_get = p.imap(ray_get_worker, refs) # doesn't block and in theory 
                                                  would have 4x workers (doesn't work though ;)

    # work that takes 500ms (I would like ray.get() to be 'running' during this time)

    # grab "ray.get()" results
    for result in parallel_get:
        # do something with local data

I read that Ray is faster than multiprocessing (which would have to serialize returning the large data structures to the driver) so understand that using multiprocessing here won’t work but am looking for an equivalent way to ray.get() multiple refs in a non blocking, synchronous manner, if such a thing exists!

Hello, if this helps, have you looked at Dask on ray? Dask on Ray — Ray v2.0.0.dev0

Hi @cpayne,

I am not sure I understand what exactly you want to do. Once you get a handle from a remote() call that task will run. You do not need to use ray.get to make the task run. You use it to get the results of the task. As you pointed out ray.get will block or timeout if you give it one until the task finishes and returns.

The part I am unsure about is if you want to:
a.) start some tasks, do some separate processing independent of the results of the first task, and then collect the previous tasks results? You can already do this by moving the get until after the 500ms part


b.) Start some tasks, collect results from tasks have finished, do some processing on them, collect the next batch of results from tasks that finished, process, until all tasks are done? If you want to do this type of workflow then you can use ray.wait instead of ray.get.

As @asm582 pointed out there may be a higher-level api like dask that is more appropriate for your needs.