Thanks for the response, @ericl. Yes, your example worked as you explained.
So it sounds like the object that contains an ObjectRef must be deserialized before the process that created it dies, is that correct? Is this because `__dealloc__` decrements the reference count once and therefore destroys the ObjectRef when the process exits?
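To make my question concrete, here is a toy model in plain Python of what I think is happening. It is only a sketch, not Ray's implementation: `ToyOwner`, `ToyRef` and the local `OwnerDiedError` class are hypothetical stand-ins. Each handle decrements its owner's count when it is garbage-collected (the role I'm assuming `__dealloc__` plays for an ObjectRef), and the value is freed when the count reaches zero. Separately, everything the owner holds is lost when the owner exits, even if a borrower still has a handle.

```python
class OwnerDiedError(Exception):
    """Raised when a toy object's owner has exited (hypothetical, not Ray's class)."""


class ToyOwner:
    """Simplified stand-in for a process that owns objects (hypothetical, not Ray)."""

    def __init__(self):
        self.values = {}   # object id -> value
        self.counts = {}   # object id -> number of live handles
        self.alive = True
        self._next_id = 0

    def put(self, value):
        oid = self._next_id
        self._next_id += 1
        self.values[oid] = value
        self.counts[oid] = 0
        return ToyRef(self, oid)  # the constructor bumps the count to 1

    def exit(self):
        # Owner death: everything it owns is lost, regardless of outstanding handles.
        self.alive = False
        self.values.clear()
        self.counts.clear()


class ToyRef:
    """A handle to an object; each live handle counts as one reference."""

    def __init__(self, owner, oid):
        self.owner = owner
        self.oid = oid
        owner.counts[oid] += 1

    def copy(self):
        # A second handle to the same object, e.g. one held by a borrower.
        return ToyRef(self.owner, self.oid)

    def get(self):
        if not self.owner.alive:
            raise OwnerDiedError(f"owner of object {self.oid} has exited")
        return self.owner.values[self.oid]

    def __del__(self):
        # Plays the role assumed for ObjectRef.__dealloc__: decrement, free at zero.
        owner = self.owner
        if owner.alive and self.oid in owner.counts:
            owner.counts[self.oid] -= 1
            if owner.counts[self.oid] == 0:
                del owner.counts[self.oid]
                del owner.values[self.oid]


driver = ToyOwner()
ref = driver.put("Hello")
borrowed = ref.copy()
del ref                  # count drops from 2 to 1; the value is still stored
print(borrowed.get())    # Hello
driver.exit()            # the owner exits while a borrower still holds a handle
try:
    borrowed.get()
except OwnerDiedError as err:
    print("OwnerDiedError:", err)
```

In this model, dropping one handle only decrements the count. The borrower's `get()` fails only because the owner exited, not because the count reached zero.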
I've been trying a separate approach in which I hold the reference through a detached actor, but I haven't been successful so far. Could you help me understand why this doesn't work?
In this case, I'm connecting to a Ray cluster that has already been initialized.
Driver 1 (Jupyter notebook):
```python
import ray

ray.init(address='auto', _redis_password='5241590000000000', namespace="mynamespace")

@ray.remote
class Worker:
    def __init__(self):
        self.ref = None

    def generate_oob(self, data):
        # data[0] is the ObjectRef x itself (refs nested in a list are not resolved);
        # pickle it out-of-band and store the pickled bytes
        self.ref = ray.put(ray.cloudpickle.dumps(data[0]))

    def get_ref(self):
        # unpickle and return the stored ObjectRef
        return ray.cloudpickle.loads(ray.get(self.ref))

w = Worker.options(name="myworker", lifetime="detached").remote()
x = ray.put("Hello")
oob = w.generate_oob.remote([x])
ray.get(ray.get(w.get_ref.remote()))
# -> 'Hello'
```
The `ray memory` output looks like this:
```
--- Object references for node address: 192.168.152.41 ---
IP Address      PID    Type    Call Site                     Size     Reference Type   Object Ref
192.168.152.41  13423  Worker  disabled                      ?        ACTOR_HANDLE     ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000
192.168.152.41  13408  Driver  disabled                      ?        ACTOR_HANDLE     ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000
192.168.152.41  13408  Driver  disabled                      15.0 B   LOCAL_REFERENCE  63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000001000000
192.168.152.41  13423  Worker  (deserialize actor task arg)  20.0 B   LOCAL_REFERENCE  ffffffffffffffffffffffffffffffffffffffff0100000001000000
                               __main__.Worker.generate_oob
192.168.152.41  13408  Driver  disabled                      20.0 B   LOCAL_REFERENCE  ffffffffffffffffffffffffffffffffffffffff0100000001000000
192.168.152.41  13423  Worker  disabled                      225.0 B  LOCAL_REFERENCE  63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000003000000
```
After exiting the notebook, `ray memory` looks like this:
```
--- Object references for node address: 192.168.152.41 ---
IP Address      PID    Type    Call Site                     Size     Reference Type   Object Ref
192.168.152.41  13423  Worker  disabled                      ?        ACTOR_HANDLE     ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000
192.168.152.41  13423  Worker  (deserialize actor task arg)  20.0 B   LOCAL_REFERENCE  ffffffffffffffffffffffffffffffffffffffff0100000001000000
                               __main__.Worker.generate_oob
192.168.152.41  13423  Worker  disabled                      225.0 B  LOCAL_REFERENCE  63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000003000000
```
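Then I run another driver in the same namespace:
```python
import ray

# connect to the same cluster and namespace as Driver 1
ray.init(address='auto', _redis_password='5241590000000000', namespace="mynamespace")

w = ray.get_actor("myworker")
ray.get(ray.get(w.get_ref.remote()))
```
This throws an `OwnerDiedError`: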
```
2021-11-05 13:13:46,996 WARNING worker.py:1619 -- Local object store memory usage:

(global lru) capacity: 13070983987
(global lru) used: 0%
(global lru) num objects: 0
(global lru) num evictions: 0
(global lru) bytes evicted: 0

---------------------------------------------------------------------------
OwnerDiedError                            Traceback (most recent call last)
/var/folders/9l/q_y0w5yx2_1_h65cprqgczyc0000gq/T/ipykernel_13460/1777489306.py in <module>
      1 w = ray.get_actor("myworker")
----> 2 ray.get(ray.get(w.get_ref.remote()))

~/Documents/conda/modin/modin-env/lib/python3.9/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
     87 if func.__name__ != "init" or is_client_mode_enabled_by_default:
     88 return getattr(ray, func.__name__)(*args, **kwargs)
---> 89 return func(*args, **kwargs)
     90
     91 return wrapper

~/Documents/conda/modin/modin-env/lib/python3.9/site-packages/ray/worker.py in get(object_refs, timeout)
   1621 raise value.as_instanceof_cause()
   1622 else:
-> 1623 raise value
   1624
   1625 if is_individual_id:

OwnerDiedError: Failed to retrieve object ffffffffffffffffffffffffffffffffffffffff0100000001000000. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init()`.
The object's owner has exited. This is the Python worker that first created the ObjectRef via `.remote()` or `ray.put()`. Check cluster logs (`/tmp/ray/session_latest/logs/*01000000ffffffffffffffffffffffffffffffffffffffffffffffff*` at IP address 192.168.152.41) for more information about the Python worker failure.
```
Now `ray memory` looks like this:
```
--- Object references for node address: 192.168.152.41 ---
IP Address      PID    Type    Call Site                     Size     Reference Type   Object Ref
192.168.152.41  13423  Worker  disabled                      ?        ACTOR_HANDLE     ffffffffffffffff4b97ec8a423e44c39713d4fc0100000001000000
192.168.152.41  13423  Worker  (deserialize actor task arg)  20.0 B   LOCAL_REFERENCE  ffffffffffffffffffffffffffffffffffffffff0100000001000000
                               __main__.Worker.generate_oob
192.168.152.41  13460  Driver  <unknown>                     20.0 B   LOCAL_REFERENCE  ffffffffffffffffffffffffffffffffffffffff0100000001000000
192.168.152.41  13423  Worker  disabled                      225.0 B  LOCAL_REFERENCE  63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000003000000
```
Who was the original owner of ffffffffffffffffffffffffffffffffffffffff0100000001000000 in this case? Was it the first driver, and is that why I'm getting the failure after exiting the first process? Before exiting, the first driver (PID 13408) held this reference:
```
192.168.152.41  13408  Driver  disabled                      20.0 B   LOCAL_REFERENCE  ffffffffffffffffffffffffffffffffffffffff0100000001000000
```
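To double-check which process created that ref, I tried decoding the hex IDs. This sketch assumes the Ray 1.x ID layout. That layout is a 28-byte ObjectID made of a 24-byte TaskID plus a 4-byte little-endian object index. The TaskID is 8 task-unique bytes followed by a 16-byte ActorID, which is 12 unique bytes plus the 4-byte JobID. Nil parts are all `ff`. `parse_object_id` and `describe` are hypothetical helpers, not Ray APIs.

```python
from dataclasses import dataclass

NIL = "ff"  # assumption: nil ID bytes are all 0xff


@dataclass
class ParsedObjectID:
    task_unique: str   # 8 task-unique bytes of the TaskID
    actor_unique: str  # 12 unique bytes of the ActorID embedded in the TaskID
    job_id: str        # 4-byte JobID at the end of the ActorID
    index: int         # 4-byte little-endian object index within the task


def parse_object_id(hex_id: str) -> ParsedObjectID:
    """Split an ObjectID hex string, assuming the Ray 1.x layout (not a Ray API)."""
    raw = bytes.fromhex(hex_id)
    if len(raw) != 28:
        raise ValueError(f"expected a 28-byte ObjectID, got {len(raw)} bytes")
    return ParsedObjectID(
        task_unique=raw[0:8].hex(),
        actor_unique=raw[8:20].hex(),
        job_id=raw[20:24].hex(),
        index=int.from_bytes(raw[24:28], "little"),
    )


def describe(hex_id: str) -> str:
    """Guess which kind of task created the object (same layout assumption)."""
    p = parse_object_id(hex_id)
    nil_task = p.task_unique == NIL * 8
    nil_actor = p.actor_unique == NIL * 12
    actor_id = p.actor_unique + p.job_id
    if nil_task and nil_actor:
        return f"object #{p.index} of the driver task of job {p.job_id}"
    if nil_task:
        return f"object #{p.index} of the creation task of actor {actor_id}"
    if not nil_actor:
        return f"object #{p.index} of task {p.task_unique} on actor {actor_id}"
    return f"object #{p.index} of normal task {p.task_unique} in job {p.job_id}"


for object_ref in (
    "ff" * 20 + "01000000" + "01000000",  # the ref from the OwnerDiedError
    "63964fa4841d4a2e4b97ec8a423e44c39713d4fc0100000003000000",
):
    print(object_ref, "->", describe(object_ref))
```

If that layout assumption holds, the failing ref decodes as object #1 of the driver task of job `01000000`, which would be the `ray.put("Hello")` in Driver 1. The 225-byte ref belongs to a task on actor `4b97ec8a423e44c39713d4fc01000000`, the same ID that appears in the ACTOR_HANDLE rows.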
Is there any way to get around this? Just to be clear, my aim is to be able to serialize any object that may contain one or more ObjectRefs
and deserialize it from a separate process, on the assumption that the previous process may have died before deserialization happens.
Do I need to create the data itself (`ray.put("Hello")`) inside the actor for this to work? It seems so: the following example, in which I call `ray.put()` from within the worker, does seem to work, so I'm guessing that here the actor owns the ObjectRef produced by `ray.put()`:
```python
import ray

@ray.remote
class Worker:
    def __init__(self):
        self.ref = None

    # cloudpickle the ObjectRef in data[0] and store the pickled bytes
    def generate_oob(self, data):
        self.ref = ray.put(ray.cloudpickle.dumps(data[0]))

    # unpickle and return the ObjectRef stored by generate_oob
    def get_ref(self):
        return ray.cloudpickle.loads(ray.get(self.ref))

    # create the data inside the actor, so the actor (not the driver) owns x
    def execute(self):
        x = ray.put("Hello")
        return self.generate_oob([x])

w = Worker.options(name="myworker", lifetime="detached").remote()
oob = w.execute.remote()
```
After further thought, I don't need the serialization anymore if I just use detached actors. This seems to be enough:
```python
import ray

@ray.remote
class Worker:
    def __init__(self):
        self.x = None

    def execute(self):
        # the detached actor creates, and therefore owns, the object
        self.x = ray.put("hello")

    def get_x(self):
        return self.x

w = Worker.options(name="myworker", lifetime="detached").remote()
oob = w.execute.remote()
```
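I can then get the actor with `w = ray.get_actor("myworker")` and call `ray.get(w.get_x.remote())` from another process after exiting the previous one. That call returns the ObjectRef itself, so a second `ray.get` on it yields "hello".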
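A toy simulation of why this last version works. It is plain Python; `Process`, `get` and `run` are hypothetical names, and it models only the ownership idea in miniature, not Ray's implementation. An object lives as long as the process that called `put` on it. Moving the `put` from Driver 1 into the detached actor therefore ties the object's lifetime to the actor instead of the notebook.

```python
class OwnerDiedError(Exception):
    """Raised when the toy process that owns an object has exited."""


class Process:
    """A toy process that owns every object it puts (simplified ownership model)."""

    def __init__(self, name):
        self.name = name
        self.alive = True
        self.store = {}

    def put(self, value):
        key = len(self.store)
        self.store[key] = value
        return (self, key)  # toy "ObjectRef": (owner, key)

    def exit(self):
        self.alive = False
        self.store.clear()


def get(ref):
    owner, key = ref
    if not owner.alive:
        raise OwnerDiedError(f"owner {owner.name!r} has exited")
    return owner.store[key]


def run(put_in_actor):
    driver1 = Process("driver 1")
    actor = Process("myworker (detached)")
    # Whoever calls put() owns the object; the actor keeps the ref either way.
    creator = actor if put_in_actor else driver1
    actor_state = {"x": creator.put("hello")}
    driver1.exit()  # exit the notebook; the detached actor keeps running
    try:
        return get(actor_state["x"])  # driver 2 fetches the ref via the actor
    except OwnerDiedError as err:
        return f"OwnerDiedError: {err}"


print(run(put_in_actor=False))  # OwnerDiedError: owner 'driver 1' has exited
print(run(put_in_actor=True))   # hello
```

In both runs the detached actor holds the reference. Only the process that created the object changes, and that alone decides whether the second driver can still read it.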