I’m trying to implement a non-blocking polling mechanism that periodically checks whether a remote call has completed and its result has been fetched locally (without using async mode). My main process acts as a consumer of data and executes a CPU-bound calculation. I can use a callback (invoked periodically) to fetch new data from a producer (which is a Ray actor).
My current approach (on the consumer side) is to always hold a handle to a pending producer.consume.remote() call. On each periodic callback I poll this handle once for completion using ray.wait([my_prefetch_handle], timeout=0, fetch_local=True).
Empirically, ray.wait() always eventually returns the handle as finished. The question is whether this is actually guaranteed, and whether it is good practice.
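For readers following along, the polling approach described above can be sketched roughly as follows. This is only an illustration, not the poster's actual code: PrefetchPoller and its submit/wait/get parameters are hypothetical names. The wait and get callables are injected so the sketch does not depend on a running cluster; in a real program they would be ray.wait and ray.get, and submit would be something like lambda: producer.consume.remote().

```python
from typing import Any, Callable, Tuple


class PrefetchPoller:
    """Hypothetical helper: keeps one pending ref and polls it without blocking."""

    def __init__(self, submit: Callable[[], Any], wait: Callable, get: Callable):
        self._submit = submit  # assumed: lambda: producer.consume.remote()
        self._wait = wait      # assumed: ray.wait
        self._get = get        # assumed: ray.get
        self._pending = submit()  # always hold exactly one in-flight request

    def poll(self) -> Tuple[bool, Any]:
        """Check once, without blocking. Returns (True, value) or (False, None)."""
        ready, _not_ready = self._wait(
            [self._pending], timeout=0, fetch_local=True
        )
        if not ready:
            return False, None
        # The ref was reported ready, so get() is expected to return promptly.
        value = self._get(ready[0])
        # Immediately prefetch the next item.
        self._pending = self._submit()
        return True, value
```

With Ray, the periodic callback would then just call poller.poll() and go back to the CPU-bound work whenever it returns (False, None).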
Thanks for the reply. I’ll sharpen the question: after invoking a remote() call and saving the future handle locally, is ray.wait()ing on this handle only responsible for blocking the current program until the response is ready, or does it also play a role in actually fetching the result? For example, if I invoke a remote() call (and save the handle locally) but never ray.wait() on it, will the result eventually be fetched locally (by the underlying ray-core process) even though nothing is actively waiting for it?
So small inlined return values (up to 100 KiB by default) should already be available when the remote call finishes in the background.
For larger objects, Ray will put them in the plasma store (a distributed key-value store) and only fetch them on demand when ray.get() is called (or ray.wait() with the fetch_local arg).
Thanks. Your answers are appreciated and make sense.
Regarding the latter case (non-inlined big return values):
How would the answer change if, just after the remote() call, I immediately perform a single call to ray.wait([my_just_created_handle], timeout=0, fetch_local=True)? (This is expected to return the task handle in the second list, the unfinished tasks, because the task cannot have finished in such a short time.) The question is - does such single wait call somehow mark ray’s core background processes to perform the download locally? So that the download of the return value to the local node would eventually be performed in the background, just by keeping the reference to this task’s handle, even without further actively wait()ing for it (or blocking in get()). Is this correct? Will the behavior be as detailed?
Thanks for your professional answers. It really helps us to design our workflows.
does such single wait call somehow mark ray’s core background processes to perform the download locally?
This depends. I think if your object is ready to be fetched, it will continue downloading the object to the local node even though timeout is 0 (cc @Chen_Shen for confirmation; not sure if it was fixed). That was the behavior in the past, and I am not sure whether it has changed.
If your object is not ready to be fetched, I believe nothing will happen.
Empirically, we always eventually get the result successfully just by polling ray.wait(..., timeout=0, fetch_local=True) periodically (at least for ray 1.8.0). So, in practice, the observed behavior is what we want. It is also consistent with @sangcho’s claim: eventually the object will be ready to be fetched, and the first poll after that will trigger its download in the background.
However, unfortunately this property (guarantee of eventual response by polling with timeout=0) is not mentioned in the documentation. Our question is whether we can safely assume it’s indeed guaranteed and designed that way.
Another questionable assumption: Is it guaranteed that the object’s state will be eventually updated (in the background) to be set as “ready to be fetched” by periodically polling with timeout=0?
However, unfortunately this property (guarantee of eventual response by polling with timeout=0) is not mentioned in the documentation. Our question is whether we can safely assume it’s indeed guaranteed and designed that way.
Yeah, we should update this doc. I will update it once the behavior is confirmed. I think it wasn’t clearly stated because the behavior wasn’t clearly defined before.
Another questionable assumption: Is it guaranteed that the object’s state will be eventually updated (in the background) to be set as “ready to be fetched” by periodically polling with timeout=0?
Yes, that’s correct. The only case where it is not updated is when the object is never created in the cluster (meaning the task for that object doesn’t run).
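Since the guarantee is not documented yet, and given the exception mentioned above (the task never runs), a consumer may prefer to bound its polling rather than assume the object eventually becomes ready. The following is a minimal sketch under that assumption. poll_with_deadline is a hypothetical helper, wait is assumed to be ray.wait (injected here so the sketch runs without a cluster), and the sleep merely stands in for the consumer's CPU-bound work between polls.

```python
import time
from typing import Any, Callable


def poll_with_deadline(
    wait: Callable,            # assumed: ray.wait
    ref: Any,                  # the pending object ref being polled
    deadline_s: float,         # give up after this many seconds
    interval_s: float = 0.01,  # pause between polls (stand-in for real work)
) -> bool:
    """Poll `ref` with timeout=0 until it is ready or the deadline passes.

    Returns True if the ref was reported ready, False if the deadline expired.
    """
    start = time.monotonic()
    while True:
        ready, _not_ready = wait([ref], timeout=0, fetch_local=True)
        if ready:
            return True
        if time.monotonic() - start >= deadline_s:
            return False
        time.sleep(interval_s)
```

On a False result the caller can decide whether to resubmit the request, log the stall, or keep waiting.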
So what next steps would you suggest? Should we first confirm the behavior in the code? I can find and point out the relevant place in the code that answers this question. It would help if you could point me to a roughly relevant area of the code base to look in. For example, is this where I should look: https://github.com/ray-project/ray/tree/master/src/ray/core_worker ?