Question - how to get owner address from ObjectRef

Is there a Python API that extracts owner address from ObjectRef?

Use case: I am trying to bucket a list of ObjectRefs by owner address, so that Ray tasks can be dispatched to process the objects that sit in a worker's local plasma store.

Thank you!

yuanchi

cc @Clark_Zinzow

I think Ray should have locality sensitive scheduling soon, so that this problem will be transparently addressed.

This is actually already done in 1.3 and current master. @Clark_Zinzow will describe how it works in the whitepaper. Currently, all object location information is stored with the owner (previously the GCS served as the object directory, but that role has moved to the owner). When Ray schedules a task, it automatically looks for the node that holds the largest total size of the task's plasma object dependencies.

I don’t think there is currently a way to get the owner address from an object ref through the Python APIs. Also note that the address of the owner != the address of the object. For instance, in the example below, the owner and the object are on different nodes:

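import numpy as np
import ray

# Imagine this code is running on node A.
@ray.remote
def f():
    # Stand-in for a large object (about 100 MB), big enough to live in plasma.
    return np.zeros(100 * 1024 * 1024, dtype=np.uint8)

# Imagine f is scheduled on node B. In this case, the owner of a is on node A, but the object is stored on node B.
a = f.remote()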

Thanks @sangcho @rliaw for replying. Your confirmation saved me from poking around further on the Python API side.

What I am facing is potentially hundreds of objects scattered across a handful of workers. Instead of launching hundreds of tasks to process the objects one at a time, I’d like to group the objects by worker and assign each group to one task per worker, then rely on locality-aware scheduling to do its magic.
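The grouping step described above could be sketched roughly as follows. This is only an illustration, not a confirmed recipe: it assumes you can obtain a per-object location mapping shaped like `{ref: {"node_ids": [...], "object_size": int}}`, which is the shape I believe `ray.experimental.get_object_locations` returns in Ray releases that ship it (please check your version). The helper name `group_refs_by_node` and the sample data are hypothetical, and plain strings stand in for ObjectRefs and node IDs so the snippet runs without a cluster.

```python
from collections import defaultdict


def group_refs_by_node(locations):
    """Bucket object refs by the first node known to hold a copy.

    `locations` is assumed to look like:
        {ref: {"node_ids": [...], "object_size": int}, ...}
    """
    buckets = defaultdict(list)
    for ref, info in locations.items():
        node_ids = info.get("node_ids") or []
        # Refs with no known location go into a catch-all bucket keyed by None.
        key = node_ids[0] if node_ids else None
        buckets[key].append(ref)
    return dict(buckets)


# Hypothetical stand-in data: strings play the role of ObjectRefs and node IDs.
fake_locations = {
    "ref1": {"node_ids": ["nodeA"], "object_size": 100},
    "ref2": {"node_ids": ["nodeB"], "object_size": 200},
    "ref3": {"node_ids": ["nodeA", "nodeB"], "object_size": 50},
    "ref4": {"node_ids": [], "object_size": 10},
}
buckets = group_refs_by_node(fake_locations)
```

Each bucket could then be passed to a single task, leaving it to the scheduler's locality preference to place that task near the data. Note that this groups by where the objects are stored, not by owner, since the two can differ.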

Thanks!

@sangcho I am new to Ray. Just to clarify the “owner” and “location” of an object in the Plasma store: I assume the “owner” is the node that executes “ray.put()”, and the object will be stored in the local Plasma store of the node that owns it. Other non-owner nodes might have a local copy of the object if they reference it. I wonder who puts the object into the Plasma store in your example.

Thanks,

The worker that submits a task is also the owner of the object that task returns (and yes, the worker that calls ray.put is the owner of that object, and the object is stored on that worker's local node). In this case, the owner and the object are on different nodes because the object is created by a worker on a remote node, while the owner is still the worker that submitted the task.

Thanks, @sangcho. Okay, I get that in your scenario, say worker A submits a task to worker B (on another compute node), which uses ray.put() to create an object, and the object is stored in the local plasma store of the node that worker B runs on. Do you imply that in this case both worker A and worker B are owners of the object? Namely, is it possible for an object in the Plasma store to have multiple owners?

Multiple owners are not allowed right now (but we might support this soon to make ref counting more robust). In that case, the object that worker B creates with ray.put is owned by B. Consider this scenario:

import ray

@ray.remote
def f():
    big_obj = bytes(10 ** 8)  # stand-in for a large object
    return ray.put(big_obj)  # call this object "object A"

# Call the task's return value "object B". Imagine f runs on worker B, and the driver is worker A.
b = f.remote()
print(ray.get(b))  # This prints an object ref (to object A), not the object itself.

In this case, object A is owned by worker B, and object B is owned by worker A.

Okay, yes, worker A (the driver) is the owner of object B, and it is just a local copy on worker A. Since worker A didn’t execute ray.put(), object B might be garbage collected as needed. [Question: Do you even maintain a reference count on object B?] On the other hand, object A will not be garbage collected if worker B is still alive and the reference count of object A is > 0.

In your example, I thought b is object ref and worker A, which executes ray.get(b), will have a local copy of the real object.

“I thought b is object ref and worker A, which executes ray.get(b), will have a local copy of the real object.”

This is correct. To get the real object A, you should call ray.get(ray.get(b)).

“Question: Do you even maintain a reference count on object B?”

Yes. It is ref counted and garbage collected. In this case, there’s a local ref held by the variable b, so this object won’t be GC’ed until that ref is gone (which you can do with del b).

@yuanchi2807 , I think these links might be useful for you.

@sangcho , if I correctly understand what you said, Ray schedules a task to a node that has the biggest plasma object size even though an object on which the task will be executed is located on a different node. Is that right?

Ray tries to schedule a task on the node that already has the largest total size of the task's dependencies cached locally. So if your task requires objects A (1GB), B (1GB), and C (3GB), and there is a node N1 (A and B local) and a node N2 (C local), the task is preferably scheduled on N2. Note that this is not a hard constraint (so it is still possible for the task to be scheduled on N1).
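To make the preference concrete, here is a toy model of the rule described above: sum the bytes of the task's dependencies that each node already holds, then pick the node with the largest total. This is not Ray's scheduler code; the function name `preferred_node` and the input format are made up for illustration, and the real scheduler also weighs resource availability, which is why the preference is soft.

```python
from collections import defaultdict


def preferred_node(dependencies):
    """Return the node holding the most bytes of a task's dependencies.

    `dependencies` maps an object name to (size_in_bytes, [nodes with a copy]).
    Returns None if no dependency has a known location.
    """
    local_bytes = defaultdict(int)
    for size, nodes in dependencies.values():
        for node in nodes:
            local_bytes[node] += size
    if not local_bytes:
        return None
    # Pick the node with the largest total of locally available bytes.
    return max(local_bytes, key=local_bytes.get)


GB = 1024 ** 3
# The scenario from the post: A and B (1 GB each) on N1, C (3 GB) on N2.
deps = {
    "A": (1 * GB, ["N1"]),
    "B": (1 * GB, ["N1"]),
    "C": (3 * GB, ["N2"]),
}
best = preferred_node(deps)
```

Here N1 holds 2 GB of the dependencies and N2 holds 3 GB, so the sketch selects N2, matching the example in the post.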