The short question is: If I want to pass data between Actors on a Ray Queue, is there any way of performing a queue.get without deserializing the data?
For more context, here is a representation of my application:
import ray
from ray.util.queue import Queue
import time
@ray.remote
class ProductionSupervisor:
    '''Actor that simulates an incoming data stream.

    IRL: streams complex (badly structured) JSON from a 3rd party API,
    where each message contains data for separate subjects.
    '''

    def __init__(self, queue):
        # Queue that every generated message is put on.
        self._queue = queue
        # Bumped before each send, so the first message is numbered 0.
        self._cnt = -1
        self._msg_txt = 'msg_'

    def gen_msgs(self):
        '''Put a freshly numbered message on the queue once a second, forever.'''
        while True:
            time.sleep(1)
            self._cnt += 1
            next_msg = f'{self._msg_txt}{self._cnt}'
            self._queue.put(next_msg)
@ray.remote
class ConsumptionSupervisor:
    '''Actor that hands every newly received message to ALL workers.'''

    def __init__(self, queue=None):
        self._queue = queue
        # Fixed pool of three worker actors.
        self._workers = [Worker.remote() for _ in range(3)]

    def process_msgs(self):
        '''Take messages off the queue forever, fanning each out to the workers.'''
        while True:
            # TODO: Can I get this without deserializing it?
            received = self._queue.get()
            # Store the message in the object store once, so that it
            # is not serialized again for EACH worker call below.
            # TODO: This seems like inefficient additional processing
            msg_ref = ray.put(received)
            for target in self._workers:
                target.work.remote(msg_ref)
@ray.remote
class Worker:
    '''Actor standing in for one specific subject.

    IRL: extracts the relevant data from the passed in message dict,
    processes it, and stores it here.
    '''

    def work(self, msg):
        '''Handle one message; for demo purposes the whole message is printed.'''
        print(msg)
if __name__ == '__main__':
    ray.init()
    queue = Queue()
    producer = ProductionSupervisor.remote(queue)
    consumer = ConsumptionSupervisor.remote(queue)
    # Start both endless loops inside their actors; the returned
    # references are deliberately not awaited.
    producer.gen_msgs.remote()
    consumer.process_msgs.remote()
    # Keep the driver process alive while the actors run. Sleep instead
    # of spinning: a bare `while True: pass` pins a CPU core at 100%.
    while True:
        time.sleep(1)
As you can see, I am getting data off the queue and then serializing it again before passing it to other Ray Actors. This seems very inefficient, and my application needs to be snappy.
Is there a way to keep this data serialized until a final point where it can be explicitly deserialized?
Btw, I am not wedded to the idea of using queues. If there is a better way of doing what my demo code is trying to achieve, I’d be very happy to hear about it!
Thanks
I
How severely does this issue affect your experience of using Ray?
Medium: It contributes to significant difficulty in completing my task, but I can work around it.
I can, but my experimentation with Ray Queue indicates that queue.get will always deserialize the data, and that’s the problem.
Please consider this simple example:
import ray
from ray.util.queue import Queue
ray.init()
queue = Queue()
# Store the value in the object store; ray.put returns a reference to it
value_ref = ray.put('Foo')
# Put the bare reference on the queue
queue.put(value_ref)
# What comes back is the plain value, not the reference
print(queue.get())
Also, I forgot to mention, in my first response, that it’s my understanding that queue.put serializes the argument anyway. In which case, using ray.put, first, will be redundant.
Hmm, it seems to work for me if the ObjectRef is wrapped in a list before it is put on the queue. queue.get() then returns a list with a single ObjectRef, which acts like a pointer to the serialized value.
import ray
from ray.util.queue import Queue
ray.init()
queue = Queue()
# Store the value in the object store; ray.put returns a reference to it
value_ref = ray.put('Foo')
# Wrap the reference in a list before putting it on the queue
queue.put([value_ref])
# The list comes back with the ObjectRef still inside it
print(queue.get())
# [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000)]
With thanks to @Stephanie_Wang, I’ve updated my original code to prove end-to-end queue put/gets of serialized messages.
Posted here in case it might help someone else:
import ray
from ray.util.queue import Queue
import time
@ray.remote
class ProductionSupervisor:
    '''Actor that simulates an incoming data stream.

    IRL: streams complex (badly structured) JSON from a 3rd party API,
    where each message contains data for separate subjects.
    '''

    def __init__(self, stream_msg_queue):
        # Queue that every generated message reference is put on.
        self._stream_msg_queue = stream_msg_queue
        # Bumped before each send, so the first message is numbered 0.
        self._cnt = -1
        self._msg_txt = 'msg_'

    def gen_msgs(self):
        '''Once a second, store a new message and queue a reference to it.'''
        while True:
            time.sleep(1)
            self._cnt += 1
            # Place the message in the object store and keep the reference
            msg_ref = ray.put(f'{self._msg_txt}{self._cnt}')
            # NOTE: the reference goes onto the stream msg queue wrapped
            # in a list, so that it is still a reference (not the value)
            # when a queue.get is performed on it
            wrapped_ref = [msg_ref]
            self._stream_msg_queue.put(wrapped_ref)
@ray.remote
class ConsumptionSupervisor:
    '''Actor that hands every newly received message to ALL workers.'''

    def __init__(self, _stream_msg_queue):
        self._stream_msg_queue = _stream_msg_queue
        # Three worker actors, each created with its index as its id.
        self._workers = [Worker.remote(worker_id) for worker_id in range(3)]

    def process_msgs(self):
        '''Take items off the queue forever, passing each one to every worker.'''
        while True:
            # Whatever the producer queued is forwarded untouched
            queued_item = self._stream_msg_queue.get()
            for target in self._workers:
                target.work.remote(queued_item)
@ray.remote
class Worker:
    '''Actor standing in for one specific subject.

    IRL: extracts the relevant data from the passed in message dict,
    processes it, and stores it here.
    '''

    def __init__(self, worker_id: int) -> None:
        self._worker_id = worker_id

    def work(self, ser_msg):
        '''Fetch the message behind ``ser_msg`` and print it with this worker's id.'''
        # ray.get deserializes the wrapped message; the message itself
        # is the first element of the result
        msg = ray.get(ser_msg)[0]
        print(f'From Worker {self._worker_id}: {msg}')
if __name__ == '__main__':
    ray.init()
    stream_msg_queue = Queue()
    producer = ProductionSupervisor.remote(stream_msg_queue)
    consumer = ConsumptionSupervisor.remote(stream_msg_queue)
    # Start both endless loops inside their actors; the returned
    # references are deliberately not awaited.
    producer.gen_msgs.remote()
    consumer.process_msgs.remote()
    # Keep the driver process alive while the actors run. Sleep instead
    # of spinning: a bare `while True: pass` pins a CPU core at 100%.
    while True:
        time.sleep(1)