How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
Sharing our setup in case you can help us out in any manner. We have an Airflow DAG that contains multiple tasks, each of which submits an individual Ray Job in the manner shown below and then polls until the job reaches one of the finished states (almost the same as in the documentation, except that an in-house package is provided in the runtime_env of every job):
from ray.job_submission import JobSubmissionClient, JobStatus

client = JobSubmissionClient(address="ray://<>")
job_id = client.submit_job(
    entrypoint="python3 entry.py",
    runtime_env={"working_dir": <>, "py_modules": [custom_package]},
    entrypoint_num_cpus=1,
    entrypoint_resources={"worker_cpu": 1},
)
logger.info(job_id)
wait_until_status(
    client=client,
    job_id=job_id,
    status_to_wait_for={JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
    timeout_seconds=1800,
    polling_rate=30,
)
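For context, wait_until_status is essentially the polling loop from the Ray Jobs docs; a minimal sketch under that assumption (the timeout handling here is illustrative):

import time

from ray.job_submission import JobSubmissionClient, JobStatus


def wait_until_status(
    client: JobSubmissionClient,
    job_id: str,
    status_to_wait_for: set,
    timeout_seconds: int = 1800,
    polling_rate: int = 30,
) -> JobStatus:
    # Poll the job status until it reaches one of the requested states
    # or the timeout elapses.
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = client.get_job_status(job_id)
        if status in status_to_wait_for:
            return status
        time.sleep(polling_rate)
    raise TimeoutError(f"Job {job_id} did not reach {status_to_wait_for} within {timeout_seconds}s")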
We are aiming to run around 3,000 such Ray Jobs with a dynamic number of Actors per Job; a ray.get() call happens in only 600 of those jobs. We have provided sufficient resources for each Actor and have a very large cluster:
- Head node: 32 CPUs, 128 GB RAM
- 10 workers: 32 CPUs, 64 GB RAM each
- 40 workers: 8 CPUs, 16 GB RAM, 1 Tesla T4 (16 GB VRAM) each
Python version: 3.8.18
We have tried running this with the following images:
- rayproject/ray:2.7.1-py38-cpu & rayproject/ray:2.7.1-py38-cu117
- rayproject/ray:2.9.3-py38-cpu & rayproject/ray:2.9.3-py38-cu117
The cluster is deployed with the following helmfile:
environments:
  prod:
    values:
      - values/prod.yaml
  stg:
    values:
      - values/stg.yaml
---
repositories:
  - name: kuberay
    url: https://ray-project.github.io/kuberay-helm/
  - name: monochart
    url: <account_number>.dkr.ecr.eu-central-1.amazonaws.com
    oci: true

helmDefaults:
  timeout: 900
  wait: true

releases:
  - name: kuberay-{{ .Environment.Name }}
    namespace: ray
    createNamespace: true
    chart: kuberay/kuberay-operator
    version: 1.0.0
    atomic: true
    wait: true
    labels:
      app: kuberay-{{ .Environment.Name }}
    values:
      - affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: "type"
                      operator: "In"
                      values:
                        - "apps"
                    - key: "kubernetes.io/arch"
                      operator: "In"
                      values:
                        - "amd64"
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - topologyKey: "kubernetes.io/hostname"
        tolerations:
          - key: "apps"
            operator: "Equal"
            value: "true"
            effect: "NoExecute"
          - key: "apps"
            operator: "Equal"
            value: "true"
            effect: "NoSchedule"
        resources:
          limits:
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 512Mi
  - name: ray-cluster-{{ .Environment.Name }}
    namespace: ray
    createNamespace: true
    chart: kuberay/ray-cluster
    version: 1.0.0
    atomic: true
    wait: true
    labels:
      app: ray-cluster-{{ .Environment.Name }}
    values:
      - fullnameOverride: ray-cluster
        image:
          repository: <account_number>.dkr.ecr.eu-central-1.amazonaws.com/ray@sha256
          tag: {{ .Values.ray.cpu.tag }}
          pullPolicy: IfNotPresent
        head:
          rayVersion: '2.7.1'
          enableInTreeAutoscaling: true
          serviceAccountName: ray
          autoscalerOptions:
            idleTimeoutSeconds: 300
            resources:
              limits:
                cpu: 4
                memory: 4Gi
              requests:
                cpu: 4
                memory: 4Gi
            env:
              - name: AUTOSCALER_MAX_NUM_FAILURES
                value: inf
              - name: RAY_LOG_TO_STDERR
                value: "1"
          rayStartParams:
            num-cpus: 0
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                  - matchExpressions:
                      - key: "type"
                        operator: "In"
                        values:
                          - "ray"
                      - key: "karpenter.k8s.aws/instance-family"
                        operator: "In"
                        values:
                          - "r7i"
                          - "r7iz"
                          - "r7a"
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - topologyKey: "kubernetes.io/hostname"
          tolerations:
            - key: "ray"
              operator: "Equal"
              value: "true"
              effect: "NoExecute"
            - key: "ray"
              operator: "Equal"
              value: "true"
              effect: "NoSchedule"
          resources:
            limits:
              cpu: {{ .Values.ray.head.resources.limits.cpu }}
              memory: {{ .Values.ray.head.resources.limits.memory }}
            requests:
              cpu: {{ .Values.ray.head.resources.requests.cpu }}
              memory: {{ .Values.ray.head.resources.requests.memory }}
          ports:
            - containerPort: 6379
              name: gcs
              protocol: TCP
            - containerPort: 8265
              name: dashboard
              protocol: TCP
            - containerPort: 10001
              name: client
              protocol: TCP
          containerEnv:
            - name: RAY_GRAFANA_IFRAME_HOST
              value: https://grafana.internal.{{ .Environment.Name }}.scoutbee.app
            - name: RAY_GRAFANA_HOST
              value: http://prometheus-{{ .Environment.Name }}-grafana.prometheus.svc.cluster.local:80
            - name: RAY_PROMETHEUS_HOST
              value: {{ .Values.env.promethues }}
            - name: RAY_RUNTIME_ENV_TEMPORARY_REFERENCE_EXPIRATION_S
              value: "1800"
            - name: RAY_LOG_TO_STDERR
              value: "1"
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "ray stop"]
          volumeMounts:
            - mountPath: /opt
              name: config
            - mountPath: /tmp/ray
              name: ray-logs
          securityContext:
            capabilities:
              add:
                - SYS_PTRACE
          volumes:
            - name: ray-logs
              emptyDir: {}
          headService:
            metadata:
              annotations:
                service.beta.kubernetes.io/aws-load-balancer-scheme: internal
                service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
                service.beta.kubernetes.io/aws-load-balancer-name: ray-cluster-nlb
                service.beta.kubernetes.io/aws-load-balancer-type: nlb
                service.beta.kubernetes.io/aws-load-balancer-target-group-attributes: preserve_client_ip.enabled=true
                external-dns.alpha.kubernetes.io/hostname: ray-client.internal.{{ .Environment.Name }}.<company>.app
        service:
          type: LoadBalancer
        worker:
          disabled: true
        additionalWorkerGroups:
          ray-cpu:
            disabled: false
            replicas: {{ .Values.ray.cpuGroup.min }}
            minReplicas: {{ .Values.ray.cpuGroup.min }}
            maxReplicas: {{ .Values.ray.cpuGroup.max }}
            serviceAccountName: ray
            rayStartParams:
              resources: '"{\"worker_cpu\": {{ .Values.ray.cpuGroup.resources.limits.cpu }}}"'
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                    - matchExpressions:
                        - key: "type"
                          operator: "In"
                          values:
                            - "ray"
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - topologyKey: "kubernetes.io/hostname"
            tolerations:
              - key: "ray"
                operator: "Equal"
                value: "true"
                effect: "NoExecute"
              - key: "ray"
                operator: "Equal"
                value: "true"
                effect: "NoSchedule"
            image:
              repository: <account_number>.dkr.ecr.eu-central-1.amazonaws.com/ray@sha256
              tag: {{ .Values.ray.cpu.tag }}
              pullPolicy: IfNotPresent
            volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
            resources:
              limits:
                cpu: {{ .Values.ray.cpuGroup.resources.limits.cpu }}
                memory: {{ .Values.ray.cpuGroup.resources.limits.memory }}
              requests:
                cpu: {{ .Values.ray.cpuGroup.resources.requests.cpu }}
                memory: {{ .Values.ray.cpuGroup.resources.requests.memory }}
            volumes:
              - name: ray-logs
                emptyDir: {}
            containerEnv: # (assumed nesting for the env entries below)
              - name: RAY_RUNTIME_ENV_TEMPORARY_REFERENCE_EXPIRATION_S
                value: "1800"
              - name: RAY_CLIENT_SERVER_MAX_THREADS
                value: '10000'
              - name: RAY_LOG_TO_STDERR
                value: "1"
              - name: RAY_gcs_resource_report_poll_period_ms
                value: '20000'
            ports: []
            nodeSelector: {}
            securityContext:
              capabilities:
                add:
                  - SYS_PTRACE
            annotations: {}
          ray-gpu:
            disabled: false
            replicas: {{ .Values.ray.gpuGroup.min }}
            minReplicas: {{ .Values.ray.gpuGroup.min }}
            maxReplicas: {{ .Values.ray.gpuGroup.max }}
            serviceAccountName: ray
            rayStartParams:
              resources: '"{\"worker_gpu\": 1}"'
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                    - matchExpressions:
                        - key: "type"
                          operator: "In"
                          values:
                            - "ray-gpu"
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - topologyKey: "kubernetes.io/hostname"
            tolerations:
              - key: "ray-gpu"
                operator: "Equal"
                value: "true"
                effect: "NoExecute"
              - key: "ray-gpu"
                operator: "Equal"
                value: "true"
                effect: "NoSchedule"
            image:
              repository: <account_number>.dkr.ecr.eu-central-1.amazonaws.com/ray@sha256
              tag: {{ .Values.ray.gpu.tag }}
              pullPolicy: IfNotPresent
            volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
            resources:
              limits:
                nvidia.com/gpu: 1
                cpu: {{ .Values.ray.gpuGroup.resources.limits.cpu }}
                memory: {{ .Values.ray.gpuGroup.resources.limits.memory }}
              requests:
                cpu: {{ .Values.ray.gpuGroup.resources.requests.cpu }}
                memory: {{ .Values.ray.gpuGroup.resources.requests.memory }}
                nvidia.com/gpu: 1
            volumes:
              - name: ray-logs
                emptyDir: {}
            containerEnv: # (assumed nesting for the env entries below)
              - name: RAY_RUNTIME_ENV_TEMPORARY_REFERENCE_EXPIRATION_S
                value: "1800"
              - name: RAY_CLIENT_SERVER_MAX_THREADS
                value: '10000'
              - name: RAY_LOG_TO_STDERR
                value: "1"
              - name: RAY_gcs_resource_report_poll_period_ms
                value: '20000'
            ports: []
            nodeSelector: {}
            securityContext:
              capabilities:
                add:
                  - SYS_PTRACE
            annotations: {}
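For reference, the worker_cpu and worker_gpu entries in rayStartParams above are Ray custom resources; on the Python side they are requested the same way as entrypoint_resources in the submission snippet. A minimal sketch, assuming a node from the ray-gpu group advertises worker_gpu (gpu_group_step is a hypothetical task for illustration):

import ray

ray.init()


# Schedules only on nodes that advertise the matching custom resource,
# i.e. the ray-gpu group started with --resources='{"worker_gpu": 1}'.
@ray.remote(num_cpus=0.1, resources={"worker_gpu": 0.25})
def gpu_group_step() -> str:
    return "ran on a ray-gpu node"


print(ray.get(gpu_group_step.remote()))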
We have experimented with both an internal GCS and an external GCS (Redis).
The main issue we are running into right now is that every time jobs are submitted, the Head Node goes into BackOff and restarts. There is no obvious sign of resource starvation: CPU and memory usage are very low compared to the limits. Once the Head Node comes back online, a large number of Jobs are stuck in the RUNNING state indefinitely with the default anaconda entrypoint and no Tasks/Actors attached to them, stalling processing completely:
/home/ray/anaconda3/bin/python -m ray.util.client.server --address=10.120.9.248:6379 --host=0.0.0.0 --port=23005 --mode=specific-server
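A minimal sketch of how these stuck jobs can be listed for inspection (illustrative only; it filters the Jobs API listing for RUNNING jobs whose entrypoint is the default client-server command rather than our python3 entry.py):

from ray.job_submission import JobSubmissionClient, JobStatus

client = JobSubmissionClient(address="ray://<>")  # same address as in the submission snippet

# Jobs stuck RUNNING with the default ray.util.client.server entrypoint.
stuck = [
    job
    for job in client.list_jobs()
    if job.status == JobStatus.RUNNING
    and "ray.util.client.server" in (job.entrypoint or "")
]
for job in stuck:
    print(job.submission_id, job.status, job.entrypoint)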
What we have tried so far:
- Initially we were using the Ray Client to submit jobs, after which we switched to the Ray Jobs API (the earlier pattern is sketched after this list).
- Removing all the ray.get() calls from the jobs where that was feasible.
- Reducing the number of tasks being submitted at once, all the way down to 100.
- Increasing the processing power and network bandwidth of the Head Node.
- Different versions of Ray (2.7.1, 2.7.2 & 2.9.3).
- Different versions of KubeRay (1.0.0 and 1.1.0).
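For context, a minimal sketch of the earlier Ray Client-based pattern, assuming each Airflow task connected over the client port (10001) and drove the work in-process instead of submitting a job entrypoint (run_prediction is a hypothetical stand-in for the logic that now lives in entry.py):

import ray

# Connect through the Ray Client endpoint exposed by the head service.
ray.init(address="ray://<>:10001")


@ray.remote(resources={"worker_cpu": 1})
def run_prediction(bucket: str, key: str) -> None:
    # Hypothetical stand-in for the prediction logic shown in entry.py below.
    ...


ray.get(run_prediction.remote("<bucket>", "<key>"))
ray.shutdown()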
Example Actor:
from typing import Any, Dict, List

import polars as pl  # used by the elided DataFrame transformations below
import ray

import <custom_package>  # in-house package shipped via py_modules (Model is assumed to come from here)


@ray.remote(
    num_cpus=0.1,
    resources={"custom_resource": 0.1},
    max_restarts=4,
    max_task_retries=-1,
    scheduling_strategy="SPREAD",
)
class Actor:
    def __init__(self, data: List[str]):
        self.model = Model()
        self.data = data

    def predict_page(self) -> List[Dict[str, Any]]:
        results = []
        for item in self.data:
            results.append(self.model.predict(item))
        # >> DataFrame transformations (elided)
        return results
Example Entrypoint:
import logging

import click
import ray

import <custom_package>  # in-house package shipped via py_modules (Actor is assumed to come from here)

logger = logging.getLogger(__name__)


@click.command()
@click.option("--bucket")
@click.option("--key")
def predict(bucket: str, key: str) -> None:
    ray.init()
    logger.warning(f"Accessing bucket - {bucket}")
    logger.warning(f"Accessing key - {key}")
    # >> Read batches from the external object store (elided)
    actors = []
    for batch in batches:
        actors.append(Actor.remote(batch))
    refs = [actor.predict_page.remote() for actor in actors]
    ## Optional ray.get() call (present in only ~600 of the jobs)
    # predictions = ray.get(refs)
    # logger.warning(predictions)


if __name__ == "__main__":
    predict()
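For completeness, a sketch of how one Airflow task ties this entrypoint back to the submission call at the top of this report; the --bucket/--key/working_dir values are per-run placeholders, and this is illustrative rather than our exact DAG code:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(address="ray://<>")

# The entrypoint string carries the click options that entry.py expects;
# custom_package is the in-house package shipped via py_modules.
job_id = client.submit_job(
    entrypoint="python3 entry.py --bucket <bucket> --key <key>",
    runtime_env={"working_dir": "<working_dir>", "py_modules": [custom_package]},
    entrypoint_num_cpus=1,
    entrypoint_resources={"worker_cpu": 1},
)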