Parallelize function across nodes but never within nodes

Say I have M nodes and a function that I want to run N > 0 times. What’s the best approach for running the function in parallel across nodes, but never running more than one copy on the same node at once?

I can’t decorate the function with a CPU count, because I don’t know how big the nodes will be, so placement groups with STRICT_SPREAD seem like a good strategy here. I did some prototyping, and it seems the number of bundles in a placement group can’t exceed the number of nodes, or I’ll get an autoscaler error.

My hacky approach now is to loop through the bundles in chunks of M (= number of nodes) at a time, create a placement group for each chunk, and run that chunk’s tasks, but there has to be something easier.
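For concreteness, here’s a minimal sketch of just the chunking step in pure Python — `chunk_bundles` is a hypothetical helper (not a Ray API), and `num_nodes` stands in for `len(ray.nodes())`:

```python
# Split N bundles into chunks of at most M (= number of nodes),
# so each chunk can become one STRICT_SPREAD placement group.
def chunk_bundles(bundles, num_nodes):
    return [bundles[i:i + num_nodes] for i in range(0, len(bundles), num_nodes)]

bundles = [{"CPU": 1} for _ in range(7)]
chunks = chunk_bundles(bundles, num_nodes=3)
# 7 bundles across 3 nodes -> chunk sizes [3, 3, 1]
```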


This is a nice use case! @jjyao any thoughts here?

Not sure if it helps, but here’s my (not good) approach. Edit: cleaned it up a bit.

from collections.abc import Iterable
from typing import Any, Callable, Dict, List, Sequence

import ray
from ray.util.placement_group import placement_group, remove_placement_group

def strict_spread(
    func: Callable,  # must be a Ray remote function (decorated with @ray.remote)
    func_kwargs: Sequence[Dict[str, Any]],
    num_cpu: int = 1,
    num_gpu: int = 0,
    autoscale: bool = False,
) -> List[Any]:
    # One bundle per function call.
    bundles_all = [{"CPU": num_cpu, "GPU": num_gpu} for _ in range(len(func_kwargs))]

    # A placement group can't have more bundles than nodes, so chunk the
    # bundles (and their kwargs) into groups of at most len(ray.nodes()).
    chunk_size = 1 if autoscale else len(ray.nodes())
    pg_bundles = [bundles_all[i:i + chunk_size] for i in range(0, len(bundles_all), chunk_size)]
    pg_kwargs = [func_kwargs[i:i + chunk_size] for i in range(0, len(func_kwargs), chunk_size)]

    output = []
    for pbundles, pkwargs in zip(pg_bundles, pg_kwargs):
        pg = placement_group(bundles=pbundles, strategy="STRICT_SPREAD")
        tasks = [
            func.options(
                placement_group=pg, placement_group_bundle_index=i
            ).remote(**kwargs)
            for i, kwargs in enumerate(pkwargs)
        ]
        tasks_output: Any = ray.get(tasks)
        if isinstance(tasks_output, Iterable):
            output.extend(tasks_output)
        else:
            output.append(tasks_output)
        remove_placement_group(pg)

    return output

Another idea, which sidesteps the CPU-count problem entirely, is to start all your nodes with a custom resource (e.g. {"custom_resource_name": 1}) and then have the function request that same resource.

The main difference from the provided approach is that each node will start a new task as soon as it finishes its current one, as opposed to waiting for every node in the placement group to finish.
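A minimal sketch of that idea — the resource name and node-startup flags here are assumptions, so adapt them to however you launch your nodes. Each node gets one unit of a custom resource at startup, and the remote function requests the whole unit, so Ray never schedules two copies on the same node at once:

```python
import ray

# Assumption: every node joined the cluster with one unit of a
# custom resource, e.g.
#   ray start ... --resources='{"one_per_node": 1}'
# (the resource name is arbitrary).

# Requesting the full unit means at most one running copy per node;
# remaining tasks queue until a node's unit frees up again.
@ray.remote(resources={"one_per_node": 1})
def func(**kwargs):
    ...
```

This scales naturally with autoscaling too: new nodes bring their own unit of the resource, so queued tasks spill onto them as they come up.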


@matthewdeng That worked. Thanks! Way better than my approach.