Ray cluster seems to be spawning fewer nodes than it should

I have this Azure cluster that uses Spot Machines as workers.
I am trying to create some actors that “hot start” nodes before doing any computation.

What is weird is that it doesn’t seem to spawn enough worker Pods.

Each node to be spawned has 8 vCPUs and 32 GiB of memory (Azure Standard_D8ds_v5).

In the Python script below, the variable spawn controls the number of actors.

If spawn=1, it spawns 1 pod as it should. For some reason, though, if I set it to start 14 workers (hence, 14 machines), it never spawns enough pods; it always ends up with 2 fewer than it should. The log says “Adding 13 nodes”, even though it should request 14, and it actually spawned only 12. (I also tried spawn=8, which spawned only 6 pods.)
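For reference, the expected worker count follows from simple arithmetic: each actor reserves 8 CPUs and each worker advertises 8 CPUs, so every actor needs its own node. A minimal sketch of that calculation, assuming CPU is the only constraining resource (the function name is hypothetical):

```python
import math

def expected_workers(num_actors: int, cpus_per_actor: int, cpus_per_node: int) -> int:
    """Nodes needed when whole actors are packed onto nodes by CPU only."""
    actors_per_node = cpus_per_node // cpus_per_actor
    if actors_per_node == 0:
        raise ValueError("an actor does not fit on a single node")
    return math.ceil(num_actors / actors_per_node)

nodes = expected_workers(num_actors=14, cpus_per_actor=8, cpus_per_node=8)
print(nodes, nodes * 8)  # 14 nodes, 112 CPUs
```

With spawn=14 this gives 14 nodes and 112 CPUs, which matches the “Resized to 112 CPUs” line in the job log even though the same log says “Adding 13 node(s)”.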

I’ve double-checked that num-cpus on the head node is set to 0, so it shouldn’t be considered for scheduling. Nothing is ever assigned to it, so that seems fine.

I’ve also checked whether the missing pods were stuck in the “Pending” state because k8s wasn’t delivering the machines, but that is not the case at all.

I was wondering if the autoscaler might be confusing them with some other nodes on the same cluster (the one that runs the operator), but since they’re in a different namespace and on a different machine, it shouldn’t matter. I am really confused.

Job Log:

Tailing logs until the job exits (disable with --no-wait):
2024-05-24 08:33:35,067 INFO worker.py:1429 -- Using address 10.224.0.204:6379 set in the environment variable RAY_ADDRESS
2024-05-24 08:33:35,067 INFO worker.py:1564 -- Connecting to existing Ray cluster at address: 10.224.0.204:6379...
2024-05-24 08:33:35,076 INFO worker.py:1740 -- Connected to Ray cluster. View the dashboard at 10.224.0.204:8265 
(autoscaler +3s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(autoscaler +3s) Adding 13 node(s) of type spotworker.
(autoscaler +3s) Resized to 112 CPUs.
initiated: 1 | Not initiated: 13
initiated: 2 | Not initiated: 12
initiated: 3 | Not initiated: 11
initiated: 4 | Not initiated: 10
initiated: 5 | Not initiated: 9
initiated: 6 | Not initiated: 8
initiated: 7 | Not initiated: 7
initiated: 8 | Not initiated: 6
initiated: 9 | Not initiated: 5
initiated: 10 | Not initiated: 4
initiated: 11 | Not initiated: 3
initiated: 11 | Not initiated: 3
initiated: 11 | Not initiated: 3
initiated: 11 | Not initiated: 3
initiated: 11 | Not initiated: 3
initiated: 11 | Not initiated: 3
initiated: 12 | Not initiated: 2
initiated: 12 | Not initiated: 2
initiated: 12 | Not initiated: 2
initiated: 12 | Not initiated: 2

------------------------------------------
Job 'raysubmit_ufPygdFwzrYy5LQ6' succeeded
------------------------------------------

My Python script that should spawn 14 pods/workers:

import ray
from time import sleep

@ray.remote(num_cpus=8)
class MyActor:
    def __init__(self, value):
        self.value = value
    def is_ready(self):
        if (self.value % 2 > 0): sleep(5)
        return "Actor {} is ready".format(self.value)

ray.init()

## number of machines to spawn
spawn = 14
actor_handles = [MyActor.remote(k) for k in range(spawn)]
not_initiated_id = [handle.is_ready.remote() for handle in actor_handles]

ready_ids = 0
results = []
timeout = 0
while not_initiated_id:
    ## Timeout doesn't differentiate from running/scheduling
    initiated_id, not_initiated_id = ray.wait(not_initiated_id, timeout = 30)
    timeout += 30
    if len(initiated_id) > 0:
        ready_ids += len(initiated_id)
        results.append(ray.get(initiated_id[0]))
    print("initiated: {} | Not initiated: {}".format(ready_ids, len(not_initiated_id)))

    if (timeout >= 600):
        for task in not_initiated_id:
            ray.cancel(task)
        break

for result in results:
    print(result)

for handle in actor_handles:
    ray.kill(handle)
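
To cross-check how many workers actually joined the cluster, independently of the actor results, the node table can be counted from the driver. The sketch below works on a list of dictionaries shaped like the output of `ray.nodes()` (using its `Alive` and `Resources` keys). The helper name and the sample data are made up for illustration.

```python
def count_alive_workers(nodes, min_cpus=8):
    """Count alive nodes that advertise at least `min_cpus` CPUs.

    `nodes` is expected to look like the list returned by ray.nodes().
    A head node started with num-cpus 0 should report zero or no CPU,
    so the CPU threshold excludes it.
    """
    return sum(
        1
        for node in nodes
        if node.get("Alive") and node.get("Resources", {}).get("CPU", 0) >= min_cpus
    )

# Hypothetical sample data, not taken from the cluster above.
sample = [
    {"Alive": True, "Resources": {"memory": 1.0}},  # head, no CPU
    {"Alive": True, "Resources": {"CPU": 8.0}},     # live worker
    {"Alive": False, "Resources": {"CPU": 8.0}},    # terminated worker
]
print(count_alive_workers(sample))  # 1
```

In the script above this would be called as `count_alive_workers(ray.nodes())` inside the wait loop, next to the existing print.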

Azure cluster configuration yaml:

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-autoscaler
spec:
  # The version of Ray you are using. Make sure all Ray containers are running this version of Ray.
  # Use the Ray nightly or Ray version >= 2.21.0 and KubeRay 1.1.0 or later for autoscaler v2.
  rayVersion: '2.21.0'
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 60
    imagePullPolicy: IfNotPresent
    # Optionally specify the Autoscaler container's securityContext.
    securityContext: {}
    env: []
    envFrom: []
    # resources:
    #   limits:
    #     cpu: "500m"
    #     memory: "512Mi"
    #   requests:
    #     cpu: "500m"
    #     memory: "512Mi"
  # Ray head pod template
  headGroupSpec:
    rayStartParams:
      # Setting "num-cpus: 0" to avoid any Ray actors or tasks being scheduled on the Ray head Pod.
      num-cpus: "0"
    # Pod template
    template:
      spec:
        nodeSelector:
          raymachine: rayhead
        containers:
        # The Ray head container
        - name: ray-head
          image: rayacr.azurecr.io/ray:2.21.0-py310
          ports:
          - containerPort: 6379
            name: gcs
          - containerPort: 8265
            name: dashboard
          - containerPort: 10001
            name: client
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh","-c","ray stop"]
          # resources:
          #   limits:
          #     cpu: "4"
          #     memory: "16G"
          #   requests:
          #     cpu: "2"
          #     memory: "8G"
          env:
            - name: RAY_enable_autoscaler_v2 # Pass env var for the autoscaler v2.
              value: "1"
          volumeMounts:
            - mountPath: /home/ray/samples
              name: ray-example-configmap
            - mountPath: "/mnt/azure"
              name: azurepvc
              readOnly: false
        volumes:
          - name: ray-example-configmap
            configMap:
              name: ray-example
              defaultMode: 0777
              items:
                - key: detached_actor.py
                  path: detached_actor.py
                - key: terminate_detached_actor.py
                  path: terminate_detached_actor.py
          - name: azurepvc
            persistentVolumeClaim:
              claimName: azure-file-disk
        restartPolicy: Never # No restart to avoid reuse of pod for different ray nodes.
  workerGroupSpecs:
  # the Pod replicas in this group typed worker
  - replicas: 0
    minReplicas: 0
    maxReplicas: 40
    groupName: spotworker
    rayStartParams:
      num-cpus: "8"
    # Pod template
    template:
      metadata:
        labels:
          podtype: worker
      spec:
        nodeSelector:
          raymachine: spotworker
        containers:
        - name: ray-worker
          image: rayacr.azurecr.io/ray:2.21.0-py310
          # resources:
          #   limits:
          #     cpu: "8"
          #     memory: "32G"
          #   requests:
          #     cpu: "4"
          #     memory: "16G"
          volumeMounts:
            - mountPath: /home/ray/samples
              name: ray-example-configmap
            - mountPath: "/mnt/azure"
              name: azurepvc
              readOnly: false
        restartPolicy: Never # Never restart a pod to avoid pod reuse
        tolerations:
        - key: "kubernetes.azure.com/scalesetpriority"
          operator: "Equal"
          value: "spot"
          effect: "NoSchedule"
        volumes:
          - name: azurepvc
            persistentVolumeClaim:
              claimName: azure-file-disk
        #Anti affinity, to guarantee only 1 worker per machine
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                - key: podtype
                  operator: In
                  values:
                  - worker
              topologyKey: "kubernetes.io/hostname"      
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: azure-file-disk
provisioner: file.csi.azure.com # replace with "kubernetes.io/azure-file" if aks version is less than 1.21
allowVolumeExpansion: true
mountOptions:
 - dir_mode=0777
 - file_mode=0777
 - uid=0
 - gid=0
 - mfsymlinks
 - cache=strict
 - actimeo=30
parameters:
  skuName: Standard_LRS
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: azure-file-disk
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: azure-file-disk
  resources:
    requests:
      storage: 5Gi
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-example
data:
  detached_actor.py: |
    import ray
    import sys

    @ray.remote(num_cpus=1)
    class Actor:
      pass

    ray.init(namespace="default_namespace")
    Actor.options(name=sys.argv[1], lifetime="detached").remote()

  terminate_detached_actor.py: |
    import ray
    import sys

    ray.init(namespace="default_namespace")
    detached_actor = ray.get_actor(sys.argv[1])
    ray.kill(detached_actor)
---
kind: Service
apiVersion: v1
metadata:
  name: raycluster-autoscaler-head-svc
spec:
  ports:
    - name: client
      protocol: TCP
      appProtocol: tcp
      port: 10001
      targetPort: 10001
    - name: dashboard
      protocol: TCP
      appProtocol: tcp
      port: 8265
      targetPort: 8265
    - name: gcs
      protocol: TCP
      appProtocol: tcp
      port: 6379
      targetPort: 6379
    - name: metrics
      protocol: TCP
      appProtocol: tcp
      port: 8080
      targetPort: 8080
  selector:
    app.kubernetes.io/created-by: kuberay-operator
    app.kubernetes.io/name: kuberay
    ray.io/cluster: raycluster-autoscaler
    ray.io/identifier: raycluster-autoscaler-head
    ray.io/node-type: head
  type: LoadBalancer
  loadBalancerSourceRanges:
  - 192.168.0.0/32          # restrict to company IP

EDIT: I redeployed all the pods and tried again with spawn=8. Funnily enough, it spawned 7 pods. But even after the job finished due to the timeout/cancel, I got this from ray status:

======== Autoscaler status: 2024-05-24 13:41:20.153528 ========
Node status
---------------------------------------------------------------
Active:
 1 headgroup
Idle:
 (no idle nodes)
Pending:
 spotworker, 1 launching
Recent failures:
 spotworker: NodeTerminated (ip: 10.224.1.35)
 spotworker: NodeTerminated (ip: 10.224.10.86)
 spotworker: NodeTerminated (ip: 10.224.1.163)
 spotworker: NodeTerminated (ip: 10.224.0.244)
 spotworker: NodeTerminated (ip: 10.224.1.9)
 spotworker: NodeTerminated (ip: 10.224.10.160)
 spotworker: NodeTerminated (ip: 10.224.11.125)

Resources
---------------------------------------------------------------
Usage:
 0B/18.44GiB memory
 0B/9.22GiB object_store_memory

Demands:
 (no resource demands)

This is very weird, because kubectl get po doesn’t show anything pending; it only shows up in ray status.
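
Since the discrepancy only shows up in `ray status`, it can help to extract the Pending and Recent failures sections programmatically and compare them against the pod list. A rough sketch, assuming the text layout shown above (section headers ending in a colon, entries indented by one space). The function name is hypothetical, and this output format is not a stable interface.

```python
def parse_node_status(status_text):
    """Group the indented entries of a `ray status` dump by section header."""
    sections = {}
    current = None
    for line in status_text.splitlines():
        stripped = line.strip()
        if not stripped or set(stripped) <= {"-", "="}:
            continue  # skip blank lines and separator rules
        if not line.startswith(" ") and stripped.endswith(":"):
            current = stripped[:-1]  # e.g. "Pending:" -> "Pending"
            sections[current] = []
        elif line.startswith(" ") and current is not None:
            sections[current].append(stripped)
    return sections

# Excerpt of the status output quoted above.
sample = """Active:
 1 headgroup
Idle:
 (no idle nodes)
Pending:
 spotworker, 1 launching
Recent failures:
 spotworker: NodeTerminated (ip: 10.224.1.35)
 spotworker: NodeTerminated (ip: 10.224.10.86)
"""
status = parse_node_status(sample)
print(status["Pending"])               # ['spotworker, 1 launching']
print(len(status["Recent failures"]))  # 2
```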

Interesting. Is this happening consistently, where there’s a gap of 1-2 resources?

Yes, I’ve tried a couple times.

The only time it seems to execute correctly is when I spawn only 1 actor, in which case it autoscales correctly to 1 worker.

I’ve made a change so that work can be scheduled on the head node: I added num-cpus=8 to the head node spec.

That seems to have fixed it. My impression is that the printing and task scheduling are correct, but the autoscaler is counting the head node’s resources as available.

Edit: it has not fixed it.

@Sam_Chan

I did some extra investigation. The issue kept happening.
Instead of using actors to “hot start” the nodes, I tried using placement groups and saw the same behavior. But now I have a small insight.

If I add a sleep between actor creations in the Python script above:

actor_handles = [MyActor.remote(k) for k in range(spawn) if sleep(2) is None]

It actually launches correctly, instead of one of the pods getting stuck in the pending “launching” state shown in the status output above.

I think that when many requests are made in quick succession, something in the communication between Ray and the K8s cluster gets lost along the way. Any insight?

But it seems it can still happen randomly.

Edit: I increased the sleep time to 3 seconds and it looks fairly stable. But it is pretty much a workaround.
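
A more readable way to express the same workaround is a small helper that pauses between creation calls. This is only a sketch of the staggering idea, not a fix for the underlying problem; the helper name and its parameters are hypothetical, and the factory is a stand-in so the example runs without a cluster.

```python
import time

def launch_staggered(create, count, delay_s=3.0, sleep=time.sleep):
    """Call create(k) for k in range(count), pausing delay_s between calls."""
    handles = []
    for k in range(count):
        if k > 0:
            sleep(delay_s)  # pause between requests, not before the first one
        handles.append(create(k))
    return handles

# Stand-in factory so the sketch runs without a cluster.
handles = launch_staggered(lambda k: f"actor-{k}", count=3, delay_s=0.0)
print(handles)  # ['actor-0', 'actor-1', 'actor-2']
```

With the script above, this would be `actor_handles = launch_staggered(MyActor.remote, spawn)`. Unlike the list-comprehension version, it does not sleep before the first actor is created.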

Oh interesting - so you’re running a Ray cluster on top of a K8s cluster sitting on Azure? Are you using AKS?

Yes, Ray cluster on top of AKS.

The issue is still happening, and it is sometimes worse depending on the number of nodes I request.

The issue also happens if I try to spawn new pods while some previously provisioned ones are in the “Terminating” state (but not yet fully terminated).

I opened an issue on GitHub: [Core] Ray Worker stuck in launching state - Azure AKS · Issue #45775 · ray-project/ray. It looks like this is a problem with autoscaler v2; I hadn’t noticed I was using an alpha version.