Queue serialization / deserialization

Hi

The short question is: If I want to pass data between Actors on a Ray Queue, is there any way of performing a queue.get without deserializing the data?

For more context, here is a representation of my application:

import ray
from ray.util.queue import Queue

import time

@ray.remote
class ProductionSupervisor:
    '''Fake data stream
    IRL: Streams complex (badly structured) JSON from a 3rd party API.
    Each message contains data for separate subjects'''

    def __init__(self, queue):

        self._queue = queue

        self._cnt = -1
        self._msg_txt = 'msg_'
    
    def gen_msgs(self):

        while True:
            time.sleep(1)
            
            self._cnt += 1

            self._queue.put(f'{self._msg_txt}{self._cnt}')

@ray.remote
class ConsumptionSupervisor:
    '''Distribute each newly received message
    to ALL workers.'''

    def __init__(self, queue=None):

        self._queue = queue
        self._workers = [Worker.remote() for _ in range(3)]

    def process_msgs(self):
        
        while True:
            # TODO: Can I get this without deserializing it?
            msg = self._queue.get()

            # Put msg onto the object store
            # so that it does not end up being serialized EACH
            # time it is passed to the worker function
            # TODO: This seems like inefficient additional processing
            msg_id = ray.put(msg)

            for worker in self._workers:
                
                worker.work.remote(msg_id)

@ray.remote
class Worker:
    '''IRL: each worker represents a specific subject; Extracts
    the relevant data, from the passed in message dict,
    processes it, and stores it here'''

    def work(self, msg):

        # For demo purposes, just print the whole message
        print(msg)

if __name__ == '__main__':

    ray.init()

    queue = Queue()
    
    producer = ProductionSupervisor.remote(queue)
    consumer = ConsumptionSupervisor.remote(queue)

    producer.gen_msgs.remote()
    consumer.process_msgs.remote()

    # Keep the driver alive without busy-waiting (a bare `pass` loop pins a CPU core)
    while True:
        time.sleep(1)

As you can see, I am getting data off of the queue and serializing it, again, before passing it to other Ray Actors. This seems very inefficient, and my application needs to be snappy.

Is there a way to keep this data serialized until a final point where it can be explicitly deserialized?

Btw, I am not wedded to the idea of using queues. If there is a better way of doing what my demo code is trying to achieve, I’d be very happy to hear about it!

Thanks
I

How severely does this issue affect your experience of using Ray?

  • Medium: It makes completing my task significantly more difficult, but I can work around it.

Could you use ray.put before putting the object onto the queue? Basically put an ObjectRef onto the queue instead of the raw data.

Hi Stephanie

I can, but my experimentation with Ray Queue indicates that queue.get will always deserialize the data, and that’s the problem.

Please consider this simple example:

import ray
from ray.util.queue import Queue

ray.init()

queue = Queue()

norm_var = 'Foo'
ser_var = ray.put(norm_var)

# Put the serialized value on the queue
queue.put(ser_var)

got_val = queue.get()

# We get the deserialized value ('Foo'), not an ObjectRef
print(got_val)

Ah I see. Try wrapping ser_var in a list.

Hi. That doesn’t seem to be making a difference.

Also, I forgot to mention in my first response: my understanding is that queue.put serializes its argument anyway, in which case calling ray.put first would be redundant.

I'm not 100% sure of that, but that does seem to be what the docs suggest here: Ray Core API — Ray 1.11.0

I think the root problem is that queue.get is always deserializing the result.

Hmm, it seems to work for me. queue.get() returns a list holding a single ObjectRef, which acts as a pointer to the serialized value in the object store.

import ray
from ray.util.queue import Queue

ray.init()

queue = Queue()

norm_var = 'Foo'
ser_var = ray.put(norm_var)

# Put the ObjectRef on the queue, wrapped in a list
queue.put([ser_var])

got_val = queue.get()

print(got_val)
#  [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000)]

Duh! Thank you so much for trying that! I’ve just seen my stupid mistake… It’s been a long, long day!

Definitely looks like a solution. Tomorrow (with a fresh brain), I'll incorporate the principle throughout my code and report back.

Thanks again! Have a great rest of your day!

With thanks to @Stephanie_Wang, I've updated my original code to demonstrate end-to-end queue puts and gets of serialized messages.

Posted here in case it might help someone else:

import ray
from ray.util.queue import Queue

import time

@ray.remote
class ProductionSupervisor:
    '''Fake data stream
    IRL: Streams complex (badly structured) JSON from a 3rd party API.
    Each message contains data for separate subjects'''

    def __init__(self, stream_msg_queue):

        self._stream_msg_queue = stream_msg_queue

        self._cnt = -1
        self._msg_txt = 'msg_'
    
    def gen_msgs(self):

        while True:
            time.sleep(1)
            
            self._cnt += 1

            # Store the message in the object store (this serializes it once)
            ser_msg = ray.put(f'{self._msg_txt}{self._cnt}')
            
            # Put the message's ObjectRef on the stream msg queue.
            # NOTE: Wrap it in a list. Ray only resolves ObjectRefs passed as
            # top-level arguments, so the wrapped ref survives queue.put and
            # comes back unresolved from queue.get
            self._stream_msg_queue.put([ser_msg])

@ray.remote
class ConsumptionSupervisor:
    '''Distribute each newly received message
    to ALL workers.'''

    def __init__(self, stream_msg_queue):

        self._stream_msg_queue = stream_msg_queue
        self._workers = [Worker.remote(id_val) for id_val in range(3)]

    def process_msgs(self):
        
        while True:
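            # Get the wrapped message from the queue: a list holding one
            # ObjectRef, so the payload itself is not deserialized here
            ser_msg = self._stream_msg_queue.get()

            # Pass the wrapped ref to all workers; only the small ref is
            # serialized per call, not the message payload
            for worker in self._workers:
                
                worker.work.remote(ser_msg)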

@ray.remote
class Worker:
    '''IRL: each worker represents a specific subject; Extracts
    the relevant data, from the passed in message dict,
    processes it, and stores it here'''

    def __init__(self, worker_id: int) -> None:
        
        self._worker_id = worker_id
    
    def work(self, ser_msg):

        # Fetch and deserialize the message; ray.get on a list of
        # ObjectRefs returns a list of values
        wrapped_msg = ray.get(ser_msg)

        # Unwrap the message
        msg = wrapped_msg[0]

        # Output
        output_msg = f'From Worker {self._worker_id}: {msg}'
        print(output_msg)

if __name__ == '__main__':

    ray.init()

    stream_msg_queue = Queue()
    
    producer = ProductionSupervisor.remote(stream_msg_queue)
    consumer = ConsumptionSupervisor.remote(stream_msg_queue)

    producer.gen_msgs.remote()
    consumer.process_msgs.remote()

    # Keep the driver alive without busy-waiting (a bare `pass` loop pins a CPU core)
    while True:
        time.sleep(1)