Creating objref from manually created object-id

I would like to let multiple actors work collaboratively on collective operations, such as reductions. A difficult part of this is getting object references that other actors created (or will create) without extra tasks or the like.
A relatively straightforward solution would be to allow programmers to create an objref from a user-provided id. This objref could then be used on any actor without additional tasks/communication. Of course, only one actor would be allowed to put the actual object related to the objref.

Here’s a very simple example of what I would like to do:

import ray

@ray.remote
class reducer:
    "reduction class, each actor holds one block"
    def __init__(self, block):
        self.block = block

    def reduce(self, op, n):
        "collaboratively reduce blocks on n actors to a single value"
        # first reduce my block locally
        l_res = self.block.reduce(op, n)
        # now form a tree to iteratively reduce all local values
        n //= 2  # integer division, so the loop terminates at 0
        while n:
            if self.block.id >= n:
                # proposed API: put my partial result under a user-chosen id
                ray.put(l_res, self.block.id)
                break
            else:
                # we know the id of the other value and make an objref from it
                # (ray.make_objref is the proposed API)
                other = ray.get(ray.make_objref(n*2))
                l_res = op(l_res, other)
                n //= 2
        return l_res if self.block.id == 0 else None

def foo(blocks):
    "use the reducer"
    n = len(blocks)
    acts = [reducer.remote(b) for b in blocks]
    # trigger reduction and assign obj-ids to results (obj_id is a proposed option)
    reduced = [a.reduce.options(obj_id=n+i).remote(sum, n) for i, a in enumerate(acts)]
    x = ray.get(ray.make_objref(n))  # equivalent to x = ray.get(reduced[0])
    return x

I know a reduction can already be formulated in Ray in a different way, which some might consider simpler and/or better. The above is just for illustration, and a traditional ray.remote formulation will not address the issue. You could view this as a request to mimic SPMD-like or message-passing style programming.

Thoughts?

This was possible in the past, but not anymore, because it is not compatible with our ownership-based resource management (see the Ray 1.0 Architecture document).

But there were indeed feature requests like this (and I am aware of the benefits), and we are actually tracking it now: https://github.com/ray-project/ray/issues/12635

It’ll be good to hear your opinion about the proposed APIs.

Thanks @sangcho for the link, very interesting discussion.

The issue you mention is mostly concerned with independent jobs and with persisting data when an actor dies. In my case it’d be totally fine to fail if the owner no longer exists. It is acceptable to ask the user to make sure owners exist when their data is requested.

Probably more interesting (and challenging) is the fact that one actor might get such an objref before the owner has actually put it. Since the get is a blocking operation, that does not change the get/put semantics, but it would of course require dedicated handling of such cases (through placeholders or the like).

Hey @fschlimb, yes actually the possibility of deadlock is exactly why we chose to move to the ownership model. That way, we ensure that every object ref that gets created is tracked by the system. The system will then guarantee that each ref will eventually have a value, which may be an error if there’s a failure. With manually created ObjectIDs, the programmer would have to do this sort of error handling on their own, which we thought would be too complicated and brittle.

By the way, for your particular example, if I’m understanding correctly, the actors could send data to each other, so the main problem is figuring out when the reduce is finished. Is that right? If so, one pattern you can use is async actors, which we actually introduced in part to support previous cases that required a manually created ObjectID. You can either create a separate async actor that the reduce actors signal, or make all of the reduce actors async as well.
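To make the async-actor suggestion concrete, here is a minimal sketch of a rendezvous object that lets a consumer wait on a user-chosen key before the producer has delivered the value. The name `Mailbox` and its `put`/`get` methods are hypothetical, not Ray API. The class is plain Python with `async def` methods so that it runs without a cluster; the assumption is that decorating it with `ray.remote` would turn it into an async actor that the reduce actors call.

```python
import asyncio


class Mailbox:
    """Hypothetical rendezvous point keyed by user-chosen ids."""

    def __init__(self):
        self._slots = {}

    def _slot(self, key):
        # Create the future lazily so that get() may arrive before put().
        if key not in self._slots:
            self._slots[key] = asyncio.get_running_loop().create_future()
        return self._slots[key]

    async def put(self, key, value):
        slot = self._slot(key)
        if slot.done():
            # Mirrors the rule that only one producer may fill an id.
            raise KeyError(f"key {key!r} was already filled")
        slot.set_result(value)

    async def get(self, key):
        # Suspends the caller until some producer fills the key.
        return await self._slot(key)


async def demo():
    box = Mailbox()
    consumer = asyncio.create_task(box.get("partial-3"))
    await asyncio.sleep(0)  # let the consumer start waiting first
    await box.put("partial-3", 42)
    return await consumer


demo_result = asyncio.run(demo())
```

A consumer that waits on a key nobody ever fills will wait forever, which is the deadlock risk discussed above; the sketch does nothing to prevent it.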

Hey @Stephanie_Wang, thanks for your thoughts. I hadn’t considered the async actors before.

I don’t think the main problem is figuring out when something is done. That is something that Ray already accomplishes with ray.get(…). The main problem is that an actor wants to consume an object for which it does not have an ObjRef. Right now, the only way to get an ObjRef is to have it passed as an argument (to a method).

I guess that’s why you are suggesting a push-model instead of a pull-model: the producer calls a remote method on the consumer. Like in the reduction, result[n] would call a method on the actor which produced result[n-1]. Is that what you suggest?

As long as the mapping actor<->object(-id) is static, that’s probably acceptable. In general, however, such an approach requires heavy bookkeeping (and/or additional communication), in particular when the related mapping decisions are made dynamically. The good thing is that Ray already has such bookkeeping in its machinery, so it seems natural to add extensions to cover more advanced cases for general (expert) use.

So I guess what I am looking for is a (scalable) feature which allows me to get/await an object to which I can assign a unique id, without holding an objref and without knowing where/how it gets produced.
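A minimal sketch of the push model with a static mapping might look as follows. It is plain Python standing in for actors: `PushReducer`, `ROUTES` and `run_push_reduction` are hypothetical names, and in Ray each `receive` call is assumed to become a remote method call on the consumer actor. The point is that every producer needs the routing table to know its consumer, which is the bookkeeping mentioned above.

```python
import operator


class PushReducer:
    """Hypothetical stand-in for a reduce actor that receives pushed values."""

    def __init__(self, op, local_value, expected):
        self.op = op
        self.value = local_value
        self.pending = expected  # number of pushes this actor still waits for

    def receive(self, value):
        if self.pending == 0:
            raise RuntimeError("received more values than expected")
        self.value = self.op(self.value, value)
        self.pending -= 1


# Static routing table: producer index -> consumer index.
# Insertion order is also the push order: 1 -> 0, 3 -> 2, then 2 -> 0.
ROUTES = {1: 0, 3: 2, 2: 0}


def run_push_reduction(values, op):
    expected = {i: 0 for i in range(len(values))}
    for consumer in ROUTES.values():
        expected[consumer] += 1
    actors = [PushReducer(op, v, expected[i]) for i, v in enumerate(values)]
    for producer, consumer in ROUTES.items():
        sender = actors[producer]
        if sender.pending:
            # A producer may only push once its own inputs have arrived.
            raise RuntimeError(f"actor {producer} pushed too early")
        actors[consumer].receive(sender.value)
    return actors[0].value


push_total = run_push_reduction([1, 2, 3, 4], operator.add)
```

With a fixed table this stays manageable; if the mapping were decided at run time, every actor would need an up-to-date copy of `ROUTES`, or extra messages to obtain it.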

@fschlimb in your example, it appears you want to be able to perform a reduction step with a custom id so that you can retrieve it without communication (the object would still go through the object store). I’m wondering whether we ought to propose an API that allows creating an alias to the object ref, e.g.
ray.label_object_ref(object_ref, label)
object_ref = ray.get_object_ref(label)

This solution still has the problems mentioned by @Stephanie_Wang (ray.get_object_ref(label) may cause a deadlock) but perhaps it’s acceptable since it’s a different set of APIs: The existing get/put APIs continue to have the properties provided by the ownership model.
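As a rough illustration of the semantics such an alias API could have, here is an in-process sketch. `LabelRegistry` is hypothetical, and the two method names only mirror the proposal above; neither is an existing Ray function. The `timeout` parameter is an added assumption that shows one way a caller could bound the wait instead of blocking forever on a label that is never bound.

```python
import threading


class LabelRegistry:
    """Hypothetical label -> object-ref table with a blocking lookup."""

    def __init__(self):
        self._refs = {}
        self._cond = threading.Condition()

    def label_object_ref(self, object_ref, label):
        with self._cond:
            if label in self._refs:
                raise ValueError(f"label {label!r} is already bound")
            self._refs[label] = object_ref
            self._cond.notify_all()  # wake up any waiting lookups

    def get_object_ref(self, label, timeout=None):
        with self._cond:
            # wait_for returns False if the timeout expires first.
            found = self._cond.wait_for(lambda: label in self._refs, timeout=timeout)
            if not found:
                raise TimeoutError(f"label {label!r} was not bound in time")
            return self._refs[label]


registry = LabelRegistry()
registry.label_object_ref("ref-for-block-0", "sum/0")
looked_up = registry.get_object_ref("sum/0")
```

With `timeout=None` the lookup blocks indefinitely, which corresponds to the deadlock case; a finite timeout turns it into an error the caller has to handle.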

I don’t think I understand the problem fully. Is it that when the producer creates an object, it doesn’t know yet who the consumer should be? Can you give an example that can’t be written with pushing arguments?

@Stephanie_Wang, I’m not familiar with the pushing arguments. Can you provide a reference or example?

I can provide some thoughts from a performance point-of-view.

With Ray, I think a clean implementation of tree reduce is to coordinate each step from the driver: The reduce actor implements a single interface that takes an op and one or more blocks and computes the reduction of those blocks. The RPC overhead may be an issue for fine-grained tasks (many small blocks in this case).
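A sketch of the driver-coordinated variant, under the assumption that `reduce_blocks` plays the role of the reduce actor's single method (in Ray it would presumably be invoked remotely, and the values in `level` would be object refs). Both function names are hypothetical, and everything runs locally here to keep the example self-contained.

```python
import operator
from functools import reduce as fold


def reduce_blocks(op, *blocks):
    """Stand-in for the single reduce interface: combine one or more blocks."""
    return fold(op, blocks)


def tree_reduce(op, values):
    """Driver loop: combine neighbours pairwise, level by level."""
    level = list(values)
    if not level:
        raise ValueError("nothing to reduce")
    while len(level) > 1:
        # Each pair would be one remote call; an odd element passes through.
        level = [reduce_blocks(op, *level[i:i + 2]) for i in range(0, len(level), 2)]
    return level[0]


# First reduce each block locally, then reduce the partial results as a tree.
blocks = [[1, 2], [3, 4], [5], [6, 7, 8]]
partials = [reduce_blocks(operator.add, *block) for block in blocks]
total = tree_reduce(operator.add, partials)
```

The driver issues one call per pair per level, so the number of calls grows with the number of blocks; that is the RPC overhead referred to above.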

In @fschlimb’s example, tasks are coarse-grained (regardless of size and number of blocks), but we still copy when we put, and we still need to go through the GCS to transmit objects between different nodes (perhaps the push args you mention help with this). With that said, I’m not sure the RPC overhead would be a significant bottleneck. Is there another reason why doing tree reduce in Ray this way makes sense?

If I understand it correctly, @fschlimb wants to create an object_id according to some rule (or with naming support). The producer will fill that object_id, and the consumer will try to get the value of that id. If it’s not available, the consumer will wait until it becomes available somewhere in the cluster. The dependencies between tasks are established by these created object_ids, which form a tree in the end.

From what was mentioned in the thread, it looks like this is not allowed because we want to avoid deadlocks, e.g., waiting forever for an object that never exists (maybe due to some bug).

So from my understanding, in Ray, tree reduction requires constructing the tree from the bottom up, with async actors in your case. In the current version, the dependencies between tasks are created by argument passing and remote function calls.

Thanks for your comments, it seems we understand each other’s thoughts much better now.

@Stephanie_Wang Yes, the problem is that the producer does not always know who the consumer is, nor does the consumer always know who the producer is. Assume we have a set of execution streams which communicate through identifiers only. You could also think of it as separating data from control/computation. Right now, Ray tightly couples data and control/computation, which I would like to avoid.

@elibol Your suggestion looks good to me. I think that would cover what I am looking for. I agree, deadlocks are acceptable in this case, also because they only happen in semantically(!) faulty programs.

Note that the reduction was just a simple example to illustrate the programming style, not an example of how to optimize or beautify a reduction specifically.

@elibol I mostly agree with your performance considerations. One thing I’d like to add is that a centralized ‘scheduler’ generally creates a bottleneck. So the suggested mechanism will not only save the RPC calls but also allow making scheduling decisions in a distributed and parallel fashion.

Hi @Stephanie_Wang, my gut feeling is that it should always be possible to express anything either way. As mentioned above, the issue is that in my case I have something like a distributed scheduler, where the goal is to make scheduling decisions with minimal communication/synchronization as well as with minimal duplication of (meta-)data. The push model would require me to explicitly keep track of the mapping between objects, tasks and processes/actors in every actor, even though part of this is already provided by the Ray runtime.

Also see my other reply.