Memory optimization when parallelizing with networkX graph

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty in completing my task, but I can work around it.

I am using Ray 2.2.0 to parallelize a task run on a sizeable networkX graph (millions of nodes and edges). The following is an outline of the relevant parts of my code

def run_algo_dist(G_creator:Callable[[], nx.Graph], ...)
    run_algo_serial(G_creator(), ...)
def run_algo_serial(G: nx.Graph, ...)
    algo_executor = AlgoExecutor(G) # creates an object to executor algorithm
    algo_executor.start(....) # Takes in primitive arguments
    return algo_executor
class AlgoExecutor:

     def __init__(self, G: nx.Graph, ...):
           self.G = G
           self.result = defaultdict(lambda: defaultdict(int))

     def start(...):
           interested_nodes_split = np.array_split(interested_nodes, N) # Split nodes into smaller chunks where N is in the 100s
           interested_nodes_ref = ray.put(interested_nodes)
           generator_refs = [
                     self.run_algo_on_list_of_nodes.remote(self, ray.put(arr), interested_nodes_ref)
           del interested_nodes_ref
           # Iterate through the different generators and get the yielded result to add to the self.result member

     @ray.remote(scheduling_strategy="SPREAD", memory=3 * 1024 * 1024 * 1024)
     def run_algo_on_list_of_nodes(self, nodes: np.ndarray, all_interested_nodes: np.ndarray, ...):
           for node in nodes:
                 yield (node, self.run_algo_on_single_node(node, all_interested_nodes, ...)

     def run_algo_on_single_node(self, node: int, all_interested_nodes: np.ndarray, ...):
           # Run a pretty intensive algorithm here.
           # The algorithm makes use of all the passed-in parameters and repeatedly gets the neighbors of the nodes from self.G
           # returns a Dict[int, int]

Following the Ray documentation I have made some of the existing optimizations such as using generators to return results and passing in a lambda function to create large arguments rather than passing in the full argument and while it resulted in some improvement, I still run into OOM and object spilling at times.

Looking at the ray memory output, I notice that most of the memory usage (outside of the lambda functions for the run_algo_dist function which is a couple of GB for each call) comes from (deserialize task arg) algo_executor.run_algo_on_list_of_nodes.

Any tips for reducing the chance of OOM would be appreciated! Thanks in advance!

Also, a slightly tangential question but how are local variables stored (if at all) by Ray? Do variables created in run_algo_on_single_node get treated differently than run_algo_on_list_of_nodes?

@adityatv you are calling a remote function with a bunch of data, so the memory usage is expected.

You chucked the data, but you pass them all to a remote function. Why not pass them to multiple ones? Otherwise, I don’t think the chunk is useful.

If you call py functions, ray don’t do anything special, it all belongs to python. If you call remote functions, ray will take care of the parameters and distribute the data as needed.

I see. So would you suggest evaluating the chunks in batches (i.e. 10 chunks at a time and then another 10 and so forth)?

Yea if the memory is a concern to you.