Serialisation Error

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am getting the serialisation error below while putting data into a Ray queue.

Traceback (most recent call last):
(cameraActor pid=6173)   File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
(cameraActor pid=6173)     self.run()
(cameraActor pid=6173)   File "/usr/lib/python3.8/threading.py", line 870, in run
(cameraActor pid=6173)     self._target(*self._args, **self._kwargs)
(cameraActor pid=6173)   File "/usr/local/lib/python3.8/dist-packages/ray/util/tracing/tracing_helper.py", line 464, in _resume_span
(cameraActor pid=6173)     return method(self, *_args, **_kwargs)
(cameraActor pid=6173)   File "radp_ray_3_1_issue.py", line 35, in run_source_pipeline
(cameraActor pid=6173)     self.queue.put(sample)
(cameraActor pid=6173)   File "/usr/local/lib/python3.8/dist-packages/ray/util/queue.py", line 105, in put
(cameraActor pid=6173)     ray.get(self.actor.put.remote(item, timeout))
(cameraActor pid=6173)   File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 144, in remote
(cameraActor pid=6173)     return self._remote(args, kwargs)
(cameraActor pid=6173)   File "/usr/local/lib/python3.8/dist-packages/ray/util/tracing/tracing_helper.py", line 423, in _start_span
(cameraActor pid=6173)     return method(self, args, kwargs, *_args, **_kwargs)
(cameraActor pid=6173)   File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 190, in _remote
(cameraActor pid=6173)     return invocation(args, kwargs)
(cameraActor pid=6173)   File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 177, in invocation
(cameraActor pid=6173)     return actor._actor_method_call(
(cameraActor pid=6173)   File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 1175, in _actor_method_call
(cameraActor pid=6173)     object_refs = worker.core_worker.submit_actor_task(
(cameraActor pid=6173)   File "python/ray/_raylet.pyx", line 3333, in ray._raylet.CoreWorker.submit_actor_task
(cameraActor pid=6173)   File "python/ray/_raylet.pyx", line 3338, in ray._raylet.CoreWorker.submit_actor_task
(cameraActor pid=6173)   File "python/ray/_raylet.pyx", line 649, in ray._raylet.prepare_args_and_increment_put_refs
(cameraActor pid=6173)   File "python/ray/_raylet.pyx", line 640, in ray._raylet.prepare_args_and_increment_put_refs
(cameraActor pid=6173)   File "python/ray/_raylet.pyx", line 696, in ray._raylet.prepare_args_internal
(cameraActor pid=6173) TypeError: Could not serialize the argument <Gst.Sample object at 0x7f69a343d280 (GstSample at 0x3bc7220)> for a task or actor ray.util.queue._QueueActor.put:
(cameraActor pid=6173) ================================================================================
(cameraActor pid=6173) Checking Serializability of <Gst.Sample object at 0x7f69a343d280 (GstSample at 0x3bc7220)>
(cameraActor pid=6173) ================================================================================
(cameraActor pid=6173) !!! FAIL serialization: cannot pickle 'Sample' object
(cameraActor pid=6173)     Serializing 'get_buffer' gi.FunctionInfo(get_buffer)...
(cameraActor pid=6173)     !!! FAIL serialization: cannot pickle 'gi.FunctionInfo' object
(cameraActor pid=6173)     WARNING: Did not find non-serializable object in gi.FunctionInfo(get_buffer). This may be an oversight.
(cameraActor pid=6173) ================================================================================
(cameraActor pid=6173) Variable: 
(cameraActor pid=6173) 
(cameraActor pid=6173) 	FailTuple(get_buffer [obj=gi.FunctionInfo(get_buffer), parent=<Gst.Sample object at 0x7f69a343d280 (GstSample at 0x3bc7220)>])
(cameraActor pid=6173) 
(cameraActor pid=6173) was found to be non-serializable. There may be multiple other undetected variables that were non-serializable. 
(cameraActor pid=6173) Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class. 
(cameraActor pid=6173) ================================================================================
(cameraActor pid=6173) Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
(cameraActor pid=6173) If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
(cameraActor pid=6173) ================================================================================

Use the code below to reproduce the error:

import gi
gi.require_version('Gst', '1.0')
import ray
from ray.util.queue import Queue as rQueue
import threading

ray.init("auto")

@ray.remote(num_cpus=0.2)
class cameraActor:
    def __init__(self):
        from gi.repository import Gst, GLib
        Gst.init(None)
        self.cam_url = "rtsp://admin:Welcome234@172.16.120.154:554/"
        self.queue = rQueue(actor_options={"num_cpus":0.05, "namespace":"cargo", "name":"111_ph"})
        self.loop = GLib.MainLoop()

    def run_source_pipeline(self):
        from gi.repository import Gst

        # Create the RTSP source pipeline
        source_pipeline_str = (
            f"rtspsrc location={self.cam_url} latency=200 protocols=4 name=source ! "
            "rtph264depay name=depay ! "
            "h264parse name=parse ! "
            "appsink name=packet_sink"
        )
        self.source_pipeline = Gst.parse_launch(source_pipeline_str)
        packet_sink = self.source_pipeline.get_by_name('packet_sink')
        self.source_pipeline.set_state(Gst.State.PLAYING)

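        while True:
            # pull-sample blocks until the appsink has a new Gst.Sample (returns None on EOS)
            sample = packet_sink.emit('pull-sample')
            # from ray.util import inspect_serializability; inspect_serializability(sample)
            # Fails here: Gst.Sample wraps GObject/C state that cannot be pickled
            self.queue.put(sample)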
        
    def start_job(self):
        # Create and start threads for each pipeline
        source_pipeline_thread = threading.Thread(target=self.run_source_pipeline)
        source_pipeline_thread.start()
        
        self.loop.run()

def main():
    cam_actor = cameraActor.options(name="111_c", namespace="cargo").remote()
    ray.get(cam_actor.start_job.remote())

if __name__ == "__main__":
    main()

You can use the following Dockerfile to build the working environment.

FROM docker.io/ubuntu:20.04

ENV TZ="Asia/Kolkata"
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install tzdata

RUN apt-get install -y --no-install-recommends python3.8 python3-pip python3-wheel python3.8-venv

RUN true \
  && export DEBIAN_FRONTEND=noninteractive \
  && apt-get install -y --no-install-recommends \
    python3-gi \
    python3-gi-cairo \
    python3-dbus \
    gir1.2-gtk-3.0 \
    gir1.2-gstreamer-1.0 \
    gir1.2-gst-plugins-base-1.0 \
    gstreamer1.0-plugins-base \
    gstreamer1.0-plugins-good \
    gstreamer1.0-plugins-ugly \
    gstreamer1.0-pulseaudio \
    gstreamer1.0-plugins-bad \
  && rm -rf /var/lib/apt/lists/* 

RUN python3 -m pip install -U pip PyGObject==3.36.0 ray==2.6.0

ENTRYPOINT ["/bin/bash"]
It looks like the sample returned by packet_sink.emit('pull-sample') is not serializable, but you are sending it to Ray’s queue actor.

You have to either make this class serializable or put only serializable data into the queue. In this case, the non-serializable part is the gi.FunctionInfo (get_buffer) inside the sample instance.
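A minimal stdlib sketch of "put only serializable data". OpaqueSample is a hypothetical stand-in for Gst.Sample (a threading.Lock plays the role of the unpicklable gi.FunctionInfo), and is_picklable / to_plain are illustrative helpers, not Ray APIs. Plain pickle is used as a rough proxy for Ray's serializer, which is built on cloudpickle and rejects the same kind of objects.

```python
import pickle
import threading


class OpaqueSample:
    """Hypothetical stand-in for an object that wraps native state (like Gst.Sample)."""

    def __init__(self, data, pts):
        self.data = data                  # raw bytes: picklable
        self.pts = pts                    # timestamp: picklable
        self._native = threading.Lock()   # stands in for gi.FunctionInfo: NOT picklable


def is_picklable(obj):
    """Return True if pickle accepts obj (rough proxy for what Ray can serialize)."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


def to_plain(sample):
    """Copy only the fields a downstream consumer needs into builtin types."""
    return {"data": bytes(sample.data), "pts": int(sample.pts)}
```

The whole object fails the check, while the plain dict returned by to_plain passes and can go onto a queue.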

Hi @sangcho, thanks for your response.

The sample instance comes from GStreamer and I don’t have any control over its class, so I can’t make it serializable. I also can’t just take the data out of the class and put it into the queue, because when I tried that, it broke some GStreamer functionality.

Can you please suggest another way to put the sample instance directly into the queue without serialisation?

Serialization is fundamental whenever you communicate across processes or nodes; it cannot be avoided.

Why do you put it in the queue? Normally, it is so that a downstream consumer can access some result. Could you extract only the values your downstream consumer needs from the sample and send just those?

I could extract the data and recreate the sample on the consumer side. The problem is that the sample object holds non-serialisable data at multiple levels, which makes extracting the data and recreating the instance complex. So I can’t choose that option.

Hmm, I don’t think there’s another option here. You have to serialize data in order to send it to another process over the wire; this is not specific to Ray but a fundamental requirement. For example, if you put the sample into a multiprocessing.Queue, it would fail to pickle as well.
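A small stdlib check that the failure is not Ray-specific. A threading.Lock is used as an assumed stand-in for the unpicklable GObject wrapper, since Gst.Sample itself needs GStreamer installed. multiprocessing.Queue pickles items with ForkingPickler in a background feeder thread, so the helper (a hypothetical name) calls that pickler directly to surface the same error synchronously.

```python
import threading
from multiprocessing.reduction import ForkingPickler


def multiprocessing_pickle_error(obj):
    """Return the exception multiprocessing would hit when pickling obj, or None."""
    try:
        # Same pickler multiprocessing.Queue uses before writing to its pipe
        ForkingPickler.dumps(obj)
    except Exception as exc:
        return exc
    return None
```

Passing threading.Lock() returns a TypeError, while a dict of bytes and ints returns None.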

Is there any way to convert the sample into some sort of bytes and recover it on the consumer side?

@sangcho Thanks for all your support. GStreamer has no such method to get the bytes of an instance and recreate the object from those bytes.
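There is no single GStreamer call that serializes a whole Gst.Sample, but if the consumer only needs the encoded bytes plus a few properties, a partial copy can be assembled by hand. The sketch below assumes exactly that: it keeps the buffer data, the caps (as a string), PTS/DTS/duration and the buffer flags, and drops the sample's segment and info structure. sample_to_payload and payload_to_sample are hypothetical helper names; the GStreamer calls (Buffer.extract_dup, Caps.to_string, Caps.from_string, Buffer.new_wrapped, Sample.new) are standard PyGObject bindings, but this has not been tested against the RTSP pipeline above.

```python
def sample_to_payload(sample):
    """Flatten a Gst.Sample into builtin types that pickle (and Ray) can handle.

    Assumption: the consumer needs only buffer bytes, caps, timestamps and flags.
    """
    buf = sample.get_buffer()
    caps = sample.get_caps()
    return {
        "data": buf.extract_dup(0, buf.get_size()),  # copy of the encoded bytes
        "caps": caps.to_string() if caps is not None else None,
        "pts": int(buf.pts),
        "dts": int(buf.dts),
        "duration": int(buf.duration),
        "flags": int(buf.get_flags()),  # e.g. DELTA_UNIT marks non-keyframes
    }


def payload_to_sample(payload):
    """Rebuild a Gst.Sample on the consumer side (segment and info are NOT restored)."""
    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import Gst

    Gst.init(None)  # no-op if GStreamer is already initialised in this process
    buf = Gst.Buffer.new_wrapped(payload["data"])
    buf.pts = payload["pts"]
    buf.dts = payload["dts"]
    buf.duration = payload["duration"]
    buf.set_flags(Gst.BufferFlags(payload["flags"]))
    caps = Gst.Caps.from_string(payload["caps"]) if payload["caps"] else None
    return Gst.Sample.new(buf, caps, None, None)
```

On the producer side, self.queue.put(sample) would become self.queue.put(sample_to_payload(sample)), and the consumer would call payload_to_sample() on each item it gets. Whether dropping the segment and info is acceptable depends on which downstream GStreamer elements consume the rebuilt sample.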

I think this means the class is basically not suitable for a multiprocessing program. I recommend finding another way to parallelize the workload without passing samples between processes.
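One way to follow this advice, assuming the consumer can run in the same process as the pipeline (for example as a second thread inside cameraActor): an in-process queue.Queue passes object references between threads, so nothing is pickled. NativeHandleSample is a hypothetical stand-in so the sketch runs without GStreamer; in this design only picklable results would leave the actor.

```python
import queue
import threading


class NativeHandleSample:
    """Hypothetical stand-in for Gst.Sample: holds state that cannot be pickled."""

    def __init__(self, seq):
        self.seq = seq
        self._handle = threading.Lock()  # unpicklable, like the gi wrappers


def producer(q, n):
    for i in range(n):
        q.put(NativeHandleSample(i))  # queue.Queue stores the reference; no pickling
    q.put(None)  # sentinel: no more samples


def consumer(q, results):
    while True:
        sample = q.get()
        if sample is None:
            break
        results.append(sample.seq)  # hypothetical processing step; results stay picklable


def run(n=5):
    q = queue.Queue(maxsize=16)  # bounded, so a slow consumer applies backpressure
    results = []
    threads = [
        threading.Thread(target=producer, args=(q, n)),
        threading.Thread(target=consumer, args=(q, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

With a single producer and a single consumer the queue is FIFO, so run(5) returns [0, 1, 2, 3, 4].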