Join tasks getting stuck in PENDING_NODE_ASSIGNMENT

1. Severity of the issue:
Medium: Significantly affects my productivity but can find a workaround.

2. Environment:

  • Ray version: 2.46.0
  • Python version: 3.10.17
  • OS: Windows 11 (Ubuntu 24.04 via WSL)
  • Cloud/Infrastructure: Local k8s using Rancher Desktop
  • Other libs/tools (if relevant): I’m running k8s on Rancher Desktop, then deploying a Ray Cluster on k8s.

3. What happened vs. what you expected:

  • Expected: the tasks to run and the join to finish.
  • Actual: the tasks hang in PENDING_NODE_ASSIGNMENT, so the job never seems to finish.

Hi everyone,

I’m new to Ray and have been testing it to determine whether it’ll be suitable for our purposes. Most recently I’ve been trying the new support for joining datasets, but I keep hitting an issue where some tasks complete while most get stuck in the PENDING_NODE_ASSIGNMENT state, and the join never completes. (There are similar threads about PENDING_NODE_ASSIGNMENT, but they aren’t relevant to what I’m experiencing.)

Here is the code I’m running. It loads the Iris dataset and does a “cross” join to create multiple replicas of the same rows, where the number of replicas is a CLI arg for the script. (The idea being that I then have a bigger dataset I can use to test data transformations, etc.)

import sys
import ray

ctx = ray.init(address="auto")

ds_og = ray.data.read_csv(
    "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv"
)

num_replicas = int(sys.argv[1]) if len(sys.argv) > 1 else 1
num_partitions = max(1, num_replicas // 100)
print(f"Number of data replicas : {num_replicas}")
print(f"Number of partitions    : {num_partitions}")

# Create `num_replicas` replicas of the Iris data by performing a cross join between the
# dataset [0, ..., num_replicas - 1] and the Iris data
rep = ray.data.range(num_replicas).add_column("cross", lambda _: 0)
ds = (
    ds_og
    .add_column("cross", lambda _: 0)
    .join(rep, join_type="inner", on=("cross",), num_partitions=num_partitions)
)

print("\nSample rows:")
print(ds.take_batch(10, batch_format="pandas"))

I run it using

RAY_ADDRESS=http://localhost:8265 uv run ray job submit --working-dir . -- python myfile.py 1000;

It works for a CLI arg value of 10 or 100, but for a value of 1000:

  • It completes a few tasks, then a whole bunch of HashShuffleAggregator tasks get stuck in the pending state (and no tasks are “running”).
  • The workers don’t seem to be busy at all (minimal CPU usage, plenty of memory available).

My cluster (a YAML deployment on k8s) has:

  • 1 head node with 1 CPU, 8 Gi memory.
  • 2-4 worker nodes (using autoscaler) with 1 CPU, 2 Gi memory each.

Hoping someone can point me in the right direction to get this working. It might be related to the number of partitions, but even with many small partitions I’d expect things to still run?

This is the YAML deployment I’m using:

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: ray-cluster
  namespace: default
spec:
  rayVersion: 2.46.0
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 60
    imagePullPolicy: IfNotPresent
    resources:
      limits:
        cpu: "500m"
        memory: "512Mi"
      requests:
        cpu: "500m"
        memory: "512Mi"
  headGroupSpec:
    rayStartParams:
      dashboard-host: 0.0.0.0
      num-cpus: "0"
      num-gpus: "0"
      resources: '"{\"worker\": 0}"'
      disable-usage-stats: "true"
    template:
      spec:
        containers:
          - image: rayproject/ray:2.46.0-py310
            name: ray-head
            env:
              - name: RAY_enable_autoscaler_v2
                value: "1"
              - name: RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING
                value: "1"
              # - name: RAY_enable_infeasible_task_early_exit
              #   value: "true"
            ports:
              - containerPort: 6379
                name: gcs-server
              - containerPort: 8265
                name: dashboard
              - containerPort: 10001
                name: client
            resources:
              limits:
                cpu: "1"
                memory: 8Gi
              requests:
                cpu: "1"
                memory: 8Gi
        restartPolicy: Never # Prevent container restart to maintain Ray health
  workerGroupSpecs:
    - groupName: default-group
      rayStartParams:
        metrics-export-port: "8080"
        resources: '"{\"worker\": 1}"'
        disable-usage-stats: "true"
      replicas: 2
      minReplicas: 2
      maxReplicas: 4
      template:
        spec:
          containers:
            - image: rayproject/ray:2.46.0-py310
              name: ray-worker
              resources:
                limits:
                  cpu: "1"
                  memory: 2Gi
                requests:
                  cpu: "1"
                  memory: 2Gi
          restartPolicy: Never # Prevent k8s from restarting worker pods, enabling correct instance management by Ray

Hi! I’ve noticed that the resources in your rayStartParams specify worker as 1, but they don’t explicitly state the number of CPUs or GPUs available. Can you try adding num-cpus: "1" to your rayStartParams and see if that helps, in case it’s an issue with CPU advertisement?

You can read more about it here: Cluster Management CLI — Ray 2.46.0

Can you also see whether increasing the number of CPUs or GPUs per worker might help with this issue (if possible), explicitly stating the number of each in the YAML?

If it’s not an issue with the CPU resource then you will have to do some more debugging. When things get stuck again, can you try running ray summary tasks and let me know what it says?
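If it’s easier to poke at this from Python than from the CLI, something like the snippet below should surface the same information. I’m using the ray.util.state API here, and the exact field names can vary a bit between Ray versions, so treat it as a sketch:

import ray
from ray.util.state import list_actors, list_tasks, summarize_tasks

ray.init(address="auto")

# Roughly the same information as `ray summary tasks`, grouped by function name.
print(summarize_tasks())

# Only the tasks stuck waiting for a node, plus the resources they asked for.
for task in list_tasks(filters=[("state", "=", "PENDING_NODE_ASSIGNMENT")]):
    print(task.name, task.required_resources)

# And any actors that couldn't be placed yet.
for actor in list_actors(filters=[("state", "=", "PENDING_CREATION")]):
    print(actor.class_name, actor.state)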


Hi Christina, thank you kindly for your reply.

I added num-cpus and num-gpus to the worker rayStartParams (set to the same as what’s set for the resource requests/limits). Unfortunately that doesn’t fix the issue.

So I decided to run a few experiments, increasing the number of workers or the specs of the workers (cpu + memory) as you suggested. This is what I got:

Head (cpu=1, mem=8Gi); 2-10 x Worker (cpu=1, mem=2Gi):
(This has more workers than before, but the CPU/mem specs are the same.)

  • The Python job works for n = 1000 (where n is the CLI arg for the script).
  • However, same problem as before for n = 2000, i.e., tasks get stuck in the PENDING_NODE_ASSIGNMENT state. Running ray summary tasks gives the following:
        ======== Tasks Summary: 2025-05-18 22:48:47.852853 ========
        Stats:
        ------------------------------------
        total_actor_scheduled: 45
        total_actor_tasks: 740
        total_tasks: 79

        Table (group by func_name):
        ------------------------------------
            FUNC_OR_CLASS_NAME                           STATE_COUNTS                TYPE
        0   _shuffle_block                               FINISHED: 33                NORMAL_TASK
                                                         PENDING_NODE_ASSIGNMENT: 1
        1   JobSupervisor.ping                           FAILED: 2                   ACTOR_TASK
                                                         FINISHED: 429
        2   AutoscalingRequester.request_resources       FINISHED: 25                ACTOR_TASK
        3   _StatsActor.get_dataset_id                   FINISHED: 18                ACTOR_TASK
        4   _StatsActor.update_metrics                   FINISHED: 86                ACTOR_TASK
        5   _StatsActor.get_datasets                     FINISHED: 79                ACTOR_TASK
        6   HashShuffleAggregator.finalize               FINISHED: 20                ACTOR_TASK
        7   HashShuffleAggregator.submit                 FINISHED: 69                ACTOR_TASK
        8   _map_task                                    FINISHED: 37                NORMAL_TASK
                                                         PENDING_NODE_ASSIGNMENT: 6
        9   JobSupervisor.run                            FINISHED: 2                 ACTOR_TASK
                                                         RUNNING: 1
        10  slice_fn                                     FINISHED: 2                 NORMAL_TASK
        11  AutoscalingRequester.purge_expired_requests  FINISHED: 4                 ACTOR_TASK
        12  _StatsActor.register_dataset                 FINISHED: 3                 ACTOR_TASK
        13  _StatsActor.update_execution_metrics         FINISHED: 2                 ACTOR_TASK
        14  HashShuffleAggregator.__init__               FINISHED: 40                ACTOR_CREATION_TASK
        15  _StatsActor.__init__                         FINISHED: 1                 ACTOR_CREATION_TASK
        16  JobSupervisor.__init__                       FINISHED: 3                 ACTOR_CREATION_TASK
        17  AutoscalingRequester.__init__                FINISHED: 1                 ACTOR_CREATION_TASK
    

Head (cpu=1, mem=8Gi); 5 x Worker (cpu=1, mem=2Gi):
(This turns off the autoscaling, fixing the number of workers at 5.)

  • Works for n = 100
  • Same problem (tasks stuck in the PENDING_NODE_ASSIGNMENT state) for n = 1000. Running ray summary tasks gives
    	======== Tasks Summary: 2025-05-18 23:32:44.801447 ========
    	Stats:
    	------------------------------------
    	total_actor_scheduled: 13
    	total_actor_tasks: 245
    	total_tasks: 2
    
    
    	Table (group by func_name):
    	------------------------------------
    			FUNC_OR_CLASS_NAME                           STATE_COUNTS                TYPE
    	0   JobSupervisor.ping                           FINISHED: 179               ACTOR_TASK
    	1   AutoscalingRequester.request_resources       FINISHED: 9                 ACTOR_TASK
    	2   _StatsActor.get_datasets                     FINISHED: 13                ACTOR_TASK
    	3   _StatsActor.update_metrics                   FINISHED: 35                ACTOR_TASK
    	4   AutoscalingRequester.purge_expired_requests  FINISHED: 1                 ACTOR_TASK
    	5   JobSupervisor.run                            RUNNING: 1                  ACTOR_TASK
    	6   _StatsActor.get_dataset_id                   FINISHED: 6                 ACTOR_TASK
    	7   _map_task                                    PENDING_NODE_ASSIGNMENT: 2  NORMAL_TASK
    	8   _StatsActor.register_dataset                 FINISHED: 1                 ACTOR_TASK
    	9   HashShuffleAggregator.__init__               FINISHED: 10                ACTOR_CREATION_TASK
    	10  JobSupervisor.__init__                       FINISHED: 1                 ACTOR_CREATION_TASK
    	11  _StatsActor.__init__                         FINISHED: 1                 ACTOR_CREATION_TASK
    	12  AutoscalingRequester.__init__                FINISHED: 1                 ACTOR_CREATION_TASK
    

Head (cpu=2, mem=8Gi); 5 x Worker (cpu=2, mem=4Gi):
(Same as the previous, but doubling the CPU and worker memory.)

  • Works for n = 2000.

  • For n = 3000 it got to the point where 2 tasks were Running (and 30 were waiting for scheduling). However, the Running tasks didn’t seem to finish, or were very slow (I left them for ~20 mins and they didn’t finish). Running ray summary tasks gives

        ======== Tasks Summary: 2025-05-18 23:41:12.662745 ========
        Stats:
        ------------------------------------
        total_actor_scheduled: 33
        total_actor_tasks: 144
        total_tasks: 81

        Table (group by func_name):
        ------------------------------------
            FUNC_OR_CLASS_NAME                      STATE_COUNTS                 TYPE
        0   _map_task                               FINISHED: 41                 NORMAL_TASK
        1   _shuffle_block                          FINISHED: 38                 NORMAL_TASK
                                                    RUNNING: 2
        2   _StatsActor.update_metrics              FINISHED: 6                  ACTOR_TASK
        3   HashShuffleAggregator.submit            FINISHED: 78                 ACTOR_TASK
                                                    PENDING_NODE_ASSIGNMENT: 20
        4   _StatsActor.get_datasets                FINISHED: 4                  ACTOR_TASK
        5   JobSupervisor.run                       RUNNING: 1                   ACTOR_TASK
        6   JobSupervisor.ping                      FINISHED: 27                 ACTOR_TASK
        7   _StatsActor.get_dataset_id              FINISHED: 6                  ACTOR_TASK
        8   AutoscalingRequester.request_resources  FINISHED: 1                  ACTOR_TASK
        9   _StatsActor.register_dataset            FINISHED: 1                  ACTOR_TASK
        10  HashShuffleAggregator.__init__          FINISHED: 20                 ACTOR_CREATION_TASK
                                                    PENDING_NODE_ASSIGNMENT: 10
        11  JobSupervisor.__init__                  FINISHED: 1                  ACTOR_CREATION_TASK
        12  AutoscalingRequester.__init__           FINISHED: 1                  ACTOR_CREATION_TASK
        13  _StatsActor.__init__                    FINISHED: 1                  ACTOR_CREATION_TASK
    

    I also ran the actors summary for this one:

        ======== Actors Summary: 2025-05-18 23:54:34.085598 ========
        Stats:
        ------------------------------------
        total_actors: 33

        Table (group by class):
        ------------------------------------
            CLASS_NAME             STATE_COUNTS
        0   HashShuffleAggregator  ALIVE: 20
                                   PENDING_CREATION: 10
        1   JobSupervisor          ALIVE: 1
        2   AutoscalingRequester   ALIVE: 1
        3   _StatsActor            ALIVE: 1
    

    And I see an autoscaler warning (this is probably also appearing in the other cases):

    	(autoscaler +12m3s) Warning: The following resource request cannot be
    	scheduled right now: {'CPU': 0.125, 'memory': 939524096.0}. This is likely
    	due to all cluster resources being claimed by actors. Consider creating
    	fewer actors or adding more nodes to this Ray cluster.
    

    Because of the pending actor creations and the autoscaler warning, I updated the deployment to 8 worker pods (instead of 5) while the job was “stuck”, and then it completed in short order.

I’ve done a few tests now where tasks get stuck in the pending state, and then I update the deployment to increase the number of worker pods, whereupon the job is able to complete.

So it does seem to be resource constraints causing the issue. However:

  • The Iris dataset is very small (150 rows, ~15 KiB), so even replicating it 3000 times still gives a small dataset (450k rows, ~44 MiB). It seems strange that this would require scaling my cluster?
  • Based on prior experience with the likes of Dask and Spark, with a smaller cluster I’d expect tasks to complete more slowly (since fewer nodes are available to execute tasks in parallel), but still to complete nonetheless. Am I right in expecting the same from Ray? If so, there’s still something to be debugged here, but we’re maybe a bit closer to understanding the problem.

Thank you Dennis! Great job debugging and running tests :smiley: I think you’re definitely on the right track here.

The autoscaler log points to a resource-placement issue rather than a bug in
Dataset.join:

    Warning: The following resource request cannot be scheduled right now:
    {'CPU': 0.125, 'memory': 939524096.0}. This is likely due to all cluster
    resources being claimed by actors. Consider creating fewer actors or adding
    more nodes to this Ray cluster.

So I think the culprit here is HashShuffleAggregator, which is an actor, not a task. These actors are required by your join operation, and the number of them is determined by the num_partitions parameter.

Each HashShuffleAggregator actor created by the join requests ~0.9 GiB of Ray memory. With 2 GiB worker pods only two aggregators can fit per node, so when the join tries to launch the third and beyond, they sit in PENDING_NODE_ASSIGNMENT and the job stalls.

I think if you lower num_partitions it might run faster; maybe try a number less than 4, or start off with 1 if you think the Iris dataset is small enough.

Do you feel like you need a lot of partitions? The scheduling will likely work better with fewer, given the size of the dataset you have. Alternatively, you could bump each worker to 4-8 GiB of RAM.
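As a rough sketch of what I mean (the memory and CPU figures per aggregator are lifted straight from your autoscaler warning, not from any documented constant), you could derive num_partitions from what the cluster can actually host at once instead of from num_replicas:

import ray

ray.init(address="auto")

# Per-aggregator request, copied from the autoscaler warning
# ({'CPU': 0.125, 'memory': 939524096.0}); treat these numbers as estimates.
AGG_MEM_BYTES = 939_524_096
AGG_CPU = 0.125

cluster = ray.cluster_resources()  # "memory" is reported in bytes
fit_by_mem = int(cluster.get("memory", 0) // AGG_MEM_BYTES)
fit_by_cpu = int(cluster.get("CPU", 0) // AGG_CPU)

# One HashShuffleAggregator actor is created per partition, so don't ask for
# more partitions than the cluster can host at the same time. For a dataset
# this small, a handful is plenty anyway.
num_partitions = max(1, min(fit_by_mem, fit_by_cpu, 4))
print(f"Using num_partitions = {num_partitions}")

Then just pass that num_partitions into your .join(...) call as before.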


Thanks Christina, I was coming to the same conclusion yesterday after more debugging and consulting with ChatGPT, but I’m glad to have confirmation from you!

So to summarise:

  • I should target fewer partitions (but beware of OOM errors with too-large partitions, which is what had driven me to more partitions in previous experiments).
  • I had (mistakenly) thought that more partitions would simply lead to inefficiencies (i.e., more tasks/actors to run), but that the job would still finish (i.e., run n tasks/actors, then the next n, and so on until done). However, this isn’t true for all operations; it seems not to be the case for hash shuffling.

I’m happy to say that I’ve been able to get past my issue with tasks/actors stalling by following the above guidelines.
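For anyone finding this thread later, the kind of change involved is roughly the following (illustrative only, building on the script in my first post; the worker-pod count is hypothetical, so adjust the cap to your own cluster):

# Cap the number of hash-shuffle partitions by what the cluster can host at
# once (one HashShuffleAggregator actor per partition), instead of deriving it
# purely from the number of replicas.
num_workers = 5  # hypothetical: the number of worker pods in the deployment
num_partitions = min(max(1, num_replicas // 100), num_workers)

ds = (
    ds_og
    .add_column("cross", lambda _: 0)
    .join(rep, join_type="inner", on=("cross",), num_partitions=num_partitions)
)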


On a related note, ChatGPT told me the following. Are you able to confirm whether it’s correct? (I don’t know what source this comes from – I haven’t seen it in the Ray docs, so I fear hallucinations.)

Tasks vs Actors:

  - Ray tasks (stateless) can be queued — if no worker is available, Ray queues
    them and assigns them as workers free up.
  - Ray actors (stateful) cannot be queued in the same way — they must be placed
    immediately on a node that has the required resources, otherwise the actor
    creation request is marked as `PENDING_NODE_ASSIGNMENT`.

A join operation creates `HashShuffleAggregator` actors, and Ray’s actor placement
logic says:

  "I can't launch this actor unless I find a node that can host it *now*, with
   all the CPU/memory it needs."

Ray doesn’t currently implement a queue or pooling mechanism for actors. It either
places them immediately or waits for suitable capacity.

Shuffle Requires All-to-All Coordination:

During a join, Ray's shuffling strategy (especially with num_partitions = N)
launches N `HashShuffleAggregator` actors and N `shuffle_block` tasks.

Each actor represents a partition of the output, and each must be available
to receive its share of data from all mappers, meaning:
  - The entire actor mesh must exist for the shuffle to proceed.
  - If even one actor can’t be placed, the data can’t be fully routed and sent.

So it’s not a pipeline like:

  Partition 1 done → free resources → Partition 2 starts.

Instead, it’s:

  All N partitions need to exist → all data is shuffled → then aggregators
  finalize.

This is where the "all-at-once" pressure comes from.

Allo! Glad that fixed your problem :smiley: You can read more about actors and tasks in the official docs if you would like.

This is also answered in the FAQ! You should take a read and see if it answers your questions.

There is a specific section for “What’s the difference between a worker and an actor?”.
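As a toy illustration of the difference that matters here (just a sketch with made-up resource numbers): tasks only hold their resources while they run, so an oversubscribed batch of tasks still drains, whereas actors keep their reservation for as long as they’re alive, so the ones that don’t fit stay in PENDING_CREATION until capacity frees up or the autoscaler adds nodes.

import ray

ray.init(num_cpus=4)  # a small local cluster, just for illustration

@ray.remote(num_cpus=1)
def stateless_task(i):
    # Holds its 1 CPU only while running; queued copies wait for a free slot.
    return i * 2

@ray.remote(num_cpus=1)
class StatefulActor:
    # Reserves its 1 CPU for as long as the actor is alive, even when idle.
    def ping(self):
        return "pong"

# Submitting far more tasks than CPUs is fine: Ray queues them and they all finish.
print(sum(ray.get([stateless_task.remote(i) for i in range(100)])))

# Creating far more 1-CPU actors than CPUs is different: 4 become ALIVE and hold
# every CPU, and the rest sit in PENDING_CREATION until actors die or nodes join.
actors = [StatefulActor.remote() for _ in range(100)]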

Docs: