Ray Head restarting and leaving behind zombie processes

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Sharing our setup in case you can help us out with it in any manner. We have an Airflow DAG containing multiple tasks, each of which submits an individual Ray Job in the manner below and keeps polling until the Job reaches one of the finished states (almost the same as in the documentation, except that an in-house package is provided in the runtime_env of every job):

from ray.job_submission import JobStatus, JobSubmissionClient

client = JobSubmissionClient(address="ray://<>")
job_id = client.submit_job(
    entrypoint="python3 entry.py",
    runtime_env={"working_dir": <>, "py_modules": [custom_package]},
    entrypoint_num_cpus=1,
    entrypoint_resources={"worker_cpu": 1},
)
logger.info(job_id)
wait_until_status(
    client=client,
    job_id=job_id,
    status_to_wait_for={JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
    timeout_seconds=1800,
    polling_rate=30,
)
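
For reference, wait_until_status is essentially the polling helper from the Ray Jobs documentation, extended with a polling_rate parameter; a minimal sketch:

import time


def wait_until_status(client, job_id, status_to_wait_for, timeout_seconds, polling_rate):
    # Poll the job status until it reaches one of the target states or the timeout expires.
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        if status in status_to_wait_for:
            break
        time.sleep(polling_rate)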

We are aiming to run around 3000 such Ray Jobs with a dynamic number of Actors per Job; a ray.get() happens in only 600 of those Jobs. We have provided sufficient resources for each Actor and have a very large cluster:

  • Head Node: 32 CPUs, 128 GB RAM
  • 10 workers: 32 CPUs, 64 GB RAM
  • 40 workers: 8 CPUs, 16 GB RAM, 1 Tesla T4 with 16 GB VRAM
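
For reference, that works out to 640 schedulable worker CPUs (10 × 32 + 40 × 8), roughly 1.25 TB of worker RAM, and 40 T4 GPUs; the head is started with num-cpus: 0, so it schedules no work itself.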

This is our setup: Python 3.8.18, deployed with the helmfile below. We have tried running this with the following images:

  1. rayproject/ray:2.7.1-py38-cpu & rayproject/ray:2.7.1-py38-cu117
  2. rayproject/ray:2.9.3-py38-cpu & rayproject/ray:2.9.3-py38-cu117
environments:
  prod:
    values:
      - values/prod.yaml
  stg:
    values:
      - values/stg.yaml
---
repositories:
  - name: kuberay
    url: https://ray-project.github.io/kuberay-helm/
  - name: monochart
    url: <account_number>.dkr.ecr.eu-central-1.amazonaws.com
    oci: true

helmDefaults:
  timeout: 900
  wait: true

releases:
  - name: kuberay-{{ .Environment.Name }}
    namespace: ray
    createNamespace: true
    chart: kuberay/kuberay-operator
    version: 1.0.0
    atomic: true
    wait: true
    labels:
      app: kuberay-{{ .Environment.Name }}
    values:
      - affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: "type"
                      operator: "In"
                      values:
                        - "apps"
                    - key: "kubernetes.io/arch"
                      operator: "In"
                      values:
                        - "amd64"
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - topologyKey: "kubernetes.io/hostname"
        tolerations:
          - key: "apps"
            operator: "Equal"
            value: "true"
            effect: "NoExecute"
          - key: "apps"
            operator: "Equal"
            value: "true"
            effect: "NoSchedule"
        resources:
          limits:
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 512Mi

  - name: ray-cluster-{{ .Environment.Name }}
    namespace: ray
    createNamespace: true
    chart: kuberay/ray-cluster
    version: 1.0.0
    atomic: true
    wait: true
    labels:
      app: ray-cluster-{{ .Environment.Name }}
    values:
      - fullnameOverride: ray-cluster
        image:
          repository: <account_number>.dkr.ecr.eu-central-1.amazonaws.com/ray@sha256
          tag: {{ .Values.ray.cpu.tag }}
          pullPolicy: IfNotPresent
        head:
          rayVersion: '2.7.1'
          enableInTreeAutoscaling: true
          serviceAccountName: ray
          autoscalerOptions:
            idleTimeoutSeconds: 300
            resources:
              limits:
                cpu: 4
                memory: 4Gi
              requests:
                cpu: 4
                memory: 4Gi
            env:
              - name: AUTOSCALER_MAX_NUM_FAILURES
                value: "inf"
              - name: RAY_LOG_TO_STDERR
                value: "1"
          rayStartParams:
            num-cpus: 0
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                  - matchExpressions:
                      - key: "type"
                        operator: "In"
                        values:
                          - "ray"
                      - key: "karpenter.k8s.aws/instance-family"
                        operator: "In"
                        values:
                          - "r7i"
                          - "r7iz"
                          - "r7a"
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - topologyKey: "kubernetes.io/hostname"
          tolerations:
            - key: "ray"
              operator: "Equal"
              value: "true"
              effect: "NoExecute"
            - key: "ray"
              operator: "Equal"
              value: "true"
              effect: "NoSchedule"
          resources:
            limits:
              cpu: {{ .Values.ray.head.resources.limits.cpu }}
              memory: {{ .Values.ray.head.resources.limits.memory }}
            requests:
              cpu: {{ .Values.ray.head.resources.requests.cpu }}
              memory: {{ .Values.ray.head.resources.requests.memory }}
          ports:
            - containerPort: 6379
              name: gcs
              protocol: TCP
            - containerPort: 8265
              name: dashboard
              protocol: TCP
            - containerPort: 10001
              name: client
              protocol: TCP
          containerEnv:
            - name: RAY_GRAFANA_IFRAME_HOST
              value: https://grafana.internal.{{ .Environment.Name }}.scoutbee.app
            - name: RAY_GRAFANA_HOST
              value: http://prometheus-{{ .Environment.Name }}-grafana.prometheus.svc.cluster.local:80
            - name: RAY_PROMETHEUS_HOST
              value: {{ .Values.env.promethues }}
            - name: RAY_RUNTIME_ENV_TEMPORARY_REFERENCE_EXPIRATION_S
              value: "1800"
            - name: RAY_LOG_TO_STDERR
              value: "1"
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh","-c","ray stop"]
          volumeMounts:
            - mountPath: /opt
              name: config
            - mountPath: /tmp/ray
              name: ray-logs
          securityContext:
            capabilities:
              add:
              - SYS_PTRACE
          volumes:
            - name: ray-logs
              emptyDir: {}
          headService:
            metadata:
              annotations:
                service.beta.kubernetes.io/aws-load-balancer-scheme: internal
                service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
                service.beta.kubernetes.io/aws-load-balancer-name: ray-cluster-nlb
                service.beta.kubernetes.io/aws-load-balancer-type: nlb
                service.beta.kubernetes.io/aws-load-balancer-target-group-attributes: preserve_client_ip.enabled=true
                external-dns.alpha.kubernetes.io/hostname: ray-client.internal.{{ .Environment.Name }}.<company>.app
        service:
          type: LoadBalancer
        worker:
          disabled: true
        additionalWorkerGroups:
          ray-cpu:
            disabled: false
            replicas: {{ .Values.ray.cpuGroup.min }}
            minReplicas: {{ .Values.ray.cpuGroup.min }}
            maxReplicas: {{ .Values.ray.cpuGroup.max }}
            serviceAccountName: ray
            rayStartParams:
              resources: '"{\"worker_cpu\": {{ .Values.ray.cpuGroup.resources.limits.cpu }}}"'
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                    - matchExpressions:
                        - key: "type"
                          operator: "In"
                          values:
                            - "ray"
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - topologyKey: "kubernetes.io/hostname"
            tolerations:
              - key: "ray"
                operator: "Equal"
                value: "true"
                effect: "NoExecute"
              - key: "ray"
                operator: "Equal"
                value: "true"
                effect: "NoSchedule"
            image:
              repository: <account_number>.dkr.ecr.eu-central-1.amazonaws.com/ray@sha256
              tag: {{ .Values.ray.cpu.tag }}
              pullPolicy: IfNotPresent
            volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
            resources:
              limits:
                cpu: {{ .Values.ray.cpuGroup.resources.limits.cpu }}
                memory: {{ .Values.ray.cpuGroup.resources.limits.memory }}
              requests:
                cpu: {{ .Values.ray.cpuGroup.resources.requests.cpu }}
                memory: {{ .Values.ray.cpuGroup.resources.requests.memory }}
            volumes:
              - name: ray-logs
                emptyDir: {}
            containerEnv:
              - name: RAY_RUNTIME_ENV_TEMPORARY_REFERENCE_EXPIRATION_S
                value: "1800"
              - name: RAY_CLIENT_SERVER_MAX_THREADS
                value: "10000"
              - name: RAY_LOG_TO_STDERR
                value: "1"
              - name: RAY_gcs_resource_report_poll_period_ms
                value: "20000"
            ports: []
            nodeSelector: {}
            securityContext:
              capabilities:
                add:
                - SYS_PTRACE
            annotations: {}
          ray-gpu:
            disabled: false
            replicas: {{ .Values.ray.gpuGroup.min }}
            minReplicas: {{ .Values.ray.gpuGroup.min }}
            maxReplicas: {{ .Values.ray.gpuGroup.max }}
            serviceAccountName: ray
            rayStartParams:
              resources: '"{\"worker_gpu\": 1}"'
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                    - matchExpressions:
                        - key: "type"
                          operator: "In"
                          values:
                            - "ray-gpu"
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - topologyKey: "kubernetes.io/hostname"
            tolerations:
              - key: "ray-gpu"
                operator: "Equal"
                value: "true"
                effect: "NoExecute"
              - key: "ray-gpu"
                operator: "Equal"
                value: "true"
                effect: "NoSchedule"
            image:
              repository: <account_number>.dkr.ecr.eu-central-1.amazonaws.com/ray@sha256
              tag: {{ .Values.ray.gpu.tag }}
              pullPolicy: IfNotPresent
            volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
            resources:
              limits:
                nvidia.com/gpu: 1
                cpu: {{ .Values.ray.gpuGroup.resources.limits.cpu }}
                memory: {{ .Values.ray.gpuGroup.resources.limits.memory }}
              requests:
                cpu: {{ .Values.ray.gpuGroup.resources.requests.cpu }}
                memory: {{ .Values.ray.gpuGroup.resources.requests.memory }}
                nvidia.com/gpu: 1
            volumes:
              - name: ray-logs
                emptyDir: {}
            containerEnv:
              - name: RAY_RUNTIME_ENV_TEMPORARY_REFERENCE_EXPIRATION_S
                value: "1800"
              - name: RAY_CLIENT_SERVER_MAX_THREADS
                value: "10000"
              - name: RAY_LOG_TO_STDERR
                value: "1"
              - name: RAY_gcs_resource_report_poll_period_ms
                value: "20000"
            ports: []
            nodeSelector: {}
            securityContext:
              capabilities:
                add:
                - SYS_PTRACE
            annotations: {}
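
The worker groups register the custom resources worker_cpu and worker_gpu through rayStartParams; a quick sanity check from any driver confirms they are visible to the scheduler (a sketch, nothing cluster-specific assumed):

import ray

ray.init(address="auto")  # run from inside the cluster
# cluster_resources() should list "worker_cpu" and "worker_gpu"
# alongside the built-in CPU/GPU/memory entries.
print(ray.cluster_resources())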

We have experimented with both an internal GCS and an external GCS (Redis).

The main issue we are running into right now is that every time jobs are submitted, the Head Node goes into BackOff and restarts. There is no obvious sign of a lack of resources, because CPU and memory usage are very low compared to the limits. Once the Head Node comes back online, a large number of Jobs sit in the RUNNING state indefinitely with the default anaconda entrypoint and no Tasks/Actors attached to them, stalling processing completely:

/home/ray/anaconda3/bin/python -m ray.util.client.server --address=10.120.9.248:6379 --host=0.0.0.0 --port=23005 --mode=specific-server
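
For anyone reproducing this, the stuck entries can be inspected through the Jobs SDK; a sketch (the dashboard address is elided, as above):

from ray.job_submission import JobStatus, JobSubmissionClient

client = JobSubmissionClient(address="ray://<>")
# The zombie jobs show up as RUNNING entries whose entrypoint is the default
# ray.util.client.server command rather than our entry.py.
for job in client.list_jobs():
    if job.status == JobStatus.RUNNING:
        print(job.submission_id, job.entrypoint)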

What we have tried so far:

  1. Initially we were using the Ray Client to submit jobs, after which we switched to the Ray Jobs API.
  2. Removed all the ray.get() calls from the jobs that were feasible to remove.
  3. Reduced the number of tasks being submitted at once, all the way down to 100.
  4. Increased the processing power and network bandwidth of the Head Node.
  5. Tried different versions of Ray (2.7.1, 2.7.2, and 2.9.3).
  6. Tried different versions of KubeRay (1.0.0 and 1.1.0).
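
As a possible stopgap while debugging, the stuck entries could be cleared with stop_job; a sketch under the same assumptions as the listing snippet above (untested against this exact failure):

from ray.job_submission import JobStatus, JobSubmissionClient

client = JobSubmissionClient(address="ray://<>")
for job in client.list_jobs():
    # Only stop the zombie client-server jobs, not legitimate work.
    if job.status == JobStatus.RUNNING and "ray.util.client.server" in (job.entrypoint or ""):
        client.stop_job(job.submission_id)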

Example Actor:

from typing import Any, Dict, List

import polars as pl  # used in the elided DataFrame transformations
import ray
import <custom_package>  # provides Model


@ray.remote(
    num_cpus=0.1, resources={"custom_resource": 0.1}, max_restarts=4, max_task_retries=-1, scheduling_strategy="SPREAD"
)
class Actor:
    def __init__(self, data: List[str]):
        self.model = Model()
        self.data = data

    def predict_page(self) -> List[Dict[str, Any]]:
        results = []
        for item in self.data:
            results.append(self.model.predict(item))
        # >> DataFrame transformations (elided)
        return results

Example Entrypoint:

import logging

import click
import ray
import <custom_package>  # provides Actor and the object-store helpers

logger = logging.getLogger(__name__)


@click.command()
@click.option("--bucket")
@click.option("--key")
def predict(bucket: str, key: str) -> None:
    ray.init()
    logger.warning(f"Accessing bucket - {bucket}")
    logger.warning(f"Accessing key - {key}")
    # >> Read `batches` from the external object store (elided)
    actors = []
    for batch in batches:
        actors.append(Actor.remote(batch))
    # Fire-and-forget: without ray.get(), the driver does not wait for these calls.
    [actor.predict_page.remote() for actor in actors]
    ## Optional ray.get() call
    # predictions = ray.get([actor.predict_page.remote() for actor in actors])
    # logger.warning(predictions)


if __name__ == "__main__":
    predict()