I want to pipeline processing the results of a bunch of Ray tasks by launching a series of tasks and then processing the results locally when ray.wait() indicates a task is ready. After ray.wait() says a result is ready, it would be great if ray.get() was near instant (ideally just a pointer transfer to the data stored in memory in Plasma). With this simple script though, ray.get() can sometimes take > 2ms on a ready ref.
import time
import numpy as np
import ray

ray.init()

@ray.remote
def get_big_array():
    big_array = np.random.randint(0, 255, int(20E6), dtype=np.uint8)
    return big_array

ref = get_big_array.remote()

# I expect this to wait for task completion and object transfer; no rush here
ready, _ = ray.wait([ref], num_returns=1)

# Get the data locally
t0 = time.time()
# I'd like this to be near instant
big_array_local = ray.get(ready)
print(f'Took {time.time() - t0} to get()')
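To see the variability rather than a single sample, the same measurement can be repeated over several tasks (a sketch based on the snippet above; the iteration count is arbitrary):

import time
import numpy as np
import ray

ray.init()

@ray.remote
def get_big_array():
    # Same ~20 MB uint8 array as the snippet above
    return np.random.randint(0, 255, int(20E6), dtype=np.uint8)

get_ms = []
for _ in range(20):
    ref = get_big_array.remote()
    ready, _ = ray.wait([ref], num_returns=1)
    t0 = time.perf_counter()
    ray.get(ready[0])
    get_ms.append((time.perf_counter() - t0) * 1e3)

print(f'ray.get() latency: min={min(get_ms):.3f} ms, max={max(get_ms):.3f} ms')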
In this case, if we offload work to ray.wait, ray.wait will just take longer instead, right (so the overall latency would be about the same)? Why does ray.get have to be instant in your use case?
Hey @sangcho, the reason for the performance requirement is that ray.get is currently bottlenecking the whole system (there is a very tight feedback loop in this particular use case).
@rliaw Sorry about my ignorance here, but isn't it going to hit the same bottleneck at the end of the day? For example, let's say you are waiting for two objects A and B. The current latency is:

ray.wait([A, B])  # latency to fetch objects A and B (1)
ray.get([A, B])   # latency to deserialize objects A and B (2)
Total latency == (1) + (2)

With the feature request:

ray.wait([A, B])  # latency to fetch A and B (1) + latency to deserialize A and B (2)
ray.get([A, B])   # near instant
Total latency == (1) + (2)

Am I missing something here? Btw, the latency of ray.get should just be the deserialization cost; I'm not sure why it could take 2 ms when zero-copy deserialization is used.
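One rough way to check whether the zero-copy path is being hit: an array that ray.get deserializes zero-copy is backed by the shared-memory object store and comes back read-only (a sketch, reusing get_big_array from the snippet above):

ref = get_big_array.remote()
arr = ray.get(ref)
# A zero-copy numpy result is a read-only view over the Plasma buffer,
# so a writeable array would indicate a copy was made.
print(arr.flags.writeable)  # expect False on the zero-copy path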
Hmm, I am still confused. If the issue is deserialization, doing the work in ray.wait instead of ray.get won't improve anything, will it? It just increases the time to return from ray.wait, and ray.get becomes short instead (but the total time stays the same). Am I missing something here?
Yeah, I agree that's the main issue, but I don't think the specific feature request (moving ray.get's work into ray.wait) will improve the situation. It sounds more like there is a deserialization bug, rather than a problem with ray.wait not doing ray.get's work.
Hey @sangcho, sorry for the delayed response (I was out of the office).
Here is some mock code to add a bit more detail to the data flow:
def __next__(self):
    # Issue Ray tasks so that work is always being done on other processes
    while len(self.request_refs) < MAX_WORKERS:
        self.request_refs.append(do_some_work.remote())
    # Wait until at least one batch of work is ready;
    # this should be instant once training warms up. Ideally, under the hood ray.wait()
    # will launch the necessary processes to convert *any* ready refs into a state where
    # they can be instantly accessed with ray.get()
    ready, busy = ray.wait(self.request_refs, num_returns=1)
    # Should be constant and instant
    batch = ray.get(ready[0])
    self.request_refs.remove(ready[0])
    return batch
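For reference, a self-contained version of the loop above might look roughly like this (a sketch; do_some_work, MAX_WORKERS, and the batch size are placeholders for the real workload):

import numpy as np
import ray

ray.init()

MAX_WORKERS = 4  # placeholder for the real number of in-flight tasks

@ray.remote
def do_some_work():
    # Placeholder for the real batch-producing task (~20 MB)
    return np.random.randint(0, 255, int(20E6), dtype=np.uint8)

class BatchLoader:
    def __init__(self):
        self.request_refs = []

    def __next__(self):
        # Keep MAX_WORKERS tasks in flight at all times
        while len(self.request_refs) < MAX_WORKERS:
            self.request_refs.append(do_some_work.remote())
        ready, _ = ray.wait(self.request_refs, num_returns=1)
        batch = ray.get(ready[0])
        self.request_refs.remove(ready[0])
        return batch

loader = BatchLoader()
batch = next(loader)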
The other issue is the variability of the deserialization, which can lead to > 2 ms ray.get() times. Are you able to repro this on your machines? I tested this on Linux.
Hmm, I see. It makes sense now why you'd like this feature. It'd be great if you could create a GitHub enhancement request for this feature, plus a separate issue for the deserialization problem with a repro script. I am not aware of any particular known issue here.
Yeah, I think this is a bit hard for us to reproduce without any more details, so a full repro script would be really helpful!
For one, we should try to verify why deserialization is actually a bottleneck. For example, wrapping a class around the numpy array should not impact ray.get:
import ray
import numpy as np

ray.init()

class Foo:
    def __init__(self):
        self.x = np.ones(10**8)

f = Foo()
f_id = ray.put(f)
%time new_f = ray.get(f_id)
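Outside a notebook, where the %time magic isn't available, the same comparison can be made with time.perf_counter (a sketch; the array size is reduced to keep the example light):

import time
import numpy as np
import ray

ray.init()

class Foo:
    def __init__(self):
        self.x = np.ones(10**7)

for obj in (np.ones(10**7), Foo()):
    ref = ray.put(obj)
    t0 = time.perf_counter()
    ray.get(ref)
    print(f'{type(obj).__name__}: {(time.perf_counter() - t0) * 1e3:.3f} ms')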
Hey @sangcho, for the deserialization issue, the snippet in my original post sometimes takes > 2 ms in a notebook when I test on an A100 cluster as is.
For a snippet with the Ray requests inside a dataloader (to show the benefits of pipelining), I will need to mock this up without our source code, which I'll circle back to when I get some more time. Should I file this as a GitHub ticket with the repro? Thanks!