Spawning and waiting on tasks from within tasks

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

TL;DR: Why can't Ray make workers non-blocking and schedule pending tasks on them while they are blocked, instead of spawning new workers?

Hi, we are using Ray Core to develop a distributed version of a model explainability algorithm.
The basic idea is to build a binary tree that divides the domain of the model's samples into two sections at each node, with the end goal of accurately approximating the model's behaviour with a linear regression in each leaf.

The tree is constructed with a recursive fit_tree() function which returns a node. If the current node is not a terminal one, fit_tree() computes the best split variable and split point and then calls itself twice, once for the left child and once for the right.
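
For concreteness, a minimal sketch of the sequential recursion; `Node`, `is_terminal`, `fit_leaf`, `partition` and `find_best_split` here are illustrative stand-ins for our actual helpers:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Node:
    split_var: Optional[int] = None
    split_point: Optional[float] = None
    left: Optional["Node"] = None
    right: Optional["Node"] = None
    model: Optional[object] = None    # linear regression fitted in the leaf

def fit_tree(samples, depth=0):
    if is_terminal(samples, depth):           # e.g. depth / sample-count criteria
        return Node(model=fit_leaf(samples))
    var, point = find_best_split(samples)     # the slow step, parallelizable per feature
    left, right = partition(samples, var, point)
    return Node(var, point,
                fit_tree(left, depth + 1),
                fit_tree(right, depth + 1))
```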

The slowest portion is the computation of the best split point, which is sequential but can be performed in parallel across the candidate splitting variables (the dimensions of the domain).

If we only use Ray to compute the split points in parallel, the maximum level of parallelism is bounded by the number of features (dimensions of the domain), which often isn't very high (5–40).
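
Roughly how we parallelize that step today; the scoring body is a placeholder, assuming the samples are a NumPy matrix:

```python
import numpy as np
import ray

@ray.remote
def score_split(samples, feature):
    # Placeholder: the real algorithm scans candidate split points along
    # `feature`; here we just return a dummy (score, point) pair.
    return -float(np.var(samples[:, feature])), float(np.median(samples[:, feature]))

def find_best_split(samples):
    samples_ref = ray.put(samples)                 # ship the data to the object store once
    refs = [score_split.remote(samples_ref, f)     # one task per feature, so parallelism
            for f in range(samples.shape[1])]      # is capped at the number of features
    scores = ray.get(refs)
    best = max(range(len(scores)), key=lambda f: scores[f][0])
    return best, scores[best][1]
```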

We would also like to parallelize the construction of the binary tree itself: each child can be computed in parallel with its sibling, their children in parallel with each other, and so on. Depending on the depth and breadth of the tree, this could allow for much higher parallelism.

We attempted such an implementation with Ray, but ran into a problem: far too many workers get spawned, hundreds of them, instead of staying within the 8 cores / 16 threads of the test machine. This quickly exhausts the system's resources and crashes the execution.

From our understanding, the issue stems from ray.get() blocking workers, which causes Ray to create new ones to avoid deadlocking the system.
We considered various alterations to the algorithm's structure to avoid blocking the workers, but each one either requires completely restructuring the problem or incurs other penalties.
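
The pattern that triggers this looks roughly like the sketch below (reusing the helpers from the sketches above); the comment marks where each worker blocks:

```python
@ray.remote
def fit_tree_task(samples, depth=0):
    if is_terminal(samples, depth):
        return Node(model=fit_leaf(samples))
    var, point = find_best_split(samples)          # itself fans out per-feature tasks
    left, right = partition(samples, var, point)
    left_ref = fit_tree_task.remote(left, depth + 1)
    right_ref = fit_tree_task.remote(right, depth + 1)
    # ray.get() blocks this worker process until both subtrees finish.
    # Ray releases the blocked worker's CPU and starts fresh workers for
    # the children, leaving one idle-but-alive process per inner node,
    # which snowballs into hundreds of workers on a deep tree.
    left_node, right_node = ray.get([left_ref, right_ref])
    return Node(var, point, left_node, right_node)
```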

The best alternative we came up with was to use async/await on the driver to generate the tree concurrently rather than in parallel (sketched after the list below), though this too has downsides:

  • The split-point computation isn't the only heavy work that happens while generating the tree: the surrounding code is also sequential, CPU-bound code, which keeps the driver busy and stalls the scheduling and handling of other nodes for the duration of that work.
  • We fear that in a distributed environment the data transfer to and from the driver could become a bottleneck.
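
Roughly what this driver-side version looks like, relying on the fact that ObjectRefs are awaitable in asyncio:

```python
import asyncio
import ray

async def find_best_split_async(samples):
    samples_ref = ray.put(samples)
    refs = [score_split.remote(samples_ref, f) for f in range(samples.shape[1])]
    scores = await asyncio.gather(*refs)       # awaits the ObjectRefs without blocking
    best = max(range(len(scores)), key=lambda f: scores[f][0])
    return best, scores[best][1]

async def fit_tree_async(samples, depth=0):
    if is_terminal(samples, depth):
        return Node(model=fit_leaf(samples))
    var, point = await find_best_split_async(samples)
    # partition() and the rest of the surrounding code are CPU-bound and
    # run on the driver, blocking the event loop and stalling every other
    # node for their duration (downside 1 above).
    left, right = partition(samples, var, point)
    left_node, right_node = await asyncio.gather(
        fit_tree_async(left, depth + 1),
        fit_tree_async(right, depth + 1),
    )
    return Node(var, point, left_node, right_node)

# tree = asyncio.run(fit_tree_async(samples))
```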

We also considered using async actors instead of the driver, but this would force users to manually manage the number and scaling of those actors, and it would require us to implement our own scheduling system to distribute the work across them. That is before considering optimizations such as work stealing, which would demand even more low-level work on top of restructuring the existing code.
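
To illustrate the extra machinery involved, a rough sketch with a hand-managed actor pool and naive round-robin dispatch (again reusing the helpers above):

```python
import asyncio
import ray

@ray.remote
class TreeBuilder:
    # Async actor: while one fit_subtree coroutine awaits, its event loop
    # can interleave other pending calls instead of blocking.
    def __init__(self):
        self.pool = None

    def set_pool(self, pool):
        self.pool = pool              # peer handles for hand-rolled dispatch

    async def fit_subtree(self, samples, depth=0):
        if is_terminal(samples, depth):
            return Node(model=fit_leaf(samples))
        var, point = await find_best_split_async(samples)
        left, right = partition(samples, var, point)
        # We have to schedule the children across the pool ourselves
        # (naive round-robin here, with no work stealing between actors).
        a = self.pool[(2 * depth) % len(self.pool)]
        b = self.pool[(2 * depth + 1) % len(self.pool)]
        l_ref = a.fit_subtree.remote(left, depth + 1)
        r_ref = b.fit_subtree.remote(right, depth + 1)
        return Node(var, point, *await asyncio.gather(l_ref, r_ref))

# The user must pick and manage the pool size by hand:
pool = [TreeBuilder.remote() for _ in range(8)]
ray.get([b.set_pool.remote(pool) for b in pool])
```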

What we would find really useful is the ability to spawn a task for each sub-node and have that task await, in a non-blocking fashion, the results of the parallel split-point computation.

It occurs to us that instead of having ray.get() block the worker, it could be beneficial to have the worker pick up another pending task, scheduling the continuation of the current function for when the result is ready.

This kind of behaviour is inspired by the async runtimes of other languages, such as Rust and Go.
What we are envisioning is using Ray through asyncio primitives so that neither the driver nor the workers are ever blocked:
workers would be able to asynchronously schedule new tasks and await them, and Ray would keep the workers busy in the meantime with pending tasks, if there are any.
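
In code, this is the usage we are envisioning; to be clear, these are hypothetical semantics we are proposing, not current Ray behavior:

```python
import asyncio
import ray

# Envisioned semantics: awaiting inside a remote task would return this
# worker to the pool until the awaited results are ready.
@ray.remote
async def fit_tree_task(samples, depth=0):
    if is_terminal(samples, depth):
        return Node(model=fit_leaf(samples))
    var, point = await find_best_split_async(samples)
    left, right = partition(samples, var, point)
    # While suspended here, the worker would execute other pending
    # fit_tree_task invocations instead of sitting blocked.
    left_node, right_node = await asyncio.gather(
        fit_tree_task.remote(left, depth + 1),
        fit_tree_task.remote(right, depth + 1),
    )
    return Node(var, point, left_node, right_node)
```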

This would keep the number of workers in line with the number of physical cores available in the distributed system, avoiding both the overhead of spawning new workers and their resource cost.

This paradigm would make it possible to parallelize sequential code with far fewer code changes: simply mark some functions as remote or async, take some care with await (or with reusing references when getting results), and Ray distributes the work across the underlying cluster.

With this post I was hoping to open a discussion on the possible approach of non-blocking Ray workers and to hear the opinions of Ray developers and contributors. I realize there may be good reasons against such an approach, or particular difficulties with it; if so, I would be interested in reading about and discussing them.