Ray cluster doesn't use all available CPU cores

Hi, I’m new to Ray and need some help. I’m using Ray in my Tornado web server to run computations in parallel whenever a request comes in. Here is my code for connecting to the Ray cluster:

runtime_env = {
    "working_dir": ".",
}
ray.init(address="auto", runtime_env = runtime_env)
job_matcher = JobMatcher.remote()
MAX_NUM_PENDING_TASKS = 3

JobMatcher is an actor class defined in another Python file that performs the job similarity and job recommendation computation. It also calls methods from a class in a third Python file for job attribute matching. Below is the code that receives the data, processes it, and sends back the results:

class AllProcessHandler(BaseHandler):
    def receive(self):
        data = super().receive()

        job_list = data.get('job_ads')
        job_list_ref = ray.put(job_list)
        self.similar_job_runner(job_list, job_list_ref)

    def similar_job_runner(self, job_list: list[dict], job_list_ref):
        start = time.time()

        result_sim_job = []
        for curr_job in job_list:
            if len(result_sim_job) > MAX_NUM_PENDING_TASKS:
                # Wait for one task to finish and keep tracking only the remaining ones.
                ready_refs, result_sim_job = ray.wait(result_sim_job, num_returns=1)
                predict = {"id": ray.get(ready_refs[0])[0],
                            "type": "similar_job",
                            "result": ray.get(ray.get(ready_refs[0])[1]),}

                self.write(predict)
                self.flush()  # Send the buffered data immediately
            result_sim_job.append(similar_job.remote(curr_job, job_list_ref))
        
        while len(result_sim_job):
            done_id, result_sim_job = ray.wait(result_sim_job)
            predict = {"id": ray.get(done_id[0])[0],
                        "type": "similar_job",
                        "result": ray.get(ray.get(done_id[0])[1]),}

            self.write(predict)
            self.flush()  # Send the buffered data immediately

@ray.remote
def similar_job(curr_job: dict, job_list: list[dict]) -> tuple[int, list[dict]]:
    print("Similar Job Started")
    job_id = curr_job['id']
    sim_job = job_matcher.sim_job_ad4.remote(curr_job, job_list)
    return job_id, sim_job

sim_job_ad4 is a method of the JobMatcher actor class. It performs the similarity computation using methods of the JobAttributeMatcher object that is created in the actor's __init__.

@ray.remote
class JobMatcher:

    def __init__(self):        
        from job_attribute_matching import JobAttributeMatcher
        self.att_matcher = JobAttributeMatcher()

I’m using 3 Ubuntu servers with 2 CPU cores each, Python 3.10.12, and Ray 2.9.2. I start a local Ray cluster with ray start --head on the head node, then run ray start --address='172.26.13.122:6379' on the 2 other servers to add them as worker nodes. Here is my ray status:

My problem is that when I check the servers with htop, only 1 server is using its 2 CPU cores, and the remaining 4 cores (on the other 2 nodes) are idle. The busy server is not always the head node; sometimes it is one of the two worker nodes. Why can’t Ray use the cores on the other servers, even though similar_job is already decorated as a remote function? I also tried passing num_cpus and max_concurrency in the JobMatcher actor definition, but the behavior stays the same. Please correct me if I did something wrong in my Ray implementation.

My other question: every time I connect a worker node to the cluster with ray start --address='172.26.13.122:6379', I get this warning: “global_state_accessor.cc:432: This node has an IP address of 172.26.2.36, but we cannot find a local Raylet with the same address. This can happen when you connect to the Ray cluster with a different IP address or when connecting to a container.”

I found this in the FAQ in the Ray documentation: “The cause of this error may be the head node overloading with too many simultaneous connections. The solution for this problem is to start the worker nodes more slowly.”

What does ‘start the worker nodes more slowly’ mean? I already tried starting the head node with ray start --head --autoscaling-config=~/ray_bootstrap_config.yaml and setting the upscaling_speed parameter to 0.5 or 0.2, but I still get the same warning. Can anyone help me with that?

Thanks.

Ahh my bad, changing JobMatcher from an actor class back to a regular class solved the issue.
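For anyone who lands here with the same symptom: as far as I understand it, a Ray actor is a single worker process that runs its method calls one at a time by default, so every similar_job task was just waiting on the same JobMatcher process. Below is a minimal sketch of the change, not my exact code. JobAttributeMatcher and its score method are toy stand-ins for the real class, the similarity logic is invented, and run_on_cluster is a hypothetical helper that assumes a cluster is already running.

```python
# Sketch only: JobAttributeMatcher is a toy stand-in for the real class in
# job_attribute_matching.py, and the scoring logic is invented.
class JobAttributeMatcher:
    def score(self, a: dict, b: dict) -> int:
        # Toy similarity: number of skills the two jobs share.
        return len(set(a["skills"]) & set(b["skills"]))


class JobMatcher:  # plain class now, no @ray.remote decorator
    def __init__(self):
        self.att_matcher = JobAttributeMatcher()

    def sim_job_ad4(self, curr_job: dict, job_list: list[dict]) -> list[dict]:
        # Rank every other job by similarity to curr_job, best match first.
        others = [j for j in job_list if j["id"] != curr_job["id"]]
        return sorted(
            others,
            key=lambda j: self.att_matcher.score(curr_job, j),
            reverse=True,
        )


def similar_job(curr_job: dict, job_list: list[dict]) -> tuple[int, list[dict]]:
    # The computation now runs inside whichever worker process picks up
    # the task, instead of being forwarded to one shared actor.
    matcher = JobMatcher()
    return curr_job["id"], matcher.sim_job_ad4(curr_job, job_list)


def run_on_cluster(job_list: list[dict]) -> dict[int, list[dict]]:
    # Hypothetical helper; assumes `ray start --head` has already been run.
    import ray  # imported here so the plain-Python part works without Ray

    ray.init(address="auto", ignore_reinit_error=True)
    remote_similar_job = ray.remote(similar_job)  # same as the decorator
    job_list_ref = ray.put(job_list)  # store the shared list once
    refs = [remote_similar_job.remote(job, job_list_ref) for job in job_list]
    return dict(ray.get(refs))  # {job_id: ranked_jobs}
```

Building a JobMatcher inside every task only makes sense if construction is cheap, which is an assumption of this sketch.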
But can someone help me with the second question? I still get that warning, but everything works fine :confused: