I am using Ray to implement a bulk synchronous parallel algorithm (very similar, in spirit, to Apache Giraph). The core idea is that there are many nodes in a graph that maintain state and pass messages to other nodes. As can be seen in the diagram, each Actor maintains a subset of the nodes (assigned in a round-robin fashion to help balance the workload). There are two types of nodes (Type A, Type B), and each type sends a different kind of message. The algorithm runs as follows (a skeleton of the loop is sketched after the step list):
Type A step (for each Actor)
Process messages from the local “in” queue and update the state of all nodes.
Send messages to Type B nodes via the router’s “in” queue.
Synchronizing step (Router)
Route each message in the router’s “in” queue to the appropriate Actor’s “in” queue.
Type B step (for each Actor)
Process messages from the local “in” queue and update the state of all nodes.
Send messages to Type A nodes via the router’s “in” queue.
Synchronizing step (same as above)
Repeat for a fixed number of iterations.
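In skeleton form, the setup looks roughly like this. NodeActor, route(), and the toy messages below are simplified stand-ins for my actual code, not the real implementation:

```python
import ray
from ray.util.queue import Queue  # a queue handle that every Actor can use

@ray.remote
class NodeActor:
    """Owns a round-robin subset of the graph's nodes plus a local "in" queue."""

    def __init__(self, nodes, total_nodes, router_queue):
        self.nodes = {n: 0.0 for n in nodes}   # node_id -> state (toy state)
        self.total_nodes = total_nodes
        self.router_queue = router_queue
        self.in_queue = []

    def enqueue(self, messages):
        """Called during the synchronizing step to deliver messages to this Actor."""
        self.in_queue.extend(messages)

    def step(self, node_type):
        """Process the local "in" queue, update node state, and emit new messages
        (addressed to nodes of the opposite type) via the router's "in" queue."""
        for target, value in self.in_queue:
            self.nodes[target] += value        # toy state update
        self.in_queue.clear()
        for node in self.nodes:
            # A real step would message graph neighbours of the opposite type;
            # here each node just messages its successor.
            self.router_queue.put(((node + 1) % self.total_nodes, 0.5))

def route(router_queue, actors, owner_of):
    """Synchronizing step: drain the router's "in" queue and deliver every
    message to the Actor that owns its target node."""
    batches = {}
    while not router_queue.empty():
        target, value = router_queue.get_nowait()
        batches.setdefault(owner_of[target], []).append((target, value))
    ray.get([actors[i].enqueue.remote(msgs) for i, msgs in batches.items()])

ray.init()
num_actors, num_nodes, num_iterations = 4, 16, 3
owner_of = {n: n % num_actors for n in range(num_nodes)}   # round-robin assignment
router_queue = Queue()
actors = [
    NodeActor.remote([n for n in range(num_nodes) if owner_of[n] == i],
                     num_nodes, router_queue)
    for i in range(num_actors)
]

for _ in range(num_iterations):
    ray.get([a.step.remote("A") for a in actors])   # Type A step
    route(router_queue, actors, owner_of)           # synchronizing step
    ray.get([a.step.remote("B") for a in actors])   # Type B step
    route(router_queue, actors, owner_of)
```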
When timing the program, I can see that the bottleneck is the synchronizing step. I am using ray.wait() with a minimal timeout on the Actor step tasks, combined with checking the size of the router's "in" queue, so that messages are routed incrementally as they become available. The ray.wait() is there only because there is a delay before messages begin to appear in the queue. I am using the Ray Queue so that it can be accessed by all Actors, and I never block when accessing the queue.
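In case it helps, here is roughly how that routing loop looks, reusing the names from the sketch above (route_incrementally, owner_of, etc. are placeholders, not my exact code):

```python
import ray

def route_incrementally(step_refs, router_queue, actors, owner_of):
    """Synchronizing step: while the Actor step tasks are still running, keep
    draining the router's "in" queue without blocking and forward each message
    to the Actor that owns its target node."""
    pending = list(step_refs)
    while pending or not router_queue.empty():
        if pending:
            # Minimal timeout: we only pause long enough to notice finished
            # tasks, because messages trickle in while the steps are running.
            _, pending = ray.wait(pending, timeout=0.01)
        batches = {}                                    # actor index -> messages
        while not router_queue.empty():                 # non-blocking drain
            target, value = router_queue.get_nowait()
            batches.setdefault(owner_of[target], []).append((target, value))
        for i, msgs in batches.items():
            actors[i].enqueue.remote(msgs)              # fire-and-forget delivery
```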
The single-process version runs orders of magnitude faster, despite there being minimal blocking in the Actor version. Is there a reason why this might be? Could throttling be an issue with a single queue that all Actors use to send messages? Here are some timing results for context:
Single-process version:
Sending to Type A nodes: 0.140007 s
Routing messages: 0.047997 s
Sending to Type B nodes: 8.789228 s
Routing messages: 0.051641 s
Actor version:
Sending to Type A nodes: 0.001186 s
Routing messages: 107.729846 s
Sending to Type B nodes: 0.001242 s
Routing messages: 82.142407 s
Is it possible to pass actor handles between actors? In the current design, with only a single routing queue, the bottleneck effectively limits the parallelism of the message-passing activity. One idea I had was to remove the synchronizing steps and have messages passed directly between actors and processed as they arrive, rather than waiting for all messages to be sent and re-routed before processing. The possibility of not needing the routing queue at all only just occurred to me, but I was not sure whether actor-to-actor communication is actually possible.
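What I have in mind is something like the following (Worker, set_peers, etc. are made-up names, and the handle-passing part is what I am unsure about):

```python
import ray

@ray.remote
class Worker:
    def __init__(self, name):
        self.name = name
        self.peers = []

    def set_peers(self, peers):
        # `peers` is a list of actor handles received from the driver (or from
        # another actor); they can be called directly with .remote().
        self.peers = peers

    def receive(self, sender, payload):
        return f"{self.name} got {payload!r} from {sender}"

    def broadcast(self, payload):
        # Actor-to-actor calls, with no central routing queue in between.
        return ray.get([p.receive.remote(self.name, payload) for p in self.peers])

ray.init()
a, b = Worker.remote("A"), Worker.remote("B")
ray.get([a.set_peers.remote([b]), b.set_peers.remote([a])])
print(ray.get(a.broadcast.remote("hello")))   # ["B got 'hello' from A"]
```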
Yeah, you can use named actors for this; see Using Actors — Ray v1.1.0. (You can also use detached actors so that the actors do not fate-share with each other.)
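Something like this (the Router class is just for illustration):

```python
import ray

@ray.remote
class Router:
    def ping(self):
        return "pong"

ray.init()

# Named actor: any task or actor in the cluster can look it up by name
# instead of having the handle passed in explicitly.
router = Router.options(name="router").remote()
same_router = ray.get_actor("router")      # look it up by name from anywhere
print(ray.get(same_router.ping.remote()))  # "pong"

# Detached actor: its lifetime is decoupled from the job that created it,
# so it does not fate-share with its creator.
Router.options(name="persistent_router", lifetime="detached").remote()
```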
I bring good news: the actor-to-actor implementation succeeded in improving performance. The following are the implementation changes compared to the original post.
Vertices are grouped by type such that actors perform only one type of vertex computation.
The “hard” synchronizing step was replaced with a “soft” evaluation step. Specifically, after a set number of messages have been processed by a variable vertex actor, a stopping condition is evaluated and, if it is met, the actor returns the final vertex values. The entire algorithm halts once all variable vertex actors have met the stopping condition (detected via ray.get() on their results).
Message passing is achieved by passing actor handles of the opposite vertex type to every vertex actor. The mapping from each receiving vertex to its owning actor (i.e. its index in the handle list) is stored in shared memory, along with the static graph, which lets a vertex know which of its neighboring vertices to send a message to. This avoids the central router queue depicted in the original post. Each Actor maintains an asynchronous queue attribute and exposes a remote method that other actors use to add to it (see the sketch after this list).
I use a heuristic, based on the number of known variable vertices and available physical CPUs, to determine how many actors to assign to each type of vertex. Given that the factor vertices perform the more intensive computation, I aim to maintain a 2:1 ratio of factor:variable Actors.
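In condensed form, each variable vertex actor looks roughly like this. VariableActor, check_interval, and the placeholder update/stopping condition are simplified stand-ins for my actual code:

```python
import asyncio
import ray

@ray.remote
class VariableActor:
    """Owns variable vertices; factor actors push messages into its mailbox
    through the remote receive() method, so no central router queue is needed."""

    def __init__(self, vertex_ids, check_interval=1000):
        self.values = {v: 0.0 for v in vertex_ids}
        self.mailbox = asyncio.Queue()      # asynchronous queue attribute
        self.check_interval = check_interval
        self.factor_actors = []             # handles of the opposite vertex type

    def set_peers(self, factor_actors):
        self.factor_actors = factor_actors

    async def receive(self, message):
        # Remote method that other actors call to add to this actor's mailbox.
        await self.mailbox.put(message)

    async def run(self):
        """Process messages until the stopping condition is met, then return
        the final vertex values (the "soft" evaluation step)."""
        processed = 0
        while True:
            vertex, value = await self.mailbox.get()
            self.values[vertex] += value    # placeholder update; the real code
                                            # also replies via self.factor_actors
            processed += 1
            if processed % self.check_interval == 0 and self.converged():
                return self.values

    def converged(self):
        return True                         # placeholder stopping condition

# Driver side: the whole algorithm halts once every variable vertex actor has
# met its stopping condition, which ray.get() on the run() results waits for:
#   final_values = ray.get([a.run.remote() for a in variable_actors])
```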
A simple simulation using 500 and 1000 variable vertices has demonstrated a distinct trade-off between the single-process and multi-process implementations:
500 (single/multi): 5:45 min / 22:08 min
1000 (single/multi): 6:14 min / 13:08 min
Thus, with fewer than ~600 variables, it is better to use the single-process implementation.
The single-process implementation uses a dictionary as the “queue” for each vertex, which also keeps track of the sender of each message. Surprisingly (or perhaps not), my original implementation, which stores multiple messages as an itertools.chain instance, is significantly faster than the alternative I tried, which uses a collections.deque to hold every message object in memory. Thus, rather than processing each message from the variable vertices individually, all messages sent by a variable vertex are processed at once.
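To illustrate the difference, here is a toy version of the two mailbox approaches (not my actual code; the real messages and updates are more involved). The chain-based mailbox concatenates message iterables lazily and hands a sender's whole batch to the vertex at once, whereas the deque version materialises and pops every message object individually:

```python
import itertools
from collections import deque

# chain-based mailbox: per vertex, one lazily concatenated iterable per sender
chain_mailbox = {}   # vertex -> {sender: itertools.chain of messages}

def send_chain(mailbox, vertex, sender, messages):
    per_sender = mailbox.setdefault(vertex, {})
    per_sender[sender] = itertools.chain(per_sender.get(sender, ()), messages)

def process_chain(mailbox, vertex):
    for sender, messages in mailbox.pop(vertex, {}).items():
        for msg in messages:        # the whole batch from one sender at once
            pass                    # ... update vertex state ...

# deque-based alternative: every message object stored individually
deque_mailbox = {}   # vertex -> deque of (sender, message)

def send_deque(mailbox, vertex, sender, messages):
    mailbox.setdefault(vertex, deque()).extend((sender, m) for m in messages)

def process_deque(mailbox, vertex):
    q = mailbox.get(vertex, deque())
    while q:
        sender, msg = q.popleft()   # one message at a time
        # ... update vertex state ...
```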