Feature request: Allow ray.wait() to do the necessary work for an instant ray.get()

I want to pipeline processing of the results of a bunch of Ray tasks: launch a series of tasks, then process the results locally as ray.wait() indicates each one is ready. After ray.wait() says a result is ready, it would be great if ray.get() were near-instant (ideally just a pointer transfer to the data stored in memory in Plasma). With this simple script, though, ray.get() can sometimes take > 2ms on an already-ready ref.

import time

import numpy as np
import ray

ray.init()

@ray.remote
def get_big_array():
    # ~20 MB of random bytes, stored in the object store (Plasma) on return
    big_array = np.random.randint(0, 255, int(20E6), dtype=np.uint8)
    return big_array

ref = get_big_array.remote()

# I expect this to wait for task completion, transfer, no rush
ready, _ = ray.wait([ref], num_returns=1)

# Get data locally
t0 = time.time()
# I'd like this to be instant
big_array_local = ray.get(ready[0])
print(f'Took {time.time() - t0:.4f}s to get()')

Did you have this issue on Mac or Linux?

In this case, if we offload work to ray.wait, won't ray.wait just take longer instead (so the overall latency stays about the same)? Why does ray.get have to be instant in your use case?

Hey @sangcho , the reason for the performance requirement is because right now the ray.get is bottlenecking the whole system (there is a very tight feedback loop in this particular use case).

@rliaw Sorry about my ignorance here, but isn’t it going to have the same bottleneck at the end of the day? For example, let’s say you are waiting for two objects A and B. The current latency is:

ray.wait([A, B])  # Latency to fetch objects A and B (1)
ray.get([A, B])   # Latency to deserialize objects A and B (2)

Latency == (1) + (2)

With the feature request:

ray.wait([A, B])  # Latency to fetch objects A and B (1) + deserialize objects A and B (2)
ray.get([A, B])   # Near-instant

Latency == (1) + (2)

Am I missing something here? Btw, the latency of ray.get should just be the deserialization cost. Not sure why it could take 2 ms when zero-copy deserialization is used.
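For reference, here is a minimal standalone sketch (mine, not from the report above) for sanity-checking the zero-copy path: a large numpy array fetched with ray.get should come back as a read-only view backed by the shared-memory object store, so the get itself should be cheap.

import time

import numpy as np
import ray

ray.init()

# Put a ~20 MB uint8 array into the object store
arr_ref = ray.put(np.zeros(int(20E6), dtype=np.uint8))

t0 = time.time()
arr = ray.get(arr_ref)
print(f'get() took {time.time() - t0:.6f}s')

# With zero-copy deserialization the array is backed by shared memory,
# so it should come back read-only rather than as a fresh copy.
print('writeable:', arr.flags.writeable)  # expected: False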

Btw, the latency of ray.get should just be the deserialization cost. Not sure why it could take 2 ms when zero-copy deserialization is used.

Yeah, that’s precisely the problem. It should be ~2ms, but they’re seeing ray.get take ~7 seconds after ray.wait.

Then this is a deserialization problem, not a question of whether we deserialize in ray.wait or in ray.get, right?

Maybe; it does sound like more of a deserialization problem (though, to be fair, the request from the user’s perspective remains the same).

Hmm, I am still confused. If the issue is deserialization, doing the work in ray.wait instead of ray.get won’t improve anything, will it? It just increases the time to return from ray.wait, and ray.get becomes short, but the total time stays the same. Am I missing something here?

I think the main issue that the user is reporting is that ray.get and ray.wait are showing huge discrepancies when they should not be.

Yeah, I agree that’s the main issue, but the specific feature request (moving ray.get’s work into ray.wait) won’t improve the situation, I think. It seems more likely that there is a deserialization bug than a problem with ray.wait not doing work for ray.get.

Hey @sangcho, sorry for the delayed response (I was out of the office).

Here is some mock code to add a bit more detail to the data flow:

def __next__(self):
    # Issue ray actor tasks so that work is always being done on other processes 
    while len(self.request_refs) < MAX_WORKERS:
        self.request_refs.append(do_some_work.remote())

    # Wait until at least one batch of work is ready, 
    # this should be instant once training warms up. Ideally, under the hood ray.wait()
    # will launch the necessary processes to convert *any* ready refs into a state that
    # they can be instantly accessed with ray.get()
    ready, busy = ray.wait(self.request_refs, num_returns=1)

    # Should be constant and instant
    batch = ray.get(ready[0])
    self.request_refs.remove(ready[0])

    return batch

The other issue is the variability of deserialization, which can lead to > 2ms ray.get() times. Are you able to repro this on your machines? This was tested on Linux.
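Something along these lines (a rough standalone sketch, assuming a fresh single-node ray.init(), not our actual dataloader code) shows what I mean: it times ray.wait and ray.get separately over many iterations so the variability is visible.

import time

import numpy as np
import ray

ray.init()

@ray.remote
def get_big_array():
    return np.random.randint(0, 255, int(20E6), dtype=np.uint8)

wait_times, get_times = [], []
for _ in range(50):
    ref = get_big_array.remote()

    # Time waiting for the result to become ready
    t0 = time.time()
    ready, _ = ray.wait([ref], num_returns=1)
    wait_times.append(time.time() - t0)

    # Time fetching the already-ready result
    t0 = time.time()
    ray.get(ready[0])
    get_times.append(time.time() - t0)

print(f'wait: median {np.median(wait_times) * 1e3:.2f} ms, max {np.max(wait_times) * 1e3:.2f} ms')
print(f'get:  median {np.median(get_times) * 1e3:.2f} ms, max {np.max(get_times) * 1e3:.2f} ms')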

Hmm, I see. It makes sense now why you’d like this feature. It’d be great if you could create an enhancement request for this feature as a GitHub issue, plus a separate issue for the deserialization problem with a repro script. I am not aware of any existing issue that covers this.

Yeah, I think this is a bit hard for us to reproduce without any more details, so a full repro script would be really helpful!

For one, we should try to verify why deserialization is actually the bottleneck. For example, wrapping a class around the numpy array should not impact ray.get:

import ray
import numpy as np

ray.init()

class Foo:
    def __init__(self):
        self.x = np.ones(10**8)

f = Foo()
f_id = ray.put(f)
%time new_f = ray.get(f_id)  # %time requires IPython/Jupyter
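For anyone running this outside a notebook (no %time available), here is a rough sketch of the same comparison using time.perf_counter, checking a bare array against the wrapped one; the two times should be similar if the class wrapper really doesn’t matter.

import time

import numpy as np
import ray

ray.init()

class Foo:
    def __init__(self):
        self.x = np.ones(10**8)

def timed_get(ref):
    t0 = time.perf_counter()
    ray.get(ref)
    return time.perf_counter() - t0

# ~800 MB each; requires a reasonably sized object store
bare_ref = ray.put(np.ones(10**8))
wrapped_ref = ray.put(Foo())

print(f'bare array get:    {timed_get(bare_ref) * 1e3:.2f} ms')
print(f'wrapped array get: {timed_get(wrapped_ref) * 1e3:.2f} ms')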

Hey @cpayne, do you think it is possible to put together a repro script?

Hey @sangcho, for the deserialization issue, the snippet in my original post will sometimes take > 2ms in a notebook when I test on an A100 cluster as-is.

For a snippet with the Ray requests inside of a dataloader (to show the benefits of pipelining), I will need to mock this up without our source code, which I’ll have to circle back to when I get some more time. Should I file this as a GitHub ticket with the repro? Thanks!

Yes! It’ll be great if you can create issues for both (so that we can keep track of them)!