Is polling `ray.wait([task_handle], timeout=0)` guaranteed to eventually finish?

I’m trying to implement a non-blocking polling mechanism to periodically check whether a handle of a remote call has been completed and fetched locally (without using async mode). My main process acts as a consumer of data and executes a CPU-bound calculation. I can use a callback (that is invoked periodically) to fetch new data from a producer (which is a ray actor).

My current approach (at the consumer end) is to always hold a handle to a pending producer.consume.remote() call. On the periodic callback I just poll this handle (once per call) for completion using ray.wait([my_prefetch_handle], timeout=0, fetch_local=True).
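For readers following along, the single poll described above can be sketched roughly as below. This is only an illustration, not code from the thread: `poll_once` and the `WaitFn` alias are hypothetical names, and the wait function is passed in as a parameter (assumed to have the same call shape as `ray.wait`) so the snippet runs without a cluster.

```python
from typing import Any, Callable, List, Tuple

# Assumed call shape, mirroring ray.wait:
#   wait_fn(refs, timeout=..., fetch_local=...) -> (ready, not_ready)
WaitFn = Callable[..., Tuple[List[Any], List[Any]]]


def poll_once(handle: Any, wait_fn: WaitFn) -> bool:
    """Check a single handle without blocking (hypothetical helper).

    Returns True if the wait function reports the handle as ready.
    """
    # timeout=0 makes the call return immediately instead of blocking.
    ready, _not_ready = wait_fn([handle], timeout=0, fetch_local=True)
    return len(ready) == 1
```

With Ray this would presumably be called as `poll_once(my_prefetch_handle, ray.wait)` from the periodic callback.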

Empirically, ray.wait() always eventually returns the handle as finished. The question is whether this is indeed guaranteed, and whether it is good practice.

Thanks,
Elad

Hey @eladgm -

Yes, ray.wait() should return two lists: the refs that are ready and the refs that are not ready.

You could probably set timeout=None if you just want to block until the first available object is ready.

https://docs.ray.io/en/latest/ray-core/api/doc/ray.wait.html#ray-wait

Thanks for the reply. I’ll sharpen the question: after invoking a remote() call and saving the future handle locally, is ray.wait()ing on this handle only responsible for blocking the current program until the response is ready, or does it have an additional role in actually fetching the result? For example, if I invoke a remote() call (and save the handle locally) but never ray.wait() on it, will the result eventually be fetched locally (by the underlying ray-core process) even though nothing is actively waiting for it?

Hey @eladgm good question.

So for small inlined return values (default threshold 100 KiB), they should already be available when the remote call finishes in the background.
For larger objects, Ray will put them in the plasma store (Ray's distributed, shared-memory object store), and only fetch them on demand when ray.get() is called (or ray.wait() with the fetch_local arg).

Thanks. Your answers are appreciated and make sense.
Regarding the latter case (non-inlined big return values):
How would the answer change if, just after the remote() call, I immediately perform a single call to ray.wait([my_just_created_handle], timeout=0, fetch_local=True)? This call is expected to return the task handle in the second list (the unfinished tasks), because the task cannot have finished in such a short time. The question is: does such a single wait call somehow tell Ray’s core background processes to perform the download locally, so that the return value is eventually downloaded to the local node in the background, just by keeping the reference to this task’s handle and without further actively wait()ing for it (or blocking in get())? Is this correct? Will the behavior be as described?
Thanks for your professional answers. It really helps us to design our workflows.

After ray.wait returns because the timeout expired, it will not try to “download” in the background.

You would need to call ray.wait or ray.get again if you want to fetch the large (non-inlined) return objects.
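A consumer that follows this advice would re-poll on every tick and only call get once the handle is reported ready. The sketch below is one possible shape for that, under stated assumptions: `PrefetchingConsumer`, `submit`, `wait_fn` and `get_fn` are hypothetical names, the three callables are injected so the snippet runs without Ray, and it deliberately does not rely on any background download continuing after a timed-out poll.

```python
from typing import Any, Callable, List, Optional, Tuple


class PrefetchingConsumer:
    """Keeps exactly one pending handle and re-polls it on every tick."""

    def __init__(
        self,
        submit: Callable[[], Any],  # e.g. lambda: producer.consume.remote()
        wait_fn: Callable[..., Tuple[List[Any], List[Any]]],  # e.g. ray.wait
        get_fn: Callable[[Any], Any],  # e.g. ray.get
    ) -> None:
        self._submit = submit
        self._wait = wait_fn
        self._get = get_fn
        self._pending = submit()  # always hold one in-flight request

    def on_tick(self) -> Optional[Any]:
        """Poll once; return the data if ready, else None."""
        # Every tick issues a fresh non-blocking wait, so progress never
        # depends on what a previous timed-out call left running.
        ready, _ = self._wait([self._pending], timeout=0, fetch_local=True)
        if not ready:
            return None
        value = self._get(ready[0])
        self._pending = self._submit()  # prefetch the next item
        return value
```

Note that None doubles as the "nothing yet" signal here, so this shape only fits producers that never return None as data.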

Thanks @eladgm for the detailed questions, and thanks @rickyyx for your detailed responses.

@eladgm does that answer all your nuanced questions? If so, we can close this issue.

Btw, note that fetch_local is True by default for ray.wait (see the ray.wait page in the Ray 3.0.0.dev0 docs).

does such single wait call somehow mark ray’s core background processes to perform the download locally?

This depends. I think that if your object is ready to be fetched, Ray will continue downloading it to the local node even though timeout is 0 (cc @Chen_Shen for confirmation; not sure if it was fixed). That was the behavior in the past, and I am not sure whether it has changed.

If your object is not ready to be fetched, I believe nothing will happen.


Hi. Thanks for the responses.

Empirically, we always eventually get the result by only polling ray.wait(..., timeout=0, fetch_local=True) periodically (at least on Ray 1.8.0). So, in practice, the observed behavior is what we want. It is also consistent with @sangcho’s claim: eventually the object will be ready to be fetched, and the first poll after that will trigger its download in the background.

However, unfortunately this property (guarantee of eventual response by polling with timeout=0) is not mentioned in the documentation. Our question is whether we can safely assume it’s indeed guaranteed and designed that way.

Another questionable assumption: Is it guaranteed that the object’s state will be eventually updated (in the background) to be set as “ready to be fetched” by periodically polling with timeout=0?

However, unfortunately this property (guarantee of eventual response by polling with timeout=0) is not mentioned in the documentation. Our question is whether we can safely assume it’s indeed guaranteed and designed that way.

Yeah, we should update this doc. I will update it once the behavior is confirmed. I think it wasn’t clearly stated because the behavior wasn’t clearly defined before.

Another questionable assumption: Is it guaranteed that the object’s state will be eventually updated (in the background) to be set as “ready to be fetched” by periodically polling with timeout=0?

Yes, that’s correct. The only case where it is not updated is when the object is never created in the cluster (meaning the task that produces the object doesn’t run).


So which next steps would you suggest? Should we first confirm the behavior code-wise? I can find and point out the relevant place in the code that answers this question. It would help if you could point me to a roughly relevant area of the code base to look in. For example, is this where I should look: https://github.com/ray-project/ray/tree/master/src/ray/core_worker ?

Thanks!

I'll confirm it code-wise, and I can also try writing a test to verify it. I need some time to verify. Give me a couple of days for this one!
