Ray only using one CPU core but detects all resources

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am running Ray on a SLURM cluster; I followed the documentation when writing my sbatch script and hadn’t had any issues until today. The essence of my code is to take a large input, split it into N parts (one part for each CPU core), and use Ray to execute each part on a single CPU core. The function that does the processing is decorated with num_cpus=1, and the node I’m working on has 128 CPU cores.
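
In template form, what I’m doing is roughly the following (a minimal sketch with illustrative names, not my actual code):

import numpy as np
import ray

ray.init(address="auto")

@ray.remote(num_cpus=1)
def process_chunk(chunk):
    # stand-in for the real per-chunk processing
    return chunk.sum()

data = np.arange(1_000_000)                    # the "large input"
n = int(ray.cluster_resources()["CPU"])        # one part per CPU core
chunks = np.array_split(data, n)
results = ray.get([process_chunk.remote(c) for c in chunks])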

When I execute the program and view the Ray dashboard, I can see 128 workers being created, but then all of the work is being done by a single worker. The Resource Status on the dashboard also shows 1.0/128.0 CPUs being used.

I just updated Ray to version 2.5.1. Earlier today I was on 2.2.0 and the issue I was facing was similar (only 2 workers were initially created and 1 did all the work).

It could also be an issue with my SLURM cluster (not sure what I should be looking for to verify that), but I thought I’d start here given the community size! Thanks in advance!
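
(The only thing I’ve thought of checking so far is whether Ray’s view matches SLURM’s allocation, roughly like this, assuming the usual SLURM environment variables are set by the sbatch script:)

import os
import ray

ray.init(address="auto")
print("Ray sees:", ray.cluster_resources().get("CPU"), "CPUs")
print("SLURM allocated:",
      os.environ.get("SLURM_CPUS_PER_TASK"), "CPUs per task x",
      os.environ.get("SLURM_NTASKS"), "tasks")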

EDIT: To get some more info, I looked at the stack traces for the workers on the dashboard. For the workers that weren’t executing anything, the trace looks like this:

Thread 756850 (idle): "MainThread"
    pthread_cond_wait@@GLIBC_2.3.2 (libpthread-2.28.so)
    boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
    boost::asio::detail::scheduler::run (ray/_raylet.so)
    boost::asio::io_context::run (ray/_raylet.so)
    ray::core::CoreWorker::RunTaskExecutionLoop (ray/_raylet.so)
    ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop (ray/_raylet.so)
    ray::core::CoreWorkerProcess::RunTaskExecutionLoop (ray/_raylet.so)
    run_task_loop (ray/_raylet.so)
    main_loop (ray/_private/worker.py:861)
    <module> (ray/_private/workers/default_worker.py:262)

whereas the stack trace of the one worker that is doing the work looks like this:

Thread 756949 (idle): "MainThread"
    pthread_cond_wait@@GLIBC_2.3.2 (libpthread-2.28.so)
    boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
    boost::asio::detail::scheduler::run (ray/_raylet.so)
    boost::asio::io_context::run (ray/_raylet.so)
    ray::core::CoreWorker::RunTaskExecutionLoop (ray/_raylet.so)
    ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop (ray/_raylet.so)
    ray::core::CoreWorkerProcess::RunTaskExecutionLoop (ray/_raylet.so)
    run_task_loop (ray/_raylet.so)
    main_loop (ray/_private/worker.py:861)
    <module> (ray/_private/workers/default_worker.py:262)
Thread 763971 (idle): "ray_import_thread"
    do_futex_wait (libpthread-2.28.so)
    __new_sem_wait_slow (libpthread-2.28.so)
    PyThread_acquire_lock_timed (python3.9)
    lock_PyThread_acquire_lock (python3.9)
    wait (threading.py:316)
    _wait_once (grpc/_common.py:106)
    wait (grpc/_common.py:148)
    result (grpc/_channel.py:733)
    _poll_locked (ray/_private/gcs_pubsub.py:217)
    poll (ray/_private/gcs_pubsub.py:372)
    _run (ray/_private/import_thread.py:74)
    run (threading.py:917)
    _bootstrap_inner (threading.py:980)
    _bootstrap (threading.py:937)
    clone (libc-2.28.so)
Thread 772856 (idle): "Thread-20"
    epoll_wait (libc-2.28.so)
    0x1512bff09fda (grpc/_cython/cygrpc.cpython-39-x86_64-linux-gnu.so)
    0x1512bffb184c (grpc/_cython/cygrpc.cpython-39-x86_64-linux-gnu.so)
    0x1512c0014a55 (grpc/_cython/cygrpc.cpython-39-x86_64-linux-gnu.so)
    0x1512c008f12f (grpc/_cython/cygrpc.cpython-39-x86_64-linux-gnu.so)
    0x1512c0090a5d (grpc/_cython/cygrpc.cpython-39-x86_64-linux-gnu.so)
    channel_spin (grpc/_channel.py:1258)
    0x1512bfff9dec (grpc/_cython/cygrpc.cpython-39-x86_64-linux-gnu.so)
    run (threading.py:917)
    _bootstrap_inner (threading.py:980)
    _bootstrap (threading.py:937)
    clone (libc-2.28.so)

@adityatv

Could you paste the output of ray status? Also, it would be great if you could share a simple repro script for me to test on my side.

Hey @jjyao. I get the following for ray status:

======== Autoscaler status: 2023-07-17 00:40:36.816565 ========
Node status
---------------------------------------------------------------
Healthy:
 1 node_436c87ab53d00069b3bbba3fbb94229a507dfbc2b36476353918532f
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/128.0 CPU # sometimes this will be 1.0/128.0 CPU, but most of the time not
 0B/781.59GiB memory
 0B/186.26GiB object_store_memory

Demands:
 (no resource demands)

Unfortunately, I was unable to come up with a repro script (I’ll keep trying), but the template for my Ray code is the following:

import time
from typing import List
import numpy as np
import ray
import os
from datetime import datetime

@ray.remote
class SumActor:
    def __init__(self):
        self.sum = 0

    async def update(self, num):
        self.sum += num
    
class ReproClass:
    def __init__(self):
        self.dummy_text = "Hello World"
        self.nums = np.arange(1e4, 1e5)

    def func1(self, num_workers: int, random_nums: List[int]):
        s = datetime.now()
        sum_actor = SumActor.remote()
        ray.get([
            self.func2.remote(self, arr, sum_actor)
            for arr in np.array_split(random_nums, num_workers)
        ])
        print(f"{self.dummy_text} after {datetime.now() - s} seconds")

    @ray.remote(scheduling_strategy="SPREAD", num_cpus=1)
    def func2(self, arr: List[int], sum_actor):
        ray.get([
            sum_actor.update.remote(self.func3(num)) for num in arr
        ])
        return 0
    
    def func3(self, num):
        if num in self.nums:
            time.sleep(num)
        return num


def main() -> None:
    if not ray.is_initialized():
        if "redis_password" in os.environ:
            ray.init(
                address="auto",
                _redis_password=os.environ["redis_password"],
                include_dashboard=True,
            )
        else:
            ray.init(include_dashboard=True)

    nnodes = 1
    ncpus = 128

    repro = ReproClass()

    repro.func1(nnodes * ncpus, np.random.randint(1, 1e6, 128 * 100))

    ray.shutdown()


if __name__ == "__main__":
    main()

The main difference is that my real functions take more parameters (e.g., primitive types, numpy arrays, defaultdicts). Also, the above code does what I want: it uses 128/128 CPUs.

@jjyao I don’t have a repro script, but I think the issue I am facing has to do with the fact that I have a class method as a Ray Task when the class itself is not a Ray Actor. Is this not supported? Do you have any suggestions on restructuring a class where I want some of the functionality to be parallelized but not the rest? Thanks in advance!

I was able to get it to use all of the available cores by moving the Ray task outside the class and then calling the function from within the class!
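
Roughly, the restructuring looks like this (illustrative names, not my exact code): the remote task is now a plain module-level function, and the class just calls .remote() on it.

import numpy as np
import ray

@ray.remote(scheduling_strategy="SPREAD", num_cpus=1)
def process_chunk(arr, nums):
    # per-chunk work that previously lived in the class method
    return sum(int(n) for n in arr if n in nums)

class Processor:
    def __init__(self):
        self.nums = set(range(10_000, 100_000))

    def run(self, num_workers, random_nums):
        chunks = np.array_split(random_nums, num_workers)
        # one num_cpus=1 task per chunk, spread across the node's cores
        return ray.get([process_chunk.remote(c, self.nums) for c in chunks])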