ray.wait(fetch_local=False) in asyncio

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Short Version
Is there any way to use ray.wait with fetch_local=False in asyncio? From what I can tell, awaiting a reference, even as part of asyncio.wait(), will always materialize the references on the current worker.

More detailed scenario:
I’m writing what is essentially an asynchronous actor pool. It takes requests, returns a reference immediately, and then forwards the request to an actor at some unknown time in the future. The pool itself is a Ray actor with an asyncio event loop running. The problem I’m facing is that I won’t know which actor to use for any particular request until all of its inputs are ready. From what I can tell, using “await asyncio.wait(asyncio.wrap_future(inputReference.future()))” will materialize inputReference on the pool actor. This would be unpleasant if inputReference points to something big and expensive.

Possible Solutions

  1. Ideally, I’d just be able to add some sort of “fetch_local=False” flag to the Reference’s future or asyncio.wait or something.
  2. Another solution might be a way to specify required input references to an actor invocation without materializing them (like a dependency list or something).
  3. In a previous version of this, I used a thread pool to wrap ray.wait with concurrent.futures.ThreadPoolExecutor and used loop.run_in_executor to make ray.wait compatible with asyncio. The problem here is that I may have many pending requests and it’s not ideal to have a ton of threads floating around. Ultimately, this design led to some nasty head of line blocking issues. I’ll try to do some more principled profiling here to give concrete numbers but I’m pretty sure I was seeing slowdowns when I added 64+ threads.

How about this solution:
when return you return two objects (Tasks — Ray 1.12.1), one is the dummy return, and the other one is the actual result. You get the dummy one. We are sure if the dummy one is ready, the actual one should also be ready.

But yeah, I think support fetch_local is a cleaner way but it also requires more work. You can submit some issues or feature requests about this one if it’s important. Or even better, it’ll be nice if you can help contribute it :slight_smile:

Ya, I use the dummy return trick elsewhere in the project. In this case I can’t do it because the references come from the user of the library. They could come from anywhere. I suppose I could require the user to provide a dummy reference for me to wait on, but that’s a big burden on the user.

I’m happy to look into implementing it. I don’t know a lot about how the asyncio wrapping works. It might be as easy as messing with the future that references get wrapped in. It might be easy…or not lol. Do you have any pointers to where that is handled in Ray or maybe a reference on how to asyncio-ify stuff?

Another solution could be having a thread there, wrapping with futures like 3). but when you wait, you call ray.wait([obj_refs]). Basically, group all the waiting refs.

As to support async with wait, we firstly need an API for this,

Maybe we can add future(fetch_local=True) as default one? (we need an issue for this and get approved since it’s API change).

Then, for the callback side, we should have AsyncWait implemented which is similar to this one:

Basically, now the callback is only set for set_get_async_callback

we need to have something for set_wait_async_callback(fetch_local=False). Something like this.

I made a feature request on github to track this:

1 Like