How severely does this issue affect your experience of using Ray?
Medium: It contributes to significant difficulty in completing my task, but I can work around it.
I have a DataGenerator class:
import random
import time

import ray

@ray.remote
class DataGenerator:
    def generate_continuously(self):
        while True:
            time.sleep(5)
            data = random.random()
            # I need data to be put into a queue common to all instances of DataGenerator
From the main script, I instantiate many of them:
queue = ...  # Some shared queue

# Create generator handles and start continuous collection for all
handles = [DataGenerator.remote() for i in range(10)]
refs = [handle.generate_continuously.remote() for handle in handles]

# Continuously pop the queue and store the results locally
all_data = []
while True:
    data = queue.pop_all()
    all_data.extend(data)
    # do something compute-intensive with all_data
I need each handle to put its results into a common queue that I can repeatedly access from this main script.
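For reference, one way to realize the “common queue” placeholder above is Ray’s built-in distributed queue. The snippet below is only a minimal sketch under the assumption that ray.util.queue.Queue (the actor-backed queue shipped with Ray) fits this use case; the names and the 5-second sleep simply mirror the pseudocode above.

import random
import time

import ray
from ray.util.queue import Queue

@ray.remote
class DataGenerator:
    def generate_continuously(self, queue: Queue):
        while True:
            time.sleep(5)
            # put() forwards the value to the queue's backing actor; no ray.get() needed
            queue.put(random.random())

queue = Queue()
handles = [DataGenerator.remote() for _ in range(10)]
refs = [handle.generate_continuously.remote(queue) for handle in handles]

all_data = []
while True:
    # Drain everything that has arrived so far without blocking
    while not queue.empty():
        all_data.append(queue.get())
    # do something compute-intensive with all_data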
What I’ve tried
This is the closest I got to the desired result:
import random
import time

import ray

@ray.remote
class DataGenerator:
    def generate(self):
        time.sleep(5)
        data = random.random()
        return data

N_HANDLES = 10
generator_handles = [DataGenerator.remote() for i in range(N_HANDLES)]
# Map generator handle index to the ref of the object being produced remotely
handle_idx_to_ref = {idx: generator_handles[idx].generate.remote() for idx in range(N_HANDLES)}
all_data = []
while True:
    for idx, ref in handle_idx_to_ref.items():
        ready_id, not_ready_id = ray.wait([ref], timeout=0)
        if ready_id:
            all_data.extend(ray.get(ready_id))
            # Start generation again for this worker
            handle_idx_to_ref[idx] = generator_handles[idx].generate.remote()
    # Do something compute-intensive with all_data
This is almost good, but if the compute-intensive operation takes too long, some DataGenerators could finish and not be restarted until the next iteration. How can I improve on this code?
I got a bit closer to my desired solution by creating a remote Buffer actor to which the generators continuously send their results. The main thread can then read from the buffer without being stuck waiting for actors that never finish. However, this doesn’t seem very efficient to me, for two reasons:
I need to keep a CPU busy to keep alive one Ray actor (the buffer) which doesn’t really do much computing
It still seems an unnecessary communication step, i.e. why do actor → buffer → master (two serialization-transport-deserialization steps) instead of directly actor → master?
Any thoughts on how to improve it?
Current code
import random
import time
from typing import List

import ray
from ray.actor import ActorHandle

@ray.remote
class Buffer:
    def __init__(self):
        self.buffer = []

    def add(self, values: List):
        self.buffer.extend(values)

    def pop_all(self):
        values = self.buffer
        self.buffer = []
        return values

@ray.remote
class DataGenerator:
    def __init__(self, buffer_handle: ActorHandle):
        self.buffer_handle = buffer_handle

    def generate(self):
        while True:  # Now this never stops
            time.sleep(5)
            data = random.random()
            # Wrapped in a list because Buffer.add() extends the buffer
            self.buffer_handle.add.remote([data])  # Should this be inside a ray.get() call?

buffer_handle = Buffer.remote()
N_HANDLES = 10
generator_handles = [DataGenerator.remote(buffer_handle) for i in range(N_HANDLES)]

# Start the endless generation loop on each actor
for handle in generator_handles:
    handle.generate.remote()

while True:
    all_data = ray.get(buffer_handle.pop_all.remote())
    # Do something compute-intensive with all_data
I need to keep a CPU busy to keep alive one Ray actor (the buffer) which doesn’t really do much computing
You can set num_cpus=0 if the Buffer Actor doesn’t do much computation.
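For example, the zero-CPU reservation can be declared either on the class or when the actor is created (a minimal sketch, reusing the Buffer class from the post above):

# Reserve no CPU for the lightweight Buffer actor
@ray.remote(num_cpus=0)
class Buffer:
    ...

# or, equivalently, override the resource request at creation time:
buffer_handle = Buffer.options(num_cpus=0).remote()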
It still seems an unnecessary communication step, i.e. why do actor → buffer → master (two serialization-transport-deserialization steps) instead of directly actor → master?
Why don’t you let the master pull from the DataGenerator actors directly?
I see, thanks!
The reason I can’t pull from the data generators directly is that their generate() function contains a while True loop, so the method never returns and the master remains stuck.
The following code, for example, does not work:
import time
import random

import ray

@ray.remote
class DataGenerator:
    def __init__(self):
        self.data = []

    def generate(self):
        while True:  # Now this never stops
            time.sleep(5)
            value = random.random()
            self.data.append(value)

    def pop_data(self):
        data = self.data
        self.data = []
        return data

# Create generators and start data collection
generator_handles = [DataGenerator.remote() for _ in range(2)]
for handle in generator_handles:
    handle.generate.remote()

print('Started data collection')
while True:
    values = ray.get([handle.pop_data.remote() for handle in generator_handles])
    print('Received values:', values)
    time.sleep(1)
The master gets stuck in the values = ray.get([handle.pop_data.remote() for handle in generator_handles]) call. Note that this call does not involve the generate() function of DataGenerator (which has the while True loop), but the master is still unable to call the pop_data() method because the DataGenerator actor is busy inside the generate() method.
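One possible workaround, sketched below under the assumption that a threaded actor is acceptable here, is to create each DataGenerator (the same class and imports as in the snippet above) with Ray’s max_concurrency option, so pop_data() can be served on a second thread while generate() keeps looping:

# Threaded actors: max_concurrency=2 lets two method calls run at the same time,
# so pop_data() no longer waits behind the never-ending generate() call.
# Note: self.data is then accessed from two threads without explicit locking.
generator_handles = [
    DataGenerator.options(max_concurrency=2).remote() for _ in range(2)
]
for handle in generator_handles:
    handle.generate.remote()

print('Started data collection')
while True:
    values = ray.get([handle.pop_data.remote() for handle in generator_handles])
    print('Received values:', values)
    time.sleep(1)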