Configuring Ray - NUMA Domain

Hello all,

I am currently working on optimizing the configuration of a Ray cluster for running large sets of remote tasks while leveraging NUMA domains for enhanced performance. My goal is to segment each node I acquire into 4 distinct NUMA domains in order to improve memory access and overall computation efficiency. Each node, for my use case, has 96 cores and a total of 4 NUMA domains (24 cores per NUMA domain).

My current setup involves setting numactl before calling ray start and pinning each worker node to the correct NUMA domain. I followed this post for guidance. This approach works for me; however, my question is:

Does Ray have a way to restrict work (remote tasks) to specific NUMA domains without needing to set numactl at start? If so, how can I achieve that and verify it is actually doing the right thing?

Thank you in advance! Any examples would be greatly appreciated!

For reference I am using:

  • Ray 2.8.1
  • CentOS 7

Hi @max_ronda, so far I don’t think we’ve done anything for NUMA support in Ray. :frowning:

One possible thing you can try is to start 4 containers, each running on a single NUMA domain. You can add a label to each one and use the label to schedule tasks onto a specific NUMA domain.
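For example, something like this with a custom resource acting as the label (just a sketch; the "numa0" label name and values are placeholders):

# Inside the container pinned to NUMA domain 0 (hypothetical label):
#   ray start --address=HEAD_NODE_IP:6379 --num-cpus=24 --resources='{"numa0": 1}'

import ray

ray.init(address="auto")

@ray.remote(num_cpus=24, resources={"numa0": 1})
def on_numa0():
    # Only schedulable on the raylet that advertises the "numa0" label.
    return "ran inside the numa0 container"

print(ray.get(on_numa0.remote()))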

Hi @yic, thanks for the reply!

I am currently running at an HPC center, no containers. What I am doing, and please tell me if this is wrong, is starting a cluster this way:

My resources: each node has 96 cores and 4 NUMA domains (24 cores each). I split my node into 4 parts this way:

Head node:

$ numactl -N 0 --membind=0 ray start --head --port=6379 --num-cpus=24 ... --block

Worker node 1 (within the same node):

$ numactl -N 1 --membind=1 ray start --address=HEAD_NODE_IP:6379 --num-cpus=24 ... --block

Worker node 2 (within the same node):

$ numactl -N 2 --membind=2 ray start --address=HEAD_NODE_IP:6379 --num-cpus=24 ... --block

Worker node 3 (within the same node):

$ numactl -N 3 --membind=3 ray start --address=HEAD_NODE_IP:6379 --num-cpus=24 ... --block

Afterwards, when I connect to this cluster via ray.init() and run ray.nodes(), Ray tells me I have acquired 4 nodes with a total of 96 cores.
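For reference, the check I run looks roughly like this (HEAD_NODE_IP is a placeholder):

import ray

ray.init(address="HEAD_NODE_IP:6379")

# Each `ray start` above shows up as its own node entry.
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Resources"].get("CPU"))

print("Total CPUs:", ray.cluster_resources().get("CPU"))  # expect 96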

Then when running a simple task like this …

import os
import time

import ray

@ray.remote(num_cpus=24)
def run_task(task_id):
    # Dummy wait standing in for the real workload
    time.sleep(90)

    # Report which cores this worker is allowed to run on
    return sorted(os.sched_getaffinity(0))

futures = [run_task.remote(i) for i in range(4)]
results = ray.get(futures)

It returns the correct affinity for each task I submit.

I am thinking about scaling this solution to many more nodes, but I'm curious whether this is the right way of doing it or whether I should be thinking of something else?

Thanks for your time and suggestions!

Hey @yic, any thoughts on the above?

Hi @max_ronda, this should work.

You might want to set up different tmp folders for the different Ray workers.

Btw, copies will happen between the Ray workers when object refs move between raylets. Logically, you should think of each ray start as a new node.

Hi @yic, I don’t think I can set --temp-dir for subsequent workers. I get this warning:

`--temp-dir=/location/here/worker2` option will be ignored. `--head` is a required flag to use `--temp-dir`. temp_dir is only configurable from a head node. All the worker nodes will use the same temp_dir as a head node. 

Also, can you elaborate a bit more on your last statement? Do you have any other ideas in terms of how I can set up this NUMA-domain-aware cluster? That would be greatly appreciated! Thanks!

@max_ronda Not setting the temp dir might be OK. I'm not sure whether there are any issues in production workloads; if anything bad happens, please create a ticket on GitHub for this. We do have a lot of unit tests starting multiple raylets on the same node to test the distributed features.

can you elaborate a bit more on your last statement

When you call ray start multiple times, it’ll just create several Ray nodes. You can observe this from the dashboard.

Each Ray node has an object store. For the Ray workers on the same Ray node, when they access the data there is no copy; it’s done with shared memory. But for workers on different Ray nodes, the data needs to be copied to the local object store before being used.

For your case, although physically you have Ray nodes on the same physical machine, Ray still thinks they are on different physical machines, and thus the copy will happen.
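For illustration, a minimal sketch of where such a copy can show up (the num_cpus=24 just mirrors your per-domain sizing):

import numpy as np
import ray

ray.init(address="auto")

@ray.remote(num_cpus=24)
def producer():
    # The array lands in the object store of whichever raylet runs this task.
    return np.zeros((1000, 1000))

@ray.remote(num_cpus=24)
def consumer(arr):
    # If this task runs on a different raylet (a different "ray node", even on
    # the same physical machine), the array is first copied into that raylet's
    # object store.
    return arr.sum()

print(ray.get(consumer.remote(producer.remote())))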

Do you have any other ideas in terms of how I can set up this NUMA-domain-aware cluster?

I just thought of another idea, but I think it needs some development work on your side, maybe 200 LOC. You could also open source it after implementing it if you want.

The idea is to use a runtime environment plugin. This allows you to specify how to start the worker in a customized way. Here is one example for nsight profiling: ray/python/ray/_private/runtime_env/nsight.py at master · ray-project/ray · GitHub

The documentation for the API: Profiling — Ray 2.9.3

For your usage, you can implement a runtime env plugin and load it when the system starts. So in the end you could probably do:

@ray.remote(runtime_env={"numa_group": 1})
def f():
    pass

And the remote function or actor will run on the specified NUMA group.
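A very rough sketch of the core of such a plugin, loosely modeled on the nsight plugin (the base class location, hook names, and signatures vary between Ray versions, so treat them as assumptions):

# numa_plugin.py -- hypothetical plugin, not part of Ray.
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.plugin import RuntimeEnvPlugin


class NumaPlugin(RuntimeEnvPlugin):
    name = "numa_group"

    def modify_context(self, uris, runtime_env, context: RuntimeEnvContext, logger=None):
        numa_group = runtime_env.get(self.name)  # e.g. {"numa_group": 1}
        if numa_group is None:
            return
        # Prefix the worker command so the worker process (and everything it
        # spawns) is pinned to the requested NUMA node.
        context.py_executable = (
            f"numactl -N {numa_group} --membind={numa_group} " + context.py_executable
        )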

Let me know if you are interested in contributing this one.

Ohh, so it's not really recommended to start multiple worker nodes within the same node then? It might be a bottleneck in the future …

I like the idea of having control via an API! However, would this achieve my initial goal of running highly parallelizable tasks without thinking about their placement? I’d like Ray to handle scheduling, so that as soon as a NUMA domain becomes available, Ray can schedule the next task within that set of cores.

My question is, would this method allow me to break down my cluster into NUMA domains for both the worker nodes and the head node? Essentially, would I be able to do this with a runtime_env plugin?

ray.init(address='ray://host:port', runtime_env={"numa_group": ["0-23", "24-47", "48-71", "72-95"]})

@ray.remote(num_cpus=24)
def run_task(task_id):
    # Task here gets scheduled within available numa_group 
    pass

futures = [run_task.remote(i) for i in range(16)]
results = ray.get(futures)

I am happy to contribute to this! Are there any other plugins I can use for inspiration?

Looking forward to your response!

Ohh, so it's not really recommended to start multiple worker nodes within the same node then? It might be a bottleneck in the future …

I think it depends on your workload. If you don’t have this pattern (objects being passed between workers on different raylets), it’s going to be OK.

My question is, would this method allow me to break down my cluster into NUMA domains for both the worker nodes and the head node? Essentially, would I be able to do this with a runtime_env plugin?

I think it could, but it’s more complicated than I originally expected.

There are two things here: 1) the local NUMA groups, which are not visible to Ray but can be visible to the runtime env plugin; 2) the scheduling information, like placement groups, CPUs, and GPUs.

Since 1) is not visible to Ray, Ray can’t use this information for scheduling.

To achieve this, Ray would need to support it just like multiple GPUs on the same node. It could be modeled as multiple CPUs on the node (each NUMA group as a CPU, maybe??). This is very complicated IMO.

Another way is to do it in the application layer, with a runtime env plugin. But this is more complicated than my original thought.

You can do it this way: use a custom resource when calling ray start, e.g. ray start --resources '{"numa_groups": 4}'

Later, when you schedule, you should use resources={"numa_groups": 1} for remote tasks or actors.

But Ray will not launch the worker on a specific NUMA group. For that, you need the runtime env plugin.
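For the scheduling half alone, that would look roughly like this (a sketch; only the resource bookkeeping is shown, no actual pinning happens here):

# On each node (or each NUMA-pinned raylet):
#   ray start --address=HEAD_NODE_IP:6379 --resources='{"numa_groups": 4}'

import ray

ray.init(address="auto")

@ray.remote(num_cpus=24, resources={"numa_groups": 1})
def run_task(task_id):
    # Ray schedules this only where a "numa_groups" unit is free; pinning the
    # worker to an actual NUMA node would still be the runtime env plugin's job.
    return task_id

print(ray.get([run_task.remote(i) for i in range(4)]))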

Inside the plugin, you need a script that calculates which NUMA group is free. Maybe you can keep a file locally on the node recording which NUMA groups are in use. For example, when the runtime env plugin starts a NUMA-pinned process, it writes the PID into the local file. One brief workflow for this plugin (see the sketch after the list) is:

  • Check the local PID-to-NUMA mapping file: determine which PIDs are alive and which are dead, and derive which NUMA groups are available.
  • Pick a free NUMA group.
  • Update the mapping file.
  • Run the program with the right cmd.
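A rough sketch of that bookkeeping, assuming a simple JSON mapping file (the file name and group count are placeholders, and a real version would need file locking):

import json
import os

MAPPING_FILE = "/tmp/ray_numa_mapping.json"  # hypothetical per-node file
NUM_NUMA_GROUPS = 4


def _pid_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)  # signal 0 only checks that the process exists
        return True
    except OSError:
        return False


def claim_free_numa_group(pid: int) -> int:
    # Load the current group -> PID claims, if any.
    mapping = {}
    if os.path.exists(MAPPING_FILE):
        with open(MAPPING_FILE) as f:
            mapping = json.load(f)

    # Drop claims whose processes have exited.
    mapping = {group: p for group, p in mapping.items() if _pid_alive(p)}

    used = {int(group) for group in mapping}
    free = [g for g in range(NUM_NUMA_GROUPS) if g not in used]
    if not free:
        raise RuntimeError("No free NUMA group on this node")

    # Record the claim and persist it.
    mapping[str(free[0])] = pid
    with open(MAPPING_FILE, "w") as f:
        json.dump(mapping, f)
    return free[0]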

Would this be an interesting feature for you guys to develop? I know I am keen to use it :smiley:

It seems reasonable, but how would Ray select which NUMA group to schedule to? Will it check with the plugin to see where to schedule next? I am also planning to run this on many nodes (maybe with the autoscaler ON), so I'm not sure how this will behave when scaled up…

Any other ideas? Has anyone else run into this?

Thanks!

Hi @max_ronda, sorry for the late reply.

Would this be an interesting feature for you guys to develop? I know I am keen to use it :smiley:

Recently we have been focusing on cleaning up issues and on stability. Can you submit a ticket for this? We can gauge its priority based on demand from the community.

how would Ray select which NUMA group to schedule to? Will it check with the plugin to see where to schedule next?

Ray doesn’t select; that is managed in your runtime env plugin code. Ray will only pick a node that has a free NUMA resource to schedule the task on, but it won’t pick which NUMA group to run it on. The runtime env plugin will decide, because it controls how to run the process.