Hi, I’m new to Ray and I need some help with my work. I’m using Ray in my Tornado web server to do parallel computing whenever a request comes in. Here is my code for connecting to the Ray cluster:
import ray

runtime_env = {
    "working_dir": ".",
}
ray.init(address="auto", runtime_env=runtime_env)
job_matcher = JobMatcher.remote()
MAX_NUM_PENDING_TASKS = 3
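As a side note, a quick sanity check I can run right after ray.init, in case it helps: ray.cluster_resources() and ray.nodes() are standard Ray APIs, and the totals in the comments are just what I expect from my 3 nodes with 2 cores each.

# Sanity check right after ray.init: with 3 nodes x 2 cores each
# I expect something like {'CPU': 6.0, ...} here.
print(ray.cluster_resources())
# One entry per node that has joined the cluster; I expect 3 alive nodes.
print(sum(1 for node in ray.nodes() if node["Alive"]))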
JobMatcher is an actor class imported from another Python file that performs the job similarity and job recommendation computation. This JobMatcher actor class in turn calls some methods from a class in yet another Python file for job attribute matching. Below is the code that receives the data and processes it until I send the result:
class AllProcessHandler(BaseHandler):
    def receive(self):
        data = super().receive()
        job_list = data.get('job_ads')
        job_list_ref = ray.put(job_list)
        self.similar_job_runner(job_list, job_list_ref)

    def similar_job_runner(self, job_list: list[dict], job_list_ref):
        start = time.time()
        result_sim_job = []
        for curr_job in job_list:
            if len(result_sim_job) > MAX_NUM_PENDING_TASKS:
                # Update result_sim_job to only track the remaining (not yet ready) tasks.
                ready_refs, result_sim_job = ray.wait(result_sim_job, num_returns=1)
                predict = {"id": ray.get(ready_refs[0])[0],
                           "type": "similar_job",
                           "result": ray.get(ray.get(ready_refs[0])[1])}
                self.write(predict)
                self.flush()  # Send the buffered data immediately
            result_sim_job.append(similar_job.remote(curr_job, job_list_ref))

        while len(result_sim_job):
            done_id, result_sim_job = ray.wait(result_sim_job)
            predict = {"id": ray.get(done_id[0])[0],
                       "type": "similar_job",
                       "result": ray.get(ray.get(done_id[0])[1])}
            self.write(predict)
            self.flush()  # Send the buffered data immediately
@ray.remote
def similar_job(curr_job: dict, job_list: list[dict]) -> tuple[int, list[dict]]:
    print("Similar Job Started")
    job_id = curr_job['id']
    sim_job = job_matcher.sim_job_ad4.remote(curr_job, job_list)
    return job_id, sim_job
sim_job_ad4 is a method of the JobMatcher actor class that performs the similarity computation using methods of a JobAttributeMatcher object initialized inside the JobMatcher actor. (That is also why the handler above calls ray.get(ray.get(...)): similar_job returns the ObjectRef from the actor call rather than the resolved result.)
@ray.remote
class JobMatcher:
    def __init__(self):
        from job_attribute_matching import JobAttributeMatcher
        self.att_matcher = JobAttributeMatcher()
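For context, sim_job_ad4 looks roughly like the sketch below. This is simplified and only illustrative: the real method is longer, and score() is just a stand-in name for the actual JobAttributeMatcher methods I call.

    def sim_job_ad4(self, curr_job: dict, job_list: list[dict]) -> list[dict]:
        # Illustrative sketch only; 'score' stands in for the real
        # JobAttributeMatcher methods used for attribute matching.
        scored = []
        for other in job_list:
            if other['id'] == curr_job['id']:
                continue
            scored.append((self.att_matcher.score(curr_job, other), other))
        # Return the most similar job ads first.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [job for _, job in scored[:10]]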
Here I’m using 3 Ubuntu servers with 2 CPU cores each, Python 3.10.12, and Ray 2.9.2. I start a local Ray cluster using the ray start --head command on the head node, followed by ray start --address='172.26.13.122:6379' on the 2 other servers to add them as worker nodes. Here is my ray status:
My problem is that when I look at the server status using the htop command, I only see 1 server utilizing its 2 CPU cores, while the remaining 4 CPUs (on the other 2 nodes) sit idle. The busy process is not always on the head node; sometimes it runs on one of the two worker nodes instead. Why can’t Ray use the other servers’ cores for this work even though similar_job is already decorated as a remote function? I also tried passing num_cpus and max_concurrency in the JobMatcher actor definition (roughly as shown below), but the behavior stays the same. Please correct me if I did something wrong in my implementation with Ray.
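For reference, this is roughly how I passed those options. num_cpus and max_concurrency are real Ray actor options, but the exact values below are just examples of what I tried.

@ray.remote(num_cpus=1, max_concurrency=4)
class JobMatcher:
    ...

# or equivalently at actor creation time:
job_matcher = JobMatcher.options(num_cpus=1, max_concurrency=4).remote()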
My other question is: why is there always a warning like the following every time I connect worker nodes to the cluster with the ray start --address='172.26.13.122:6379' command: “global_state_accessor.cc:432: This node has an IP address of 172.26.2.36, but we cannot find a local Raylet with the same address. This can happen when you connect to the Ray cluster with a different IP address or when connecting to a container.”
I found a possible solution in the FAQ of the Ray documentation, which states: “The cause of this error may be the head node overloading with too many simultaneous connections. The solution for this problem is to start the worker nodes more slowly.”
What does ‘start the worker nodes more slowly’ mean? I already tried starting the head node with the ray start --head --autoscaling-config=~/ray_bootstrap_config.yaml command and adjusting the upscaling_speed parameter to 0.5 or 0.2, but it still gives me the same warning. Can anyone help me with that?
Thanks.