How to run a function exactly once on each node?

Say I have a function copy_data that copies data from an outside source (e.g., S3, GCS, a database) to a VM, and I want to run that function exactly once on each node in a cluster. Is there a good way to do that?

If I have, say, 5 nodes, and each node has 32 cores, I’ve been decorating copy_data with @ray.remote(num_cpus=32) and launching it 5 times with Ray, so that each call has to claim a whole node. I thought there might be a better way.

If it helps, copying data is just one application for this functionality.


Hi @dutch,

Check out this recent post for one way to address this problem.


Thanks @mannyv! In addition, you can create a placement group with the STRICT_SPREAD strategy, which places each bundle on a different node, and then schedule one actor per node to perform the tasks you need.

Thanks, guys. I’m looking at the documentation. If I have a function get_ip_addr that I want to run once on each node, do I do something like the following:

import ray
from ray.util.placement_group import placement_group

bundles = [{"CPU": 16} for _ in ray.nodes()]
pg = placement_group(bundles=bundles, strategy="STRICT_SPREAD")
tasks = [
    get_ip_addr.options(placement_group=pg, placement_group_bundle_index=i).remote()
    for i in range(len(ray.nodes()))
]
ips = ray.get(tasks)

My recommendation is

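import ray
from ray.util.placement_group import placement_group

@ray.remote(num_cpus=1)  # each actor uses the 1 CPU reserved by its bundle
class FunctionExecutor:
    def get_ip_addr(self):
        return "haha"  # write your code

num_nodes = len(ray.nodes())
bundles = [{"CPU": 1} for _ in range(num_nodes)]
pg = placement_group(bundles=bundles, strategy="STRICT_SPREAD")
# Pin one actor to each bundle so that every node gets exactly one executor.
executors = [
    FunctionExecutor.options(placement_group=pg, placement_group_bundle_index=i).remote()
    for i in range(num_nodes)
]
ips = ray.get([executor.get_ip_addr.remote() for executor in executors])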

Note that a placement group reserves its resources up front, so if you allocate 16 CPUs to each bundle, it will probably take up all of the resources on every machine in your cluster (and you will not be able to run other functions there). If you only want to run a single function with the placement group, your approach works too.
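To keep the reservation small, one option is to build the bundle list from the node list itself, with one small bundle per live node. Below is a minimal sketch, not an official Ray recipe. The helper name bundles_for_alive_nodes and the sample data are hypothetical, and it assumes each entry returned by ray.nodes() carries an "Alive" flag and a "Resources" dict, which is how I understand the output to be shaped.

```python
from typing import Any, Dict, List


def bundles_for_alive_nodes(
    nodes: List[Dict[str, Any]], cpus_per_bundle: float = 1
) -> List[Dict[str, float]]:
    """Return one small CPU bundle per live node that can host it.

    `nodes` is expected to look like the output of ray.nodes()
    (assumption: each entry has "Alive" and "Resources" keys).
    """
    bundles = []
    for node in nodes:
        if not node.get("Alive", False):
            continue  # skip nodes that are no longer part of the cluster
        if node.get("Resources", {}).get("CPU", 0) < cpus_per_bundle:
            continue  # skip nodes that cannot fit the bundle at all
        bundles.append({"CPU": cpus_per_bundle})
    return bundles


# Hypothetical sample shaped like ray.nodes() output (only the fields used here).
sample_nodes = [
    {"Alive": True, "Resources": {"CPU": 32.0}},
    {"Alive": False, "Resources": {}},
    {"Alive": True, "Resources": {"CPU": 32.0}},
]
bundles = bundles_for_alive_nodes(sample_nodes)
print(len(bundles))
```

On a real cluster you would pass ray.nodes() instead of sample_nodes and hand the result to placement_group(bundles=bundles, strategy="STRICT_SPREAD"). With 1 CPU per bundle, the remaining cores on each node stay available for other work.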
