How severely does this issue affect your experience of using Ray?
- Low: It annoys or frustrates me for a moment.
Hi! I know the question I am asking is somewhat vague, but I’m hoping to get some input.
I have a graph algorithm that is executed for every node in a large NetworkX graph (more than a million nodes). I would like to distribute the nodes across workers and execute the algorithm in parallel. I wrote an initial version of the code that uses ray.remote, and while I saw a speedup in execution, it does not scale as expected. For example, running with 8 nodes takes almost as long as (if not longer than) running with 4 nodes.
Furthermore, I am experiencing object spilling, which I would like to avoid. I’ve tried deleting references and calling ray.get in batches (now using ray.wait).
Currently, I am aiming for one worker process per CPU I have access to, with each worker handling a list of nodes.
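The one-list-per-worker split described above can be sketched as follows. The helper name `split_nodes` and the `chunks_per_worker` knob are hypothetical and not part of my actual code; the knob is only there to show how the same call would produce smaller lists, and whether smaller lists would help with scaling is an open question here, not a claim.

```python
import numpy as np


def split_nodes(nodes, num_workers, chunks_per_worker=1):
    """Split a node list into nearly equal NumPy arrays.

    chunks_per_worker is a hypothetical knob: 1 reproduces the
    one-list-per-worker layout, larger values give smaller lists.
    """
    num_chunks = num_workers * chunks_per_worker
    return np.array_split(np.asarray(nodes), num_chunks)


if __name__ == "__main__":
    nodes = list(range(10))
    # One list per worker: 10 nodes over 4 workers.
    print([len(c) for c in split_nodes(nodes, 4)])      # [3, 3, 2, 2]
    # Two lists per worker: 8 smaller lists over the same nodes.
    print([len(c) for c in split_nodes(nodes, 4, 2)])   # [2, 2, 1, 1, 1, 1, 1, 1]
```

np.array_split never drops nodes: when the count does not divide evenly, the first few arrays are one element longer than the rest.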
Context:
I am running this with SLURM on multiple nodes, each with 128 CPUs (2 sockets, 64 cores per socket, 1 thread per core) and 250 GB of memory. I am using Python 3.9.13 and Ray 2.0.1. The graph is constructed from torch_geometric.datasets.
Skeleton code:
# Necessary imports (only those used in this skeleton are shown)
import time
from typing import Dict, List

import networkx as nx
import numpy as np
import ray


class GraphAlgorithmExecutor:
    def __init__(self, G: nx.Graph, ...) -> None:
        self.G = G
        self.nodes = list(G.nodes())
        self.entity_nodes = ...  # Some subset of nodes of G
        self.result = ...  # Final result of algorithm
        # More initialization...

    def start(self, num_workers: int) -> None:
        s = time.time()
        # One task per worker; each task receives `self` (including self.G)
        # and one slice of the node list.
        futures = [
            self.algorithm_starting_at_node_list.remote(self, arr)
            for arr in np.array_split(self.nodes, num_workers)
        ]
        while futures:
            # Collect up to 10 finished tasks at a time.
            num_returns = min(10, len(futures))
            visited_ref, futures = ray.wait(futures, num_returns=num_returns)
            visited = ray.get(visited_ref)
            # Concatenate results from visited into self.result
            del visited_ref
        print("\tAlgorithm took {:.3f} seconds\n".format(time.time() - s))

    @ray.remote(scheduling_strategy="SPREAD", num_cpus=1)
    def algorithm_starting_at_node_list(self, nodes: np.ndarray) -> List[Dict[int, int]]:
        # Run the per-node algorithm on every node in this worker's slice.
        res = []
        for node in nodes:
            res.append(self.algorithm_starting_at_node(node))
        return res

    def algorithm_starting_at_node(self, node: int) -> Dict[int, int]:
        # time-intensive algorithm which accesses class member fields (i.e. self.G, self.nodes)
        # and returns a dictionary
        pass
Any feedback is appreciated and more context can be provided if needed! Thanks in advance!