ray.wait(fetch_local=False) in asyncio

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Short Version
Is there any way to use ray.wait with fetch_local=False in asyncio? From what I can tell, awaiting a reference, even as part of asyncio.wait(), will always materialize the references on the current worker.

More detailed scenario:
I’m writing what is essentially an asynchronous actor pool. It takes requests, returns a reference immediately, and then forwards the request to an actor at some unknown time in the future. The pool itself is a Ray actor with an asyncio event loop running. The problem I’m facing is that I won’t know which actor to use for any particular request until all of its inputs are ready. From what I can tell, using “await asyncio.wait(asyncio.wrap_future(inputReference.future()))” will materialize inputReference on the pool actor. This would be unpleasant if inputReference points to something big and expensive.

Possible Solutions

  1. Ideally, I’d just be able to add some sort of “fetch_local=False” flag to the Reference’s future or asyncio.wait or something.
  2. Another solution might be a way to specify required input references to an actor invocation without materializing them (like a dependency list or something).
  3. In a previous version of this, I used a thread pool to wrap ray.wait with concurrent.futures.ThreadPoolExecutor and used loop.run_in_executor to make ray.wait compatible with asyncio. The problem here is that I may have many pending requests and it’s not ideal to have a ton of threads floating around. Ultimately, this design led to some nasty head of line blocking issues. I’ll try to do some more principled profiling here to give concrete numbers but I’m pretty sure I was seeing slowdowns when I added 64+ threads.