In raydp, we need to get objects which are put in another session. For example, in to_spark, spark executors (actors in a java ray session, connected using Ray.init) put data into ray object sore, and python ray session will call deserialize_and_register_object_ref on these object refs and get them. Another case is object is put in python ray session, but need to be get in another python ray session(spark python udf worker).
We have been using deserialize_and_register_object_ref , and cloudpickle(dump in one session and load in another), but they are both problematic. They might hang or meet error in some cases, like object_manager.cc:260: Couldn't send pull request.
How to safely share objects among ray sessions? Or why is it not supported? If not supported, does using cross language call to eliminate the java ray session help?
You can share objects between sessions using a Ray named actor. For example, get or create an actor via Actor.options(name="foo").remote() and retrieve it via ray.get_actor(name="foo"). Objects can be registered with this actor via actor method call, and also returned via actor method calls from different sessions.
You can also create objects owned by this actor via ray.put(data, _owner=actor_handle) (note that the _owner argument is an experimental API). This means the object will fate-share with the actor instead of its creator.
Note that deserialize_and_register etc. are private APIs, and should never be used by Ray application code.
Thank you, @ericl , that will help in the udf case. But if I have a named actor created in java, can I get it in a python session? Can I call its method? Or is it possible with cross language call?
I just tried, but it did not work. Here is what I do:
executors(java actors) write data to ray object store, and then store the object ref in an designated object holder(also java actor, and is assigned ownership when executors put the data)
in python, I can get the object holder actor handle, and I can also call its method. It works so far. But I can’t ray.get the object ref(the actual data) which I get from the object holder, it just hangs.
Some clarification: When the executors store object ref in the object holder, I actually store its objectId as byte array. And in python, I can get this byte array and create an Object Ref, but I can’t ray.get this ref. I can’t return object ref directly from the object holder to python because serializer does not support.
Also cc @jovany-wang
Thanks
Maybe a minimal demo could help us to figure the issue out. Let me make sure what you did:
Create a java named actor in session1.
You put data via ray.put() in that java actor.
In session2(a python session), get the java actor handle, and invoke its remote methods to get the bytes of the objectref, which is put by java actor in session1.
Construct a new objectref(Let me name it x) in session2, and then hang at ray.get(x)
If so, I guess you shouldn’t serialize the objectref(or object id) to bytes and then deserialize in another session.
Yep, you should never directly serialize an objectref, except through ray.put() or task calls. It should be possible to pass the objectref directly to/from different tasks as a top-level argument or in a list.
What does top-level argument mean? I guess it’s impossible to pass in a list, because java does not support generic arrays.
I wrote a small demo here:
// DummyExecutor.java
import java.util.List;
import java.util.ArrayList;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.ActorHandle;
public class DummyExecutor {
public String writeData() {
// get holder
ActorHandle<DummyHolder> owner =
(ActorHandle<DummyHolder>) Ray.getGlobalActor("DUMMY_HOLDER").get();
// write data
List ref = new ArrayList<ObjectRef<Integer>>();
ref.add(Ray.put(1, owner));
ref.add(Ray.put(2, owner));
String uuid = "fake_uuid";
// add object ref to holder
owner.task(DummyHolder::addObjectRef, uuid, ref).remote();
return uuid;
}
}
// DummyHolder.java
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.ActorHandle;
public class DummyHolder {
private HashMap<String, List<ObjectRef<Integer>>> objectIds = new HashMap();
public int warmup() {
return 0;
}
public int addObjectRef(String id, List<ObjectRef<Integer>> objects) {
this.objectIds.put(id, objects);
return 0;
}
public List<ObjectRef<Integer>> getObjects(String id) {
List<ObjectRef<Integer>> objects = this.objectIds.get(id);
return objects;
}
}
// DummyMaster.java
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.ActorHandle;
public class DummyMaster {
public ActorHandle<DummyHolder> startHolder() {
return Ray.actor(DummyHolder::new)
.setGlobalName("DUMMY_HOLDER")
.remote();
}
}
# driver.py
import ray
if __name__ == '__main__':
job_config = ray.job_config.JobConfig(code_search_path=["path to classes"])
ray.init(job_config=job_config)
master_class = ray.java_actor_class(
"DummyMaster")
executor_class = ray.java_actor_class(
"DummyExecutor")
master = master_class.remote()
executor = executor_class.remote()
holder = ray.get(master.startHolder.remote())
ray.get(holder.warmup.remote())
id = ray.get(executor.writeData.remote())
print(ray.get(holder.getObjects.remote(id)))
ray.shutdown()
It is slightly different in our application, since executors are created by master via requests from spark. But this demo is pretty close to what I want to do.
What I want to get from the holder is object refs, not data. I could return data, and that would work fine since data is byte array, but then all the data would be pulled to the holder. Plus data will be get once in the holder, and then get again in python. How to achieve this?
That’s right, List<ObjectRef<Integer>> is not supported, neither does ObjectRef<Integer>[]. The demo I provided fails because List<ObjectRef<Integer>> will have b'java' as metadata, and python cannot deserialize it. If I return the binary format of ObjectId, then I can somehow get the ObjectRef x, but ray.get(x) will hang.