Parallelization of Graph algorithm on Ray Cluster + SLURM

How severe does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

Hi! I know the question I am asking is somewhat vague, but I’m hoping to get some input.

I have a graph algorithm that is executed for every node in a large (more than a million nodes) NetworkX graph. I would like to distribute the nodes across workers and execute the algorithm in parallel. I wrote an initial version of the code using ray.remote, and while I saw a speedup in execution, it does not scale well. For example, running with 8 nodes takes almost the same amount of time (if not more) as running with 4 nodes.

Furthermore, I am experiencing object spilling, which I would like to avoid. I've tried deleting references and fetching results in batches with ray.get (now using ray.wait).

Currently, I am aiming for one worker process per CPU I have access to, with each worker handling a list of nodes.

Context:

I am running this on SLURM across multiple nodes (each with 128 CPUs (2 sockets × 64 cores per socket × 1 thread per core) and 250 GB of memory). I am using Python 3.9.13 and Ray 2.0.1. The graph is constructed from torch_geometric.datasets.
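For reference, here is roughly how the driver connects to the already-started cluster and how I sanity-check that all CPUs registered (a minimal sketch, assuming the head and worker raylets were started with ray start before the driver runs; not my exact script):

import ray

# Connect to the existing Ray cluster on the SLURM allocation.
ray.init(address="auto")

# Sanity check: total CPUs should be roughly num_nodes * 128 and all nodes alive.
print(ray.cluster_resources())          # e.g. {"CPU": 1024.0, ...} for 8 nodes
print(all(n["Alive"] for n in ray.nodes()))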

Skeleton code

import time
from typing import Dict, List

import networkx as nx
import numpy as np
import ray

class GraphAlgorithmExecutor:
    def __init__(self, G: nx.Graph, ...) -> None:
        self.G = G
        
        self.nodes = list(G.nodes())
        self.entity_nodes = ...  # some subset of the nodes of G

        self.result = ...  # final result of the algorithm, filled in by start()

        # More initialization...

    def start(self, num_workers) -> None:
        s = time.time()
                
        # Note: each remote call ships `self` (including self.G) to the task.
        futures = [
            self.algorithm_starting_at_node_list.remote(self, arr)
            for arr in np.array_split(self.nodes, num_workers)
        ]

        while futures:
            num_returns = min(10, len(futures))
            visited_ref, futures = ray.wait(futures, num_returns=num_returns)
            visited = ray.get(visited_ref)
            
            # Concatenate results from `visited` into self.result

            del visited_ref

        print("\tAlgorithm took {:.3f} seconds\n".format(time.time() - s))
        
    @ray.remote(scheduling_strategy="SPREAD", num_cpus=1)
    def algorithm_starting_at_node_list(self, nodes: np.ndarray) -> List[Dict[int, int]]:
        res = []
        for node in nodes:
            res.append(self.algorithm_starting_at_node(node))
        return res

    def algorithm_starting_at_node(self, node: int) -> Dict[int, int]:
        # time-intensive algorithm which accesses class member fields (i.e. self.G, self.nodes) 
        # and returns a dictionary
        pass

Any feedback is appreciated and more context can be provided if needed! Thanks in advance!

If you want each node to have one task, maybe you can try node affinity scheduling.

I would like to split the workload up so that each CPU has its own task (where the task is the algorithm_starting_at_node_list function).

I did try the node affinity scheduling you suggested, and it seemed to allocate all the work to the head node (at least, in the ray.timeline() output I can only see one process/node).

I may also be misunderstanding something fundamental. My understanding is that I initialize 1 head node and (N - 1) worker nodes. I would then split the workload (in my case, the array of graph nodes) into N * 128 subarrays, with the intent that each subarray is handled by 1 of the N * 128 CPUs I have. I use the SPREAD strategy to spread the tasks across my N nodes (rather than having all the work land on the head node, which I experienced with node affinity and the DEFAULT strategy), and I specify num_cpus=1 so that each task is handled by 1 CPU.
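Concretely, the split I have in mind looks roughly like this (a sketch only; 128 CPUs per node comes from my hardware, and this is not the exact code I ran):

num_nodes = len(ray.nodes())           # head + (N - 1) workers
num_workers = num_nodes * 128          # one subarray per CPU
chunks = np.array_split(self.nodes, num_workers)

# Each task is declared with num_cpus=1 and scheduling_strategy="SPREAD"
# (see the decorator above), so the scheduler should distribute the tasks
# across the N nodes rather than piling them onto the head node.
futures = [
    self.algorithm_starting_at_node_list.remote(self, chunk)
    for chunk in chunks
]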

Does this make sense or am I missing something in my understanding? Thanks in advance!

@adityatv you need to give the node id.

ray.NodeID.from_hex(ray.nodes()[0]["NodeID"])

ray.nodes() will give you all the nodes in the cluster and with this you can specify where to put the tasks.

Use soft=False to enforce this.
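Something like the following, assuming one task pinned per physical node (an untested sketch; some_task and work are placeholders for your remote function and your full node array):

from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

node_infos = ray.nodes()  # one entry per node in the cluster

futures = [
    some_task.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(
            node_id=ray.NodeID.from_hex(info["NodeID"]),
            soft=False,  # hard requirement: fail instead of running elsewhere
        )
    ).remote(batch)
    for info, batch in zip(node_infos, np.array_split(work, len(node_infos)))
]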

I replaced my remote call with the following (and removed the SPREAD scheduling, keeping num_cpus=1):

futures = [
    self.algorithm_starting_at_node_list.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(
            node_id=ray.NodeID.from_hex(ray.nodes()[i // num_workers]["NodeID"]),
            soft=False,
        )
    ).remote(self, arr)
    for i, arr in enumerate(np.array_split(self.nodes, num_workers))
]

In doing so, I received the following error:

(raylet, ip=10.1.1.25) *** SIGBUS received at time=1668727608 on cpu 73 ***
(raylet, ip=10.1.1.25) [low_level_alloc.cc : 570] RAW: mmap error: 12
(raylet, ip=10.1.1.25) [failure_signal_handler.cc : 331] RAW: Signal 6 raised at PC=0x148ff3253a9f while already in AbslFailureSignalHandler()

In the code, I am trying to split the array into N * 128 subarrays and then allocate batches of 128 to each of the N nodes (with the i // num_workers expression).

Also, before I used the SPREAD scheduling, I came across a similar (but not identical) error where a node was marked dead because the detector missed too many heartbeats from it, like the following:

The node with node id: c1d53c39e2640406c74ce5a8a0533a0c9d156a58e68164d600183f0b and address: 10.1.1.25 and node name: 10.1.1.25 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a 	(1) raylet crashes unexpectedly (OOM, preempted node, etc.)

Thanks again!

Yeah, it seems like a bug. Do you mind checking the raylet log? It's in /tmp/ray/session_latest/logs/raylet.out. If it crashed for any reason, it sounds like a bug and we should file an issue and fix it.

Deleted the initial reply and reposted an updated version since I couldn’t find how to edit it.

I found the raylet.out files that you mentioned for both the erroring case and the SPREAD case.

In the SPREAD test, the head node creates 20 worker processes, and each of the remaining 7 worker nodes creates 14 worker processes. So, while the number of worker processes is fewer than I hoped for (ideally 1 per CPU I have access to), the scheduler did its job by spreading the work around.

In the erroring test, the worker processes were not evenly created: the head node creates 1 worker process, one worker node creates 129 processes, another creates 37, and most worker nodes don't create any.

In raylet.out, the final messages on the failing head node are:

[2022-11-17 17:43:08,964 I 2709867 2709867] (raylet) node_manager.cc:1429: NodeManager::DisconnectClient, disconnect_type=3, has creation task exception = 0
[2022-11-17 17:43:08,965 I 2709867 2709867] (raylet) node_manager.cc:1535: Driver (pid=2710032) is disconnected. job_id: 01000000
[2022-11-17 17:43:08,966 I 2709867 2709867] (raylet) node_manager.cc:599: New job has started. Job id 01000000 Driver pid 2710032 is dead: 1 driver address: 10.1.1.24
[2022-11-17 17:43:08,966 I 2709867 2709867] (raylet) worker_pool.cc:636: Job 01000000 already started in worker pool.
[2022-11-17 17:43:08,999 I 2709867 2709867] (raylet) node_manager.cc:1429: NodeManager::DisconnectClient, disconnect_type=1, has creation task exception = 0

My assumption was that, by default, Ray would create one worker process for each core/CPU it has access to (which is 128 per node), but that doesn't seem to be the case.
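For reference, this is one way to compare what each node registered against what is actually in use at a given moment (a rough sketch run from the driver, not a definitive diagnostic):

# Registered CPUs per node (should be 128 each) vs. cluster-wide free CPUs.
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Resources"].get("CPU"))
print("total CPUs:", ray.cluster_resources().get("CPU"),
      "currently free:", ray.available_resources().get("CPU"))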

Any help would be appreciated! This is starting to become more of an issue (Low → Medium). Thanks in advance!

I am still looking for a resolution or any insight. Any help is appreciated! Thanks in advance!