[Autoscaler] Sharded Autoscaler Ray cluster

Hi,
My Ray application requires that certain tasks be assigned to a specific group of nodes. For example:
task1 assigned to group1: node1,2,3
task2 assigned to group2: node4,5,6,7,8
task3 assigned to group3: node9
In case of a node failure (e.g., node 1 fails), I’m thinking of relying on the Ray autoscaler to add a new node to the cluster and then inject this node’s resources into group1.

Is there any easy way to achieve this?

Best,
Jialin


Yep, you can do this with custom resources:

available_node_types:
  group1:
    node_config: ...
    resources: {"group1": 1}
  group2:
    node_config: ...
    resources: {"group2": 1}

then in your code, you can ensure that there are 3 nodes in group 1, 4 in group 2, etc. by doing

from ray.autoscaler.sdk import request_resources
request_resources(bundles=[{"group1": 1}] * 3 + [{"group2": 1}] * 4)  # one single-node bundle per desired node

or just set min_workers on those node groups to ensure that you always have x nodes of each group.

To schedule tasks on those nodes, have each task request a tiny amount of the group's custom resource to force it to run on a node from that group:

@ray.remote(num_cpus=1, resources={"group1": 0.001})
def foo():
    pass
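
For example, submitting a batch of these tasks will only ever use group1 nodes (a quick sketch; the batch size is arbitrary):

import ray

ray.init(address="auto")

# Each task also claims 0.001 of the "group1" resource, so every call lands on a group1 node.
ray.get([foo.remote() for _ in range(10)])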

Can node groups be created dynamically at runtime?


To be more concrete about the underlying problem here, we would ideally like to:

  1. Launch Ray cluster A to run tasks that collect stats for N table partitions.
  2. Based on these stats, create or modify node groups in Ray cluster A to assign min/max node group sizes for upcoming per-partition transforms that will be assigned to each node group.
  3. Run a transform for each partition on Ray cluster A, where each transform uses nodes from only 1 group.

I see, I’m assuming the nodes in your cluster are all homogeneous (or at least you don’t care which node group a particular node belongs to). This doesn’t give you a strong guarantee, but you may consider using placement groups as your node groups with a PACK policy, though there are caveats to this (you may want all tasks to be num_cpus=1, etc.).
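
A rough sketch of that idea, assuming 4-CPU nodes and treating one PACK placement group as a 3-node "group" (the bundle shapes and task counts here are illustrative):

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(address="auto")

# Reserve one whole-node bundle per node in the "group" (assumes 4 CPUs per node).
pg = placement_group([{"CPU": 4}] * 3, strategy="PACK")
ray.get(pg.ready())

@ray.remote(num_cpus=1)
def work():
    pass

# Tasks scheduled into the placement group only run on the nodes it reserved.
refs = [
    work.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
    ).remote()
    for _ in range(12)
]
ray.get(refs)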

You can also dynamically modify the autoscaler config, but that’s probably not what you want to be doing.

I think what you probably really want is to un-deprecate dynamic resources. ray/dynamic_resources.py at master · ray-project/ray · GitHub
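
For reference, the deprecated API in that file let you attach a custom resource to a live node at runtime, roughly like this (the exact signature, and whether it is still importable, depends on the Ray version, so treat this as an assumption):

import ray

ray.init(address="auto")

# Tag an existing node as a group1 node on the fly.
node_id = ray.nodes()[0]["NodeID"]
ray.experimental.set_resource("group1", 1, node_id)  # deprecated dynamic-resources API; may differ by version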


Right - a dynamic-resource-based solution is actually where we started our discussion, and resurrecting it may be the best way forward here.

Placement groups could work, but seem like they make us concede to (1) having a static-sized node group per partition (we’d prefer autoscaling node groups) and (2) losing dynamic specification of the memory requirements of each task at execution time. This is based on the assumptions that (1) placement group creation always triggers on-demand autoscaling, and (2) dynamic task resource requirements cannot be specified together with a placement group (e.g. some_task.options(scheduling_strategy=PlacementGroupSchedulingStrategy(...), resources={...}).remote()).

Specifying custom resources during ray start also seems theoretically possible, but the process to determine the resource to attach to a node being started may be complex, since we’d need to introspect the current state of the cluster first to decide which shard the node should be placed in.

(1) having a static-sized node group per partition (we’d prefer autoscaling node groups)

Agreed. There has been some talk of autoscaling placement groups (cc @sangcho).

(2) losing dynamic specification of the memory requirements of each task at execution time.

You should still be able to include memory requirements/other resources that you don’t pre-allocate in the placement group.
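
Concretely, the pattern from the earlier post would look something like this (a sketch continuing the placement-group example above; the memory value is illustrative):

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(address="auto")

# The bundles only reserve CPU; memory is deliberately left out.
pg = placement_group([{"CPU": 4}] * 3, strategy="PACK")
ray.get(pg.ready())

@ray.remote
def transform(partition):
    ...

# Memory is not pre-allocated in the bundles; it is specified per task at
# execution time, alongside the placement group scheduling strategy.
ref = transform.options(
    num_cpus=1,
    memory=2 * 1024 ** 3,  # 2 GiB for this particular partition
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg),
).remote("partition-0")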


@Alex @sangcho After launching a cluster with multiple node groups, is there any API to query the node groups? E.g.:

  1. how many node groups are available
  2. status of a specific node group, e.g., number of nodes
  3. node group config, e.g., min and max
  4. whether a node group has been used / assigned tasks or not

Given you are using Alex’s implementation:

  1. You can infer it using ray status or ray.available_resources() (see the sketch below).
  2. Same as 1.
  3. This is a static config from the YAML file.
  4. Same as 1.
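
For example, with the custom-resource setup above (one unit of groupN per node), you could infer all of this from the Python API with something like:

import ray

ray.init(address="auto")

total = ray.cluster_resources()        # everything registered with the cluster
available = ray.available_resources()  # whatever is not currently claimed by tasks

# One unit of "groupN" per node, so the totals double as node counts per group.
group_sizes = {k: int(v) for k, v in total.items() if k.startswith("group")}

# A nonzero difference means at least one task is currently holding part of
# that group's resource, i.e. the group has been assigned work.
in_use = {k: total[k] - available.get(k, 0.0) for k in group_sizes}

print(group_sizes)  # e.g. {"group1": 3, "group2": 4}
print(in_use)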

Cool, is there any API for ray status?

Also, can I know the node IDs of the nodes in each node group?