How severely does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty in completing my task, but I can work around it.
I am using Ray 2.2.0 to parallelize a task that runs on a sizeable NetworkX graph (millions of nodes and edges). The following is an outline of the relevant parts of my code:
```python
# main.py
@ray.remote(scheduling_strategy="SPREAD")
def run_algo_dist(G_creator: Callable[[], nx.Graph], ...):
    return run_algo_serial(G_creator(), ...)

def run_algo_serial(G: nx.Graph, ...):
    algo_executor = AlgoExecutor(G)  # creates an object to execute the algorithm
    algo_executor.start(...)  # takes in primitive arguments
    return algo_executor
```
```python
# algo_executor.py
class AlgoExecutor:
    def __init__(self, G: nx.Graph, ...):
        self.G = G
        self.result = defaultdict(lambda: defaultdict(int))

    def start(self, ...):
        # Split nodes into smaller chunks, where N is in the 100s
        interested_nodes_split = np.array_split(interested_nodes, N)
        interested_nodes_ref = ray.put(interested_nodes)
        generator_refs = [
            self.run_algo_on_list_of_nodes.remote(self, ray.put(arr), interested_nodes_ref)
            for arr in interested_nodes_split
        ]
        del interested_nodes_ref
        # Iterate through the generators and add each yielded result to self.result

    @ray.remote(scheduling_strategy="SPREAD", memory=3 * 1024 * 1024 * 1024)
    def run_algo_on_list_of_nodes(self, nodes: np.ndarray, all_interested_nodes: np.ndarray, ...):
        for node in nodes:
            yield (node, self.run_algo_on_single_node(node, all_interested_nodes, ...))

    def run_algo_on_single_node(self, node: int, all_interested_nodes: np.ndarray, ...):
        # Run a pretty intensive algorithm here.
        # It uses all the passed-in parameters and repeatedly looks up
        # neighbors of the nodes in self.G.
        # Returns a Dict[int, int].
        ...
```
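For reference, here is a minimal, Ray-free sketch of the chunk-and-merge flow outlined above. The body of `run_algo_on_single_node` is a hypothetical stand-in (the real algorithm isn't shown); only the `np.array_split` chunking and the merge into the nested defaultdict mirror the actual code:

```python
import numpy as np
from collections import defaultdict

# Hypothetical stand-in for the per-node algorithm: returns a Dict[int, int].
def run_algo_on_single_node(node, all_interested_nodes):
    return {int(other): abs(int(node) - int(other))
            for other in all_interested_nodes if other != node}

interested_nodes = np.arange(10)
N = 3  # in the real workload this is in the 100s
chunks = np.array_split(interested_nodes, N)

# Merge each (node, Dict[int, int]) result into the nested defaultdict,
# mirroring what the driver does with the results yielded by the generators.
result = defaultdict(lambda: defaultdict(int))
for chunk in chunks:
    for node in chunk:
        for other, score in run_algo_on_single_node(node, interested_nodes).items():
            result[int(node)][other] += score
```

Every interested node ends up in exactly one chunk, so each `(node, other)` pair is merged exactly once.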
Following the Ray documentation, I have made some of the suggested optimizations, such as using generators to return results and passing in a lambda function to create large arguments rather than passing in the full argument. While this resulted in some improvement, I still run into OOM and object spilling at times.
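To make the second optimization concrete, here is a small stand-alone illustration (no Ray required) of why shipping a creator callable is cheaper than shipping the object itself: only the callable and its arguments get serialized, not the large object it builds. `build_nodes` and the sizes here are hypothetical:

```python
import pickle
from functools import partial

def build_nodes(n):
    # Hypothetical stand-in for constructing the large graph on the worker.
    return list(range(n))

n = 100_000
big_object = build_nodes(n)

# A zero-argument creator, analogous to the G_creator callable above.
creator = partial(build_nodes, n)

# The serialized creator is tiny; the serialized object is not.
creator_size = len(pickle.dumps(creator))
object_size = len(pickle.dumps(big_object))
assert creator_size < object_size
```

The trade-off is that every task calling the creator rebuilds the object from scratch, paying construction time and per-worker memory instead of serialization cost.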
Looking at the `ray memory` output, I notice that most of the memory usage (outside of the lambda functions for the `run_algo_dist` function, which take a couple of GB per call) comes from `(deserialize task arg) algo_executor.run_algo_on_list_of_nodes`.
Any tips for reducing the chance of OOM would be appreciated! Thanks in advance!
Also, a slightly tangential question: how are local variables stored (if at all) by Ray? Do variables created in `run_algo_on_single_node` get treated differently than those in `run_algo_on_list_of_nodes`?