Share a list/queue across multiple actors

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I have a DataGenerator class

@ray.remote
class DataGenerator:

    def generate_continuously(self):
        while True:
            time.sleep(5)
            data = random.random()
            # I need data to be put into a queue common to all instances of DataGenerator

From the main script, I instantiate many of them

queue = ...  # Placeholder: some shared queue
# Create generator handles and start continuous collection for all
handles = [DataGenerator.remote() for i in range(10)]
for handle in handles:
    handle.generate_continuously.remote()  # Returns immediately; the task itself never finishes

# Continuously pop the queue and store locally the results 
all_data = []
while True:
    data = queue.pop_all()
    all_data.extend(data)
    # do something compute-intensive with all_data

I need each handle to put their result into a common queue that I can repeatedly access from this main script.

What I’ve tried
This is the closest I got to the desired result:

@ray.remote
class DataGenerator:

    def generate(self):
        time.sleep(5)
        data = random.random()
        return data

N_HANDLES = 10
generator_handles = [DataGenerator.remote() for i in range(N_HANDLES)]
# Map generator handle index to the ref of the object being produced remotely 
handle_idx_to_ref = {idx: generator_handles[idx].generate.remote() for idx in range(N_HANDLES)}
all_data = []
while True:
    for idx, ref in handle_idx_to_ref.items():
        # ray.wait returns two lists: refs that are ready and refs that are not
        ready_ids, _ = ray.wait([ref], timeout=0)
        if ready_ids:
            all_data.extend(ray.get(ready_ids))
            # Start again generation for this worker
            handle_idx_to_ref[idx] = generator_handles[idx].generate.remote()
    
    # Do something compute-intensive with all_data

This is almost good, but if the compute-intensive operation takes too long, some DataGenerator actors may finish and then sit idle until the next iteration restarts them. How can I improve this code?

UPDATE:

I got a bit closer to my desired solution by creating a remote Buffer object to which the actors continuously send their results. The main thread then can read from the buffer without being stuck waiting for the actors that never finish. However, to me this doesn’t seem very efficient for two reasons:

  1. I need to keep the CPU busy to keep alive one Ray Actor (the buffer) which doesn’t really do much computing
  2. It still seems like an unnecessary communication step, i.e. why go actor → buffer → master (two serialization-transport-deserialization steps) instead of directly actor → master?

Any thoughts on how to improve it?

Current code

import random
import time
from typing import List

import ray
from ray.actor import ActorHandle

@ray.remote
class Buffer:

    def __init__(self):
        self.buffer = []

    def add(self, values: List):
        self.buffer.extend(values)

    def pop_all(self):
        values = self.buffer
        self.buffer = []
        return values

@ray.remote
class DataGenerator:

    def __init__(self, buffer_handle: ActorHandle):
        self.buffer_handle = buffer_handle

    def generate(self):
        while True:  # Now this never stops
            time.sleep(5)
            data = random.random()
            # add() extends a list, so wrap the single value in a list
            self.buffer_handle.add.remote([data])  # Should this be inside a ray.get() call?

buffer_handle = Buffer.remote()
N_HANDLES = 10
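generator_handles = [DataGenerator.remote(buffer_handle) for i in range(N_HANDLES)]
for handle in generator_handles:
    handle.generate.remote()  # Start the endless generation loop on each actor

all_data = []
while True:
    all_data.extend(ray.get(buffer_handle.pop_all.remote()))
    # Do something compute-intensive with all_data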

Hi @fedetask

Sorry for the late reply.

I need to keep the CPU busy to keep alive one Ray Actor (the buffer) which doesn’t really do much computing

You can set num_cpus=0 if the Buffer Actor doesn’t do much computation.

It still seems like an unnecessary communication step, i.e. why go actor → buffer → master (two serialization-transport-deserialization steps) instead of directly actor → master?

Why don’t you let the master pull from the DataGenerator actors directly?

I see, thanks!
The reason I can’t pull from data generators directly is that their generate() function contains a while True loop, so the method never stops and the master remains stuck.

The following code, for example, does not work:

import time
import random

import ray


@ray.remote
class DataGenerator:

    def __init__(self):
        self.data = []

    def generate(self):
        while True:  # Now this never stops
            time.sleep(5)
            value = random.random()
            self.data.append(value)

    def pop_data(self):
        data = self.data
        self.data = []
        return data


# Create generators and start data collection
generator_handles = [DataGenerator.remote() for _ in range(2)]
for handle in generator_handles:
    handle.generate.remote()
print('Started data collection')

while True:
    values = ray.get([handle.pop_data.remote() for handle in generator_handles])
    print('Received values:', values)
    time.sleep(1)

The master gets stuck in the values = ray.get([handle.pop_data.remote() for handle in generator_handles]) call. Note that this call does not involve the generate() method of DataGenerator (the one with the while True loop). Even so, the master cannot get a result from pop_data(), because each DataGenerator actor runs one method at a time and is still busy inside generate().

The reason I can’t pull from data generators directly is that their generate() function contains a while True loop, so the method never stops and the master remains stuck.

You can use a threaded actor (or an async actor). See "AsyncIO / Concurrency for Actors" in the Ray 2.9.3 documentation.

Thanks!! That is exactly what I needed. I did not know about this functionality