How severely does this issue affect your experience of using Ray?
- None: Just asking a question out of curiosity
Hi there, sorry, I'm still new to Ray and trying to learn. My question is about non-blocking retrieval of ObjectRefs from multiple ObjectRefGenerators.
I've attempted multiple things; the closest so far is:
```python
generators = [worker.self_play.remote() for worker in sp_workers]
while True:
    tasks = [next(gen) for gen in generators]
    results = ray.get(tasks)
    for r in results:
        print(f"Counter: {r[0]} | {r[1]} - {r[2].shape}")
```
`self_play` is a bit of a dummy function at the moment: all it does is generate a number of flat tensors and stack them. The problem I'm having with the above is that it blocks until the slowest generator yields.
The scenario is this: each self-play actor vectorises an internal model to play multiple games at the same time. Because games end at differing points, the plan is to yield each game as it completes (resetting it afterwards) and let the other games continue playing. Thus we should have a never-ending stream of games, which we're attempting to feed into a replay buffer. The replay buffer can then be used by a trainer to update the model's weights. To improve efficiency, we init multiple self-play workers…
Since each actor's method yields its results, we have N outstanding ObjectRefGenerators at a time. Ideally, when one yields, I would like to immediately pull that result and place it into the buffer.
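One pattern that looks like it fits, assuming (per the Ray ≥ 2.7 streaming-generator docs) that an ObjectRefGenerator can itself be passed to `ray.wait`: wait until *any* generator has its next item ready, consume only that one, and leave the rest running. `stream_games` is a hypothetical name; `sp_workers` is the actor list from the snippet above.

```python
def stream_games(sp_workers):
    # Sketch only: assumes Ray >= 2.7, where ray.wait accepts
    # ObjectRefGenerators and reports one "ready" when its next
    # item is available.
    import ray  # imported here so the sketch reads standalone

    generators = [w.self_play.remote() for w in sp_workers]
    while generators:
        # Block only until ONE generator has something ready,
        # not until the slowest worker yields.
        ready, _ = ray.wait(generators, num_returns=1)
        gen = ready[0]
        try:
            ref = next(gen)        # ready, so this should not block
            yield ray.get(ref)     # hand the finished game downstream
        except StopIteration:
            generators.remove(gen) # this worker's stream is exhausted
```

Each finished game could then be pushed into the replay buffer as it arrives, while the slower workers keep playing.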
I've looked over the patterns in the docs, including using generators to reduce heap memory usage, and I've checked out asyncio as well. However, I'm equally new to asyncio, so there is a possibility I've read the answer and didn't realise it.
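The "consume whichever finishes first" behaviour I'm after can be sketched in plain asyncio (no Ray), where `asyncio.as_completed` plays the role I want `ray.wait` to play; `fake_self_play` and its delays are made up for illustration:

```python
import asyncio
import random

async def fake_self_play(worker_id: int) -> tuple[int, float]:
    # Stand-in for a self-play task with a variable duration.
    delay = random.uniform(0.01, 0.05)
    await asyncio.sleep(delay)
    return worker_id, delay

async def main() -> list[int]:
    tasks = [asyncio.create_task(fake_self_play(i)) for i in range(4)]
    finished_order = []
    # as_completed yields each task as soon as it finishes,
    # regardless of submission order -- no waiting on the slowest.
    for coro in asyncio.as_completed(tasks):
        worker_id, _ = await coro
        finished_order.append(worker_id)
    return finished_order

order = asyncio.run(main())
```

Results arrive in completion order rather than submission order, which is exactly the streaming behaviour I want from the self-play workers.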
Any guidance would be greatly appreciated.
Thank you.
Edit: Further experimentation
I've been working on this problem and I'm working through the example given in AsyncIO / Concurrency for Actors — Ray 2.38.0.
The new idea is to implement the replay buffer as an async actor, with the data generation running as an async task, and then pull directly from the replay buffer.
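The shape of that buffer can be sketched with a plain `asyncio.Queue` (no Ray here; the `ReplayBuffer` class and `producer` coroutine are hypothetical stand-ins for the async actor and the self-play streams):

```python
import asyncio
import random

class ReplayBuffer:
    # Minimal async buffer: producers push finished games,
    # the trainer awaits samples.
    def __init__(self, maxsize: int = 1024):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def add(self, game) -> None:
        await self._queue.put(game)

    async def sample(self):
        return await self._queue.get()

async def producer(buf: ReplayBuffer, worker_id: int, n_games: int) -> None:
    # Stand-in for one self-play stream: games finish at varying times.
    for g in range(n_games):
        await asyncio.sleep(random.uniform(0, 0.01))
        await buf.add((worker_id, g))

async def main():
    buf = ReplayBuffer()
    producers = [asyncio.create_task(producer(buf, w, 3)) for w in range(2)]
    # The "trainer" pulls games as they land, never waiting on a
    # specific worker.
    games = [await buf.sample() for _ in range(6)]
    await asyncio.gather(*producers)
    return games

games = asyncio.run(main())
```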
However, the documentation above suggests that an async call needs to be wrapped in a function:
```python
# async ray.get
async def async_get():
    await actor.run_concurrent.remote()

asyncio.run(async_get())
```
But when I try to run the whole code in Jupyter,
```python
# regular ray.get
ray.get([actor.run_concurrent.remote() for _ in range(4)])
```
appears to run fine… What am I missing?
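My current understanding, which may be wrong: a plain `ray.get` is just a blocking call and doesn't touch the event loop, so it runs fine anywhere, while `asyncio.run` specifically refuses to start a second event loop, and a Jupyter cell already has one running (in a notebook you can `await` directly instead). A stdlib-only demonstration of that refusal:

```python
import asyncio

async def inner() -> int:
    return 42

async def outer():
    # Once a loop is running (as it is inside any Jupyter cell),
    # asyncio.run raises rather than nesting a second loop.
    coro = inner()
    try:
        asyncio.run(coro)
        err = ""
    except RuntimeError as e:
        err = str(e)
        coro.close()  # avoid a "never awaited" warning
    value = await inner()  # awaiting directly works fine
    return err, value

err, value = asyncio.run(outer())
```

So the `asyncio.run(async_get())` wrapper from the docs would be needed in a plain script, but not in a notebook cell.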