How to assign different custom resources to each worker node?

  • High: It blocks me from completing my task.

I am using a cluster.yaml file to launch a cluster with Ray.

How can I get different custom resources for each new worker node?
I have defined custom resources on the worker node as follows: resources={"custom":1}. Now, each time Ray launches a new worker, it should be given resources as follows:

resource={"custom1":1} for worker 1,  
resource={"custom2":1} for worker 2,  
resource={"custom3":1} for worker 3, 

You could configure several worker node types with different custom resources and cap them at a max of 1, i.e. something like:

available_node_types:
    ray.head.default:
        resources: {}
        node_config:
            ...
    ray.worker1.default:
        min_workers: 0
        max_workers: 1
        resources:
            custom1: 1
        node_config:
            ...
    ray.worker2.default:
        min_workers: 0
        max_workers: 1
        resources:
            custom2: 1
        node_config:
            ...
    ray.worker3.default:
        min_workers: 0
        max_workers: 1
        resources:
            custom3: 1
        node_config:
            ...
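
To consume these, an actor or task would then request the matching resource, e.g. @ray.remote(resources={"custom2": 1}). As a toy illustration (plain Python, not the Ray scheduler; the node-type names are just taken from the YAML above), this is roughly the matching the scheduler performs when deciding which node type can satisfy a demand:

```python
# Toy sketch (not the Ray API): first-fit matching of a resource demand
# against the per-node-type resources defined in the YAML above.

NODE_TYPE_RESOURCES = {
    "ray.worker1.default": {"custom1": 1},
    "ray.worker2.default": {"custom2": 1},
    "ray.worker3.default": {"custom3": 1},
}

def first_fit(demand, node_types=NODE_TYPE_RESOURCES):
    """Return the first node type whose resources satisfy the demand."""
    for name, resources in node_types.items():
        if all(resources.get(res, 0) >= qty for res, qty in demand.items()):
            return name
    return None  # no node type can satisfy this demand

print(first_fit({"custom2": 1}))  # -> ray.worker2.default
```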

Not sure if this is what you’re looking for; if you can provide more info on your use case, there might be a better approach.


Can you please elaborate on how Ray will use this configuration while launching new workers?

I have two worker nodes on a local network. When I used the available_node_types key, it gave the following error:
The field available_node_types is not supported for on-premise clusters.

I think this functionality is only available for cloud clusters.

Can you please suggest other techniques we can use to fulfil the requirement?

If your goal here is just to load balance, then setting the scheduling strategy on all of your tasks to "SPREAD" might be what you’re looking for: Scheduling — Ray 1.13.0
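
As a toy illustration (plain Python, not the Ray scheduler itself), SPREAD-style placement essentially keeps sending each new task to the least-loaded node:

```python
# Toy sketch of SPREAD-style placement: each task goes to the node
# currently holding the fewest tasks (ties broken by node order).

def spread(tasks, nodes):
    """Assign each task to the least-loaded node; return node -> tasks."""
    load = {node: [] for node in nodes}
    for task in tasks:
        target = min(load, key=lambda n: len(load[n]))
        load[target].append(task)
    return load

print(spread(["t1", "t2", "t3", "t4"], ["node_a", "node_b"]))
# -> {'node_a': ['t1', 't3'], 'node_b': ['t2', 't4']}
```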

Thanks for your response @ckw017.

It’s not just load balancing here; I have already started another thread for load balancing. In this case, the requirement itself is that different workers have different custom resources.

Can you please suggest, how can I implement required structure with ray?

Ah, I see. If you’re using local nodes, every node has a custom resource of the form “node:ip_address”. You can check this by doing something like:

import ray
from pprint import pprint
ray.init()
pprint(ray.nodes())

You should see something like the following for every node in the cluster:

{'Alive': True,
  'MetricsExportPort': ...,
  'NodeID': ...,
  'NodeManagerAddress': ...,
  'NodeManagerHostname': ...,
  'NodeManagerPort': ...,
  'NodeName': ...,
  'ObjectManagerPort': ...,
  'ObjectStoreSocketName': ...,
  'RayletSocketName': ...,
  'Resources': {'CPU': 8.0,
                'memory': 19528310784.0,
                'node:<some_ip_address>': 1.0,
                'object_store_memory': 9764155392.0},
  'alive': True}

Under 'Resources' there should be a custom resource of the form 'node:ip_address' that should be unique for each worker node.
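
For reference, a small helper (plain Python, operating on the dict structure shown above; the sample IPs and NodeIDs below are made up) can pull out each node’s unique 'node:<ip>' resource:

```python
# Extract the unique 'node:<ip>' custom resource from each entry of
# ray.nodes() output (same dict structure as shown above).

def node_ip_resources(nodes):
    """Map each alive node's 'node:<ip>' resource name to its NodeID."""
    out = {}
    for node in nodes:
        if not node.get("Alive"):
            continue
        for res in node.get("Resources", {}):
            if res.startswith("node:"):
                out[res] = node.get("NodeID")
    return out

# Sample data mimicking ray.nodes() output (IPs/IDs are placeholders).
sample = [
    {"Alive": True, "NodeID": "abc123",
     "Resources": {"CPU": 8.0, "node:192.168.1.10": 1.0}},
    {"Alive": True, "NodeID": "def456",
     "Resources": {"CPU": 4.0, "node:192.168.1.11": 1.0}},
]
print(node_ip_resources(sample))
# -> {'node:192.168.1.10': 'abc123', 'node:192.168.1.11': 'def456'}
```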

@ckw017 Yes, that’s what I need. Thanks for suggesting a way. But there is a correction to my question: as per the requirement, I actually need custom resources as follows:

resource={"detector1":1, "tracker1":2, "stream1":1} for worker 1,
resource={"detector2":2, "tracker2":4, "stream2":4} for worker 2,   
resource={"detector3":1, "tracker3":2, "stream3":1} for worker 3,   

Here, I require three different custom resources, in different quantities, on each worker node. Each single unit of a resource is used by a single actor.
Multiple DetectActors would run out of memory, so a single actor will handle multiple jobs; the same applies to StreamActor.
Since the second worker has a high-end CPU, it can handle multiple StreamActors.

So, based on this requirement, we need more than one custom resource.

Can you please suggest a way to implement a solution for this requirement?

I can think of two ways to go about this:

If you know the number of actors that you want on each node ahead of time, you can manually schedule the amount you want onto each node based on the IP with dynamic remote parameters:

# Actors for node 1
DetectActor.options(resources={"node:ip_1": .01}).remote()
TrackerActor.options(resources={"node:ip_1": .01}).remote()
TrackerActor.options(resources={"node:ip_1": .01}).remote()
StreamActor.options(resources={"node:ip_1": .01}).remote()

# Actors for node 2
DetectActor.options(resources={"node:ip_2": .01}).remote()
DetectActor.options(resources={"node:ip_2": .01}).remote()
TrackerActor.options(resources={"node:ip_2": .01}).remote()
TrackerActor.options(resources={"node:ip_2": .01}).remote()
TrackerActor.options(resources={"node:ip_2": .01}).remote()
TrackerActor.options(resources={"node:ip_2": .01}).remote()
StreamActor.options(resources={"node:ip_2": .01}).remote()
StreamActor.options(resources={"node:ip_2": .01}).remote()
StreamActor.options(resources={"node:ip_2": .01}).remote()
StreamActor.options(resources={"node:ip_2": .01}).remote()

# etc...

Otherwise, you can set up the exact custom resource requirements you want by modifying the worker startup commands. Instead of ray start --address=$RAY_HEAD_IP:6379, you can place a bash script on each worker node along the lines of:

#!/bin/bash
# Custom ray start command for node 1
ray start --address $RAY_HEAD_IP:6379 '--resources={"detector1": 1, "tracker1": 2, "stream1": 1}'

And then change the worker start command to execute this bash script on each worker node instead (make sure the script is stored at the same location on each node).
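
Assuming the usual cluster.yaml layout, the worker start section might then look something like this sketch (the script path is a placeholder):

```yaml
# cluster.yaml (sketch): run the per-node script instead of a uniform
# `ray start` command; the script path below is a placeholder.
worker_start_ray_commands:
    - ray stop
    - bash /path/to/start_worker.sh
```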


Yes, I think the second solution will work. Thanks again for resolving the query.