RuntimeWarning: coroutine 'Queue.put_async' was never awaited

Hi, I am using Ray's queue to send data. To avoid blocking, I used `put_async`; however, it raises the following warning:

RuntimeWarning: coroutine 'Queue.put_async' was never awaited
from ray.util.queue import Queue
# Queue from ray.util.queue, created with maxsize=10.
queue = Queue(maxsize=10)

Should I use asyncio to run the returned coroutine object? For example:

I solved this problem by using async actors. The following is my code:

import time
import asyncio

import ray
import numpy as np

from ray.util.queue import Queue

def process_func(x):
    """Return ``x`` shifted by one fresh draw from ``np.random.rand()``."""
    noise = np.random.rand()
    shifted = x + noise
    return shifted

@ray.remote
class AsyncActor:
    """Ray actor whose async method fills a queue with processed samples.

    The class must be decorated with ``ray.remote`` because the driver
    instantiates it through ``AsyncActor.remote()``.
    """

    async def run_concurrent(self, queue):
        """Put 100 processed samples on ``queue`` and wait for all of them.

        Args:
            queue: Object exposing ``put_async(item)`` that returns a
                coroutine (the driver passes a ``ray.util.queue.Queue``).

        Returns:
            The string ``'Get all data'`` once every put has completed.
        """
        objs = []
        for i in range(100):
            st = time.time()
            # put_async only builds a coroutine here; nothing is sent until
            # it is awaited below, so this call does not block.
            objs.append(queue.put_async(process_func(i)))
            print(f'cost: {(time.time()-st):.2f}')
        # NOTE: queue.put_async returns coroutine objects, so they must be
        # awaited or Python emits "coroutine ... was never awaited".
        # gather() accepts bare coroutines (asyncio.wait() rejects them on
        # Python 3.11+) and tolerates an empty list. Awaiting each coroutine
        # in a plain for-loop also works, but runs the puts one by one.
        await asyncio.gather(*objs)  # block here
        return 'Get all data'

    def ready(self):
        """Return True; a caller can invoke this to check the actor responds."""
        return True

async def main(actor, queue):
    """Await the actor's ``run_concurrent`` call on ``queue`` and print its result."""
    # NOTE: run_concurrent.remote(...) yields an awaitable, so it must be
    # awaited; execution suspends here until the result is available.
    pending = actor.run_concurrent.remote(queue)
    outcome = await pending
    print(f"Get: {outcome}")

# Queue shared between the actor (producer) and this driver (consumer).
q = Queue(maxsize=1000)

actor = AsyncActor.remote()

st_time = time.time()
# Run main() to completion: it awaits the actor's run_concurrent call, so all
# puts have finished by the time this returns.
asyncio.run(main(actor, q))

print(f'async ready: total time cost: {time.time() - st_time}')

data = []
print(f"Getting data")
while True:
    r = q.get()
    # Keep every item so the final report reflects what was received.
    data.append(r)
    print(f"Getting data!!!!!!!!!!!!!")
    if q.size() == 0:
        print(f'get 1 sample')
        # Queue drained: stop, otherwise the next q.get() would wait forever.
        break
print(f"queue.put_async, len(data): {len(data)}, data: {data}, \ntotal time cost: {time.time() - st_time}s")
1 Like