Join tasks getting stuck in PENDING_NODE_ASSIGNMENT

1. Severity of the issue:
Medium: Significantly affects my productivity but can find a workaround.

2. Environment:

  • Ray version: 2.46.0
  • Python version: 3.10.17
  • OS: Ubuntu 24.04 (WSL) on Windows 11
  • Cloud/Infrastructure: Local k8s using Rancher Desktop
  • Other libs/tools (if relevant): I run k8s via Rancher Desktop and deploy a Ray cluster onto it with the RayCluster manifest below.

3. What happened vs. what you expected:

  • Expected: The tasks run and the join completes.
  • Actual: Most tasks hang in PENDING_NODE_ASSIGNMENT, so the job never finishes.

Hi everyone,

I’m new to Ray and have been testing it to determine whether it’s suitable for our purposes. Most recently I’ve been trying the new support for joining datasets, but I keep hitting an issue where some tasks complete while most get stuck in the PENDING_NODE_ASSIGNMENT state, and the join never completes. (There are similar threads about PENDING_NODE_ASSIGNMENT, but they don’t cover what I’m experiencing.)

Here is the code I’m running. It loads the Iris dataset and does a “cross” join to create multiple replicas of the same rows, where the number of replicas is a CLI argument to the script. (The idea is that this gives me a bigger dataset for testing data transformations, etc.)

import sys
import ray

ctx = ray.init(address="auto")

ds_og = ray.data.read_csv(
    "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv"
)

num_replicas = int(sys.argv[1]) if len(sys.argv) > 1 else 1
# Aim for roughly one hash partition per 100 replicas
num_partitions = max(1, num_replicas // 100)
print(f"Number of data replicas : {num_replicas}")
print(f"Number of partitions    : {num_partitions}")

# Create `num_replicas` replicas of the Iris data by performing a cross join between the
# dataset [0, ..., num_replicas - 1] and the Iris data
rep = ray.data.range(num_replicas).add_column("cross", lambda _: 0)
ds = (
    ds_og
    .add_column("cross", lambda _: 0)
    .join(rep, join_type="inner", on=("cross",), num_partitions=num_partitions)
)

print("\nSample rows:")
print(ds.take_batch(10, batch_format="pandas"))

I run it using

RAY_ADDRESS=http://localhost:8265 uv run ray job submit --working-dir . -- python myfile.py 1000;

It works for a CLI arg value of 10 or 100, but for a value of 1000:

  • A few tasks complete, then a large batch of HashShuffleAggregator tasks gets stuck in the pending state (and no tasks are “running”); see the snippet below for how I’ve been inspecting them.
  • The workers don’t seem to be busy at all (minimal CPU usage, plenty of memory available).
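
For reference, this is roughly what I run from Python while the job is hanging (a minimal sketch using Ray’s state API, run from somewhere that can reach the cluster; the filter value matches the state name shown in the dashboard):

import ray
from ray.util.state import list_tasks

ray.init(address="auto")

# What the cluster advertises vs. what is currently free
print("cluster_resources  :", ray.cluster_resources())
print("available_resources:", ray.available_resources())

# A handful of the tasks stuck waiting for a node
for task in list_tasks(
    filters=[("state", "=", "PENDING_NODE_ASSIGNMENT")],
    limit=10,
):
    print(task.name, task.state)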

My cluster (deployed on k8s with the YAML below) has:

  • 1 head node with 1 CPU, 8 Gi memory.
  • 2-4 worker nodes (using autoscaler) with 1 CPU, 2 Gi memory each.

I’m hoping someone can point me in the right direction to get this working. It might be related to the number of partitions, but even with many small partitions I’d expect things to still run?

This is the YAML deployment I’m using:

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: ray-cluster
  namespace: default
spec:
  rayVersion: 2.46.0
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 60
    imagePullPolicy: IfNotPresent
    resources:
      limits:
        cpu: "500m"
        memory: "512Mi"
      requests:
        cpu: "500m"
        memory: "512Mi"
  headGroupSpec:
    rayStartParams:
      dashboard-host: 0.0.0.0
      num-cpus: "0"
      num-gpus: "0"
      resources: '"{\"worker\": 0}"'
      disable-usage-stats: "true"
    template:
      spec:
        containers:
          - image: rayproject/ray:2.46.0-py310
            name: ray-head
            env:
              - name: RAY_enable_autoscaler_v2
                value: "1"
              - name: RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING
                value: "1"
              # - name: RAY_enable_infeasible_task_early_exit
              #   value: "true"
            ports:
              - containerPort: 6379
                name: gcs-server
              - containerPort: 8265
                name: dashboard
              - containerPort: 10001
                name: client
            resources:
              limits:
                cpu: "1"
                memory: 8Gi
              requests:
                cpu: "1"
                memory: 8Gi
        restartPolicy: Never # Prevent container restart to maintain Ray health
  workerGroupSpecs:
    - groupName: default-group
      rayStartParams:
        metrics-export-port: "8080"
        resources: '"{\"worker\": 1}"'
        disable-usage-stats: "true"
      replicas: 2
      minReplicas: 2
      maxReplicas: 4
      template:
        spec:
          containers:
            - image: rayproject/ray:2.46.0-py310
              name: ray-worker
              resources:
                limits:
                  cpu: "1"
                  memory: 2Gi
                requests:
                  cpu: "1"
                  memory: 2Gi
          restartPolicy: Never # Prevent k8s from restarting worker pods, enabling correct instance management by Ray

Hi! I noticed that the rayStartParams for your workers set the custom resource worker to 1, but don’t explicitly state the number of CPUs or GPUs available. Can you try adding num-cpus: "1" to the worker rayStartParams and see if that fixes it, in case this is an issue with CPU advertisement?

You can read more about it here: Cluster Management CLI — Ray 2.46.0
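
Something like this in your worker group, just to show where the field goes (an untested sketch based on your manifest):

  workerGroupSpecs:
    - groupName: default-group
      rayStartParams:
        num-cpus: "1"                # explicitly advertise the worker's CPU
        metrics-export-port: "8080"
        resources: '"{\"worker\": 1}"'
        disable-usage-stats: "true"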

Could you also see whether increasing the number of CPUs (or GPUs) per worker helps with this issue (if that’s possible in your setup), and explicitly state the amount of each in the YAML?

If it’s not an issue with the CPU resource, then you’ll have to do some more debugging. When things get stuck again, can you try running ray summary tasks and let me know what it says?
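
If it’s easier to grab programmatically, the state API should give roughly the same information from Python (a sketch, assuming it’s run somewhere that can reach the cluster):

from ray.util.state import summarize_tasks

# Roughly the same output as `ray summary tasks`, returned as a dict
print(summarize_tasks())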