How does `ray.cloudpickle` implement out-of-band pickling?

I’m curious to learn more about the implementation details of ray.cloudpickle for accessing ObjectRefs out-of-band from different client processes.

The Ray 1.x Architecture doc (Google Docs) mentions the following in its Reference Counting section:

References can also be created “out-of-band” by pickling an ObjectRef with ray.cloudpickle. In this case, a permanent reference will be added to the object’s count to prevent the object from going out of scope. Other methods of out-of-band serialization (e.g., passing the binary string that uniquely identifies an ObjectRef) are not guaranteed to work because they do not contain the owner’s address and the reference is not tracked by the owner.

Specifically, I’d like to know:

  1. What happens between the client and the Ray cluster when we call ray.cloudpickle.dumps(obj) and later ray.cloudpickle.loads(data)?
  2. Where in the ray code is the permanent reference made?
  3. What is the lifetime of the ObjectRefs contained in the pickled object, and when, if ever, do they get destroyed?
  4. Any side effects to keep in mind when using this mechanism?
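For concreteness, here is roughly the usage pattern I have in mind (a minimal sketch; error handling and process boundaries omitted):

import ray

ray.init()

ref = ray.put("some data")

# Out-of-band serialization: the pickled bytes can leave the Ray API entirely,
# e.g. be written to disk or sent over a socket.
blob = ray.cloudpickle.dumps(ref)

# Later, possibly in a different client process connected to the same cluster,
# the bytes are turned back into a usable ObjectRef.
ref2 = ray.cloudpickle.loads(blob)
print(ray.get(ref2))  # "some data"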

After stepping through with a debugger, I found that the ray.cloudpickle.dumps call eventually hits serialization.py#SerializationContext.object_ref_reducer() for serializing an ObjectRef:

def object_ref_reducer(obj):
    self.add_contained_object_ref(obj)
    worker = ray.worker.global_worker
    worker.check_connected()
    obj, owner_address, object_status = (
        worker.core_worker.serialize_and_promote_object_ref(obj))
    return _object_ref_deserializer, \
        (obj.binary(), obj.call_site(), owner_address, object_status)

where obj is an ObjectRef(ffffffffffffffffffffffffffffffffffffffff0100000001000000)
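As a side note, this follows pickle’s standard reducer protocol: dumps records a (reconstructor, args) pair, and loads simply calls the reconstructor with those args. A toy illustration of that mechanism (nothing Ray-specific here, just the protocol that the ObjectRef reducer plugs into):

import copyreg
import pickle

class Handle:
    def __init__(self, token):
        self.token = token

def _rebuild_handle(token):
    # Runs at loads() time with the args the reducer returned.
    return Handle(token)

def _reduce_handle(h):
    # Runs at dumps() time; this is also where you could register the handle
    # somewhere, analogous to add_contained_object_ref above.
    return _rebuild_handle, (h.token,)

copyreg.pickle(Handle, _reduce_handle)

blob = pickle.dumps(Handle("abc123"))
print(pickle.loads(blob).token)  # "abc123"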

Inside self.add_contained_object_ref(obj), the following is done on the global_worker:

# If this serialization is out-of-band (e.g., from a call to
# cloudpickle directly or captured in a remote function/actor),
# then pin the object for the lifetime of this worker by adding
# a local reference that won't ever be removed.
ray.worker.global_worker.core_worker.add_object_ref_reference(object_ref)

add_object_ref_reference itself is then defined in _raylet.pyx and calls AddLocalReference

def add_object_ref_reference(self, ObjectRef object_ref):
    # Note: faster to not release GIL for short-running op.
    CCoreWorkerProcess.GetCoreWorker().AddLocalReference(
        object_ref.native())

which eventually increments the reference_counter_ held by this worker in core_worker.h

  /// Increase the local reference count for this object ID. Should be called
  /// by the language frontend when a new reference is created.
  ///
  /// \param[in] object_id The object ID to increase the reference count for.
  /// \param[in] call_site The call site from the language frontend.
  void AddLocalReference(const ObjectID &object_id, std::string call_site) {
    reference_counter_->AddLocalReference(object_id, call_site);
  }
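The practical effect of that permanent reference can be seen from Python with a small experiment (a sketch, run in a single driver):

import ray

ray.init()

ref = ray.put("pinned?")
blob = ray.cloudpickle.dumps(ref)  # this adds the permanent local reference

del ref  # drop the only Python-visible ObjectRef

# The object is still retrievable, because the out-of-band dump pinned it
# for the lifetime of this worker process.
print(ray.get(ray.cloudpickle.loads(blob)))  # "pinned?"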

It looks like this is managed by the global_worker, which is created in worker.py:

global_worker = Worker()
"""Worker: The global Worker object for this worker process.

We use a global Worker object to ensure that there is a single worker object
per worker process.
"""

likely the worker associated with the Python process that called ray.init().

What I’m wondering next, though, is: if this process dies, how does the ObjectRef remain accessible from another process?

It does seem like the reference count on the ObjectRef is decremented through remove_object_ref_reference(), which is called from ObjectRef.__dealloc__:

    def __dealloc__(self):
        if self.in_core_worker:
            try:
                worker = ray.worker.global_worker
                worker.core_worker.remove_object_ref_reference(self)
            except Exception as e:
                # There is a strange error in rllib that causes the above to
                # fail. Somehow the global 'ray' variable corresponding to the
                # imported package is None when this gets called. Unfortunately
                # this is hard to debug because __dealloc__ is called during
                # garbage collection so we can't get a good stack trace. In any
                # case, there's not much we can do besides ignore it
                # (re-importing ray won't help).
                pass

I’m not exactly sure when this gets called, though, in relation to when the Python process exits.
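That said, my rough understanding (not verified) is that __dealloc__ runs whenever the last Python reference to a given ObjectRef instance is dropped, with anything still alive being torn down during interpreter shutdown, so the decrement is tied to Python object lifetime rather than to process exit as such. A quick sketch of the ordinary case:

import gc
import ray

ray.init()

ref = ray.put("bye")
ref2 = ray.cloudpickle.loads(ray.cloudpickle.dumps(ref))

# Dropping the Python objects triggers ObjectRef.__dealloc__, which calls
# remove_object_ref_reference() and decrements the local reference count.
# (The permanent reference added by the dumps() above stays.)
del ref, ref2
gc.collect()  # usually unnecessary; CPython refcounting already does this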


Great trace-through!

I believe it’s only accessible if you’ve already deserialized that reference (thereby incrementing the refcount for that object). For example, in the code below, if you uncomment the ray.kill() of the actor process, then the subsequent deserialization/get of the object will fail (hang):

import ray
import time

@ray.remote
class Worker:
    def produce_out_of_band(self, ref):
        return ray.cloudpickle.dumps(ref[0])

x = ray.put("Hello")
a = Worker.remote()
oob = ray.get(a.produce_out_of_band.remote([x]))
del x  # Now the out of band reference is the only ref holder

# If you add the kill below then we can't get the object back.
#ray.kill(a)
#time.sleep(1)

print(ray.get(ray.cloudpickle.loads(oob)))
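Conversely (a sketch with the same setup): if the driver deserializes the out-of-band bytes before the actor dies, the driver holds its own local reference again, and the get succeeds even after the kill:

import ray
import time

ray.init()

@ray.remote
class Worker:
    def produce_out_of_band(self, ref):
        return ray.cloudpickle.dumps(ref[0])

x = ray.put("Hello")
a = Worker.remote()
oob = ray.get(a.produce_out_of_band.remote([x]))
del x

# Deserialize first: this adds a local reference in the driver (the owner).
ref = ray.cloudpickle.loads(oob)

ray.kill(a)
time.sleep(1)

print(ray.get(ref))  # still prints "Hello"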

Thanks for the response @ericl, yes your example worked as you explained.

So it sounds like the object that contains an ObjectRef must be deserialized before the creating process dies, is that correct? Is this because __dealloc__ will decrement the reference count and therefore destroy the ObjectRef at process exit?

I’ve been trying a separate approach where I hold the reference through a detached actor, but I haven’t been successful so far. Could you help me understand why this doesn’t work?

In this case, I’m connecting to a ray cluster that has already been initialized.

Driver 1 (Jupyter notebook):

ray.init(address='auto', _redis_password='5241590000000000', namespace="mynamespace")

@ray.remote
class Worker:
    def __init__(self):
        self.ref = None        
        
    def generate_oob(self, data):
        self.ref = ray.put(ray.cloudpickle.dumps(data[0]))
    
    def get_ref(self):
        return ray.cloudpickle.loads(ray.get(self.ref))

w = Worker.options(name="myworker", lifetime="detached").remote()

x = ray.put("Hello")

oob = w.generate_oob.remote([x])

ray.get(ray.get(w.get_ref.remote()))
"hello"

Ray memory looks like this:

--- Object references for node address: 192.168.152.41 ---
IP Address       PID    Type    Call Site               Size    Reference Type      Object Ref                                              
192.168.152.41   13423  Worker  disabled                ?       ACTOR_HANDLE        ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000

192.168.152.41   13408  Driver  disabled                ?       ACTOR_HANDLE        ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000

192.168.152.41   13408  Driver  disabled                15.0 B  LOCAL_REFERENCE     63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000001000000

192.168.152.41   13423  Worker  (deserialize actor tas  20.0 B  LOCAL_REFERENCE     ffffffffffffffffffffffffffffffffffffffff0100000001000000
                                k arg) __main__.Worker                                                                                      
                                .generate_oob                                                                                               

192.168.152.41   13408  Driver  disabled                20.0 B  LOCAL_REFERENCE     ffffffffffffffffffffffffffffffffffffffff0100000001000000

192.168.152.41   13423  Worker  disabled                225.0 B  LOCAL_REFERENCE     63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000003000000

After exiting the notebook, ray memory looks like this:

--- Object references for node address: 192.168.152.41 ---
IP Address       PID    Type    Call Site               Size    Reference Type      Object Ref                                              
192.168.152.41   13423  Worker  disabled                ?       ACTOR_HANDLE        ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000

192.168.152.41   13423  Worker  (deserialize actor tas  20.0 B  LOCAL_REFERENCE     ffffffffffffffffffffffffffffffffffffffff0100000001000000
                                k arg) __main__.Worker                                                                                      
                                .generate_oob                                                                                               

192.168.152.41   13423  Worker  disabled                225.0 B  LOCAL_REFERENCE     63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000003000000

Then I run another driver in the same namespace:

w = ray.get_actor("myworker")
ray.get(ray.get(w.get_ref.remote()))

which throws an OwnerDiedError:

2021-11-05 13:13:46,996	WARNING worker.py:1619 -- Local object store memory usage:

(global lru) capacity: 13070983987
(global lru) used: 0%
(global lru) num objects: 0
(global lru) num evictions: 0
(global lru) bytes evicted: 0

---------------------------------------------------------------------------
OwnerDiedError                            Traceback (most recent call last)
/var/folders/9l/q_y0w5yx2_1_h65cprqgczyc0000gq/T/ipykernel_13460/1777489306.py in <module>
      1 w = ray.get_actor("myworker")
----> 2 ray.get(ray.get(w.get_ref.remote()))

~/Documents/conda/modin/modin-env/lib/python3.9/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
     87             if func.__name__ != "init" or is_client_mode_enabled_by_default:
     88                 return getattr(ray, func.__name__)(*args, **kwargs)
---> 89         return func(*args, **kwargs)
     90 
     91     return wrapper

~/Documents/conda/modin/modin-env/lib/python3.9/site-packages/ray/worker.py in get(object_refs, timeout)
   1621                     raise value.as_instanceof_cause()
   1622                 else:
-> 1623                     raise value
   1624 
   1625         if is_individual_id:

OwnerDiedError: Failed to retrieve object ffffffffffffffffffffffffffffffffffffffff0100000001000000. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init()`.

The object's owner has exited. This is the Python worker that first created the ObjectRef via `.remote()` or `ray.put()`. Check cluster logs (`/tmp/ray/session_latest/logs/*01000000ffffffffffffffffffffffffffffffffffffffffffffffff*` at IP address 192.168.152.41) for more information about the Python worker failure.

Ray memory looks like this:

--- Object references for node address: 192.168.152.41 ---
IP Address       PID    Type    Call Site               Size    Reference Type      Object Ref                                              
192.168.152.41   13423  Worker  disabled                ?       ACTOR_HANDLE        ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000

192.168.152.41   13423  Worker  (deserialize actor tas  20.0 B  LOCAL_REFERENCE     ffffffffffffffffffffffffffffffffffffffff0100000001000000
                                k arg) __main__.Worker                                                                                      
                                .generate_oob                                                                                               

192.168.152.41   13460  Driver  <unknown>               20.0 B  LOCAL_REFERENCE     ffffffffffffffffffffffffffffffffffffffff0100000001000000

192.168.152.41   13423  Worker  disabled                225.0 B  LOCAL_REFERENCE     63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000003000000

Who was the original owner of ffffffffffffffffffffffffffffffffffffffff0100000001000000 in this case? Was it the first driver, and is that why I’m getting the failure after exiting the first process?

192.168.152.41   13408  Driver  disabled                20.0 B  LOCAL_REFERENCE     ffffffffffffffffffffffffffffffffffffffff0100000001000000

Is there any way to get around this? Just to be clear, my aim is to be able to serialize any object that may contain one or more ObjectRefs and deserialize it from a separate process, with the assumption that the previous process may have died before deserialization could happen.

Do I need to create the data itself (ray.put("Hello")) inside the actor for this to work? It seems so: the following example, in which I call ray.put() from within the worker, does seem to work, so I’m guessing that here the actor owns the ObjectRef produced by ray.put():

@ray.remote
class Worker:
    def __init__(self):
        self.ref = None        
        
    # cloudpickle the original ObjectRef and store a ref to the pickled bytes
    def generate_oob(self, data):
        self.ref = ray.put(ray.cloudpickle.dumps(data[0]))

    # return the ObjectRef recovered from the pickled bytes held by the actor
    def get_ref(self):
        return ray.cloudpickle.loads(ray.get(self.ref))
    
    def execute(self):
        x = ray.put("Hello")
        return self.generate_oob([x])


w = Worker.options(name="myworker", lifetime="detached").remote()

oob = w.execute.remote()
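For completeness, this is how I read it back from a second process after the first driver exits (assuming the same cluster and namespace as above):

import ray

ray.init(address="auto", namespace="mynamespace")

w = ray.get_actor("myworker")

# get_ref() returns the ObjectRef recovered from the pickled bytes;
# one more ray.get fetches the actual value.
inner_ref = ray.get(w.get_ref.remote())
print(ray.get(inner_ref))  # "Hello"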

After further thought, I don’t need the serialization at all if I just use detached actors; this seems to be enough:

@ray.remote
class Worker:
    def __init__(self):
        self.x = None
    
    def execute(self):
        self.x = ray.put("hello")

    def get_x(self):
        return self.x

w = Worker.options(name="myworker", lifetime="detached").remote()

# oob = w.generate_oob.remote([x])
oob = w.execute.remote()

I can then call ray.get(w.get_x.remote()) from another process after exiting the previous one.
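In other words, the full round trip from a fresh process looks roughly like this (a sketch):

import ray

ray.init(address="auto", namespace="mynamespace")

w = ray.get_actor("myworker")

x_ref = ray.get(w.get_x.remote())  # the ObjectRef created and owned by the actor
print(ray.get(x_ref))              # "hello"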