How to: ensure actor is running on the same node only?

I would like to have the following situation:

  • a ray cluster running several nodes
  • an application gets started on node A
  • some actors for that application can run on any node in the cluster (no problem here)
  • some actors need to run on the SAME node where the application got started: is there a way to ensure this?

More generally, at the time an actor gets started, my code knows if it should be restricted to the same node as where it gets started or not. How can this be done?

Rationale: some actors do not need external resources or only resources which can get shared between nodes in some way. However there are also actors which need resources only present on the node where they get started.

Would launching the actors into placement group help? Placement Groups — Ray v2.0.0.dev0

Otherwise, you might be able to use (subject to change) API ray.get_runtime_context() to dynamically retrieve the node id and put placement constraint.

TBH, I do not know how placement groups may help: as far as I understand, the code that starts the actors (lets call that “starter”) can use those to make sure that started actors are all on some node that is the same node for all of those started actors. However, there is no control over WHICH node that is, only that it is the same for all of them, or do I miss anything here?

But what is essential in my case is that those actors all run on the same “starter” node. How could that get ensured using placement groups or the runtime context?

The ray.get_runtime_context() method can be used to get the current node id, but I could not find any method to restrict a placement group to a given node.

OK, I think I found out how this may work, but I am not sure if this is really documented somewhere?

There seems to be a “special” resource key for doing this: “node:”. So using resource={"node:198.168.2.2": 0.001} apparently makes sure that a task/actor with that resource restriction will only run on the node with that IP. This resource could probably be used to define a placement group, but also directly when remoting the task/actor.

So in order to run on the same node as the code remoting the task/actor, that code would first have to get its own IP. I think this could be done e.g. by ray.get_runtime_context().worker.node_ip_address.

Can anyone confirm?

From looking at the source code, there is an even easier way to get the node restriction for the current node: ray.state.current_node_id()

Sadly nothing of this is to be found anywhere in the documentaiton. The API documentation seems to be restricted to a subset of modules and classes so even though this is in the PythonDoc of the respective function, it shows up nowhere in the API docs on the website.

ray.get_runtime_context().worker.node_ip_address. is the right way to go (it is documented here API and Package Reference — Ray v2.0.0.dev0). ray.state is a private part of the code, and it is not recommended to be used.

Usually, it is more recommended to use the placement group APIs than using custom resources directly because Ray would like to achieve serverless abstraction (meaning it doesn’t want you to care about specific machine information like node ip).

Usually, when you’d like to schedule a task, there are 2 reasons.

  1. would like to place actors or tasks on the same node to maximize locality.
  2. Actors or tasks need to be scheduled on a node that has specific “resources” (e.g., GPU).

So, the recommended solution in your case is to create some actors that need to be created on the same node with a STRICT_PACK strategy’s placement group.

Thanks - so it seems I do not understood the documentation of placement groups:
I thought whatever restriction is defined in a placement group only applies to the workers that get started with that group. But the code creating that placement group and starting my other worker has itself not been placed in that group! So my understanding was this:
if my “main” code (the one that runs ray.init()) creates some placement group with STRICT_PACK and then starts 5 workers in that group, Ray will make sure that all 5 are on the same node, whichever node that may be, but not necessarily the same node as the code that started the 5 workers? Is this not correct?

If my assumption is not correct, would that mean that the worker creating the placement group is automatically also a memeber of that group? Or would it need to get added first itself (how)?

Sorry, but I am still confused about this.

Regarding using ray.state I think it would be better if that was called ray._state if it is meant to be private and should not get used by clients?

if my “main” code (the one that runs ray.init() ) creates some placement group with STRICT_PACK and then starts 5 workers in that group, Ray will make sure that all 5 are on the same node, whichever node that may be, but not necessarily the same node as the code that started the 5 workers? Is this not correct?

Yes this is the correct!

Regarding using ray.state I think it would be better if that was called ray._state if it is meant to be private and should not get used by clients?

Good point. We are planning to do some more refactoring to make private / public more obvious, but it is not yet implemented (due to other priorities).

if my “main” code (the one that runs ray.init() ) creates some placement group with STRICT_PACK and then starts 5 workers in that group, Ray will make sure that all 5 are on the same node, whichever node that may be, but not necessarily the same node as the code that started the 5 workers? Is this not correct?

Yes this is the correct!

@sangcho I do not see how this would then solve my problem, because my original question was about how to ensure those new actors are running on the same node as the starting code is running on

As you confirmed, the strict pack option only ensures that the new actors are all running on some node that is the same for all of them, but not necessarily the same as the node where the starting code was run.

But I NEED to ensure that the workers are running on the same node as where the get started from, because the code knows that some resource only exists on the node where the starting code is running – so the code knows that for those workers, only single node multiprocessing with the resources from the node where the starting happens is possible.

Again, as you confirmed, STRICT_PACK will not ensure that.

Which means we are back to what I suggested earlier: apparently only the use of {"node:<ip_address_of_starting_node": 0.001} may be a solution for this?

same node as the starting code is running on

Why is this the hard requirement? Usually, the best solution is something like this;

Note that using ip address of the starting node is definitely one of the solutions. (Schedule all actors with the small fraction of node ip address resources)

  1. If you need to schedule actors on a certain node, that is usually because the resources are available on that node. In your case starting node (head node I believe?) should have the resources I assume.
  2. When you start that node, add custom resources. Example; ray/example-full.yaml at 6a044f4f307b2c89ac3b1e6a6556017129a20afb · ray-project/ray · GitHub For example, if what you need is a disk, you can do something like “disk”: 1 to the resources field in the autoscaler.
  3. And create a placement group with “disk”:1. This will ensure that placement group is created with STRICT_PACK strategy on a node that has disk resources.
  4. And schedule actors there.

This is a hard requirement because the code that will start all the workers is creating that resource or only learns about that something is used as a shared resource AFTER the ray process has been started on that node.

So when ray is started on that node, that resource may not yet exist or not be known to the ray process, and there is a whole range of resources which can get created during the execution of the starting code: some can be accessed or shared across nodes (like the URL of REST endpoint), but some cannot, like a local file, or a shared memory pool. This can happen on any node, not just the head node.

I think it is a mistake to assume that processes will only ever share hardware resources or in general resources that are known and fixed in advance.

So apparently, using the IP address of the node where such a resource has been created is an indirect way to do this and I am happy that this will work.

Having said that I think it would be really good to document that somewhere as it was really hard to find out that this is even possible.

If you know which tasks or actors create those resources dynamically, you can always schedule tasks / actors with the placement group, and then you can schedule following tasks in the same placement group right? I think the only thing this cannot satisfy is when you cannot know how many cpu resources you will need ahead of time (which is a legit reason) because the placement group reserves static number of resources ahead of time.

Also, good point about the documentation. I guess this can be something like the “ensuring locality” section. Do you have any recommendation where we should add a doc?

I guess maybe as an additional section in Placement Groups — Ray v2.0.0.dev0 ?
TBH I had difficulty to understanding resources (mentioned in the configuration section) and how “custom” resources are actually used by Ray, and also which resources are not “custom” but in some way pre-defined, having a specific meaning (e.g. GPU, CPU, but also “node:addr”) and then when trying to understand how this all relates to the bundles used with placement groups.

If I understood your earlier suggestions, then if placement groups should be a more abstract way to deal with resources, why not allow that a ray node can update its resources dynamically? So a worker running on some Node N could update the resource configuration to add something like {“myspecialresource-”: 1.0 } and then whenever a placement group includes that resource in its “bundle”, workers submitted to that placement group would automatically get limited to the node that has it at the time of submission?
This would have the advantage over using node:ip that the resource that imposes the “same node as starting code” constraint is directly used instead of what follows technically from this. So even if another worker starts an actor that references that particular resource, it would get run on the correct node.

TBH I had difficulty to understanding resources (mentioned in the configuration section)

It’ll be great if you can point me the doc that is confusing! To be clear, in Ray, every resource is just the KV pairs. It is used purely to schedule tasks. Every resource is technically custom resources, and pre-defined resources are just special strings (CPU, GPU, memory, object_store_memory).

If I understood your earlier suggestions, then if placement groups should be a more abstract way to deal with resources

This was supported before, and we don’t currently support after we re-write our scheduler code (just because we didn’t really see user requests for this feature). If you think this feature makes sense to you, I’d like to suggest you to create a feature request with concrete use case example (e.g., your use case)!