Hi, I used ray’s queue to send data. To avoid blocking, I used put_async
, however, it returns the following warning:
RuntimeWarning: coroutine 'Queue.put_async' was never awaited
from ray.util.queue import Queue
queue = Queue(maxsize=10)
queue.put_async(10)
Should I use asyncio
to run the async object? Say: asyncio.run(queue.put_async(10))
I solved this problem by using async Actors, the following is my code:
import time
import asyncio
import ray
import numpy as np
from ray.util.queue import Queue
@ray.remote(num_cpus=0.0)
def process_func(x):
time.sleep(0.5)
return x + np.random.rand()
@ray.remote
class AsyncActor:
async def run_concurrent(self, queue):
objs = []
for i in range(100):
st = time.time()
objs.append(queue.put_async(process_func.remote(i)))
print(f'cost: {(time.time()-st):.2f}')
# NOTE: as queue.put_async are corountine objs, please await here
await asyncio.wait(objs) # block here
# NOTE: equivalently or you can await the results in this way
# for obj in objs:
# await obj
return 'Get all data'
def ready(self):
# waiting for the ready of the actor
return True
async def main(actor, queue):
# NOTE: as run_concurrent is corountine obj, please await #
ray_results = await actor.run_concurrent.remote(queue) # block here
print(f"Get: {ray_results}")
ray.init()
q = Queue(maxsize=1000)
actor = AsyncActor.remote()
print(ray.get(actor.ready.remote()))
st_time = time.time()
asyncio.run(main(actor, q))
print(f'async ready: total time cost: {time.time() - st_time}')
data = []
print(f"Getting data")
while True:
r = q.get()
print(f"Getting data!!!!!!!!!!!!!")
if q.size() == 0:
print('done!')
break
else:
print(f'get 1 sample')
data.append(r)
print(f"queue.put_async, len(data): {len(data)}, data: {data}, \ntotal time cost: {time.time() - st_time}s")
1 Like