How does Ray handle situations in which the number of required actors exceeds the number of available CPU cores

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hello, I have a question regarding how Ray handles situations where the number of requested actors exceeds the number of available CPU cores. I am relatively new to multiprocessing and Ray, so I partly hope that the answer to my question is not too obvious :wink:

To give you some background: I am implementing an Ensemble Kalman Filter algorithm in Python which uses a simulator instead of a classic state-space model. The simulator is based on a deck that is used to initialize the simulator and to control it during operation (the simulator is third-party software that I have not created and whose operation I have no influence on). That is, I need several of these simulators running at the same time with different initial conditions. Each simulator requires its own deck, but copying the one deck I have access to in order to produce multiple decks is not an option. So I did some research and came up with multiprocessing as a possible solution: each simulator object runs on a different core in its own process, and since each process has its own, separate memory, I should be able to operate multiple independent simulator objects simultaneously even though there exists only one ‘physical’ version of the deck (this is at least my assumption after spending an afternoon with concurrent processing).

I decided to use Ray’s actors to implement the multiprocessing tasks, which worked better than I had expected. To my surprise, the filtering results seemed reasonable even when the number of required Ray actors exceeded the number of available CPU cores, which brings me to my question: how does Ray handle situations in which the number of required actors exceeds the number of available CPU cores with respect to memory usage and, ultimately, the global and local variable space the different actors have access to? In order to perform the filtering task correctly, each simulator object needs to maintain its local variable space during the algorithm’s run time, in particular the state of its deck. Given the simulation results, I assume that this is the case. However, in order to evaluate the filter results correctly, I need to be certain that the local variable space is preserved separately for each actor. Maybe a similar question has been asked before, but I couldn’t find an answer yet.

In case it might be helpful, here is a small, representative code example of what I am doing.
Each simulator is controlled and managed by an object which is also an actor.

import ray

@ray.remote
class SimManager:
    def __init__(self, init_values):
        self.some_attributes = some_values  # placeholder for further attributes
        self.simulator = init_simulator(init_values)

    def simulation_step(self):
        # Code to perform a single simulation step.
        return simulation_result

An object of another class creates the requested number of SimManager objects (determined by n_ens), stores them, and performs a simulation step with all of them if required.

class EnsembleManager:
    def __init__(self, n_ens):
        self.some_attributes = some_values  # placeholder for further attributes
        ensemble_init_values = create_init_values(n_ens)
        self.sim_manager_list = [SimManager.remote(init_values)
                                 for init_values in ensemble_init_values]

    def ensemble_simulation_step(self):
        ensemble_results = list()
        for sim_manager in self.sim_manager_list:
            ensemble_results += [sim_manager.simulation_step.remote()]
        ensemble_results = ray.get(ensemble_results)
        return ensemble_results
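
In my main script I then drive the ensemble roughly like this (simplified; n_steps and n_ens=8 are just placeholders for the number of filter cycles and ensemble members):

import ray

ray.init()
n_steps = 10  # placeholder: number of filter cycles
ensemble_manager = EnsembleManager(n_ens=8)
for _ in range(n_steps):
    ensemble_results = ensemble_manager.ensemble_simulation_step()
    # ... Ensemble Kalman Filter update based on ensemble_results ...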

Judging by the simulation time, the SimManager objects don’t seem to operate in parallel; however, this is more of a minor issue at the moment. More importantly, I need to know whether each SimManager does in fact have its own simulator object with its own deck, or whether they may get mixed up if n_ens > num_cpus.
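
To illustrate what I mean by each SimManager having its own simulator, here is a stripped-down check I could run (whereami is just a hypothetical method added for this test; it relies on each actor living in its own worker process, so os.getpid() identifies it):

import os
import ray

@ray.remote
class SimManager:
    def __init__(self, init_values):
        # Stand-in for the real simulator/deck state.
        self.init_values = init_values

    def whereami(self):
        # Return the worker process id together with the stored state.
        return os.getpid(), self.init_values

ray.init()
managers = [SimManager.remote(i) for i in range(4)]
print(ray.get([m.whereami.remote() for m in managers]))
# I would expect four different PIDs, each paired with its own init value.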

Thanks for your help!

The CPU resource tells Ray how many CPUs the tasks of the actor require. This prevents oversubscription of actor task calls on a machine. In your case, you shouldn’t have more than os.cpu_count() simulation_step calls running at the same time.

To apply further limits, you can use Ray’s memory resource, which limits how many actors can be placed on a single node. More information here: Resources — Ray 2.7.0.

Example:

#!/usr/bin/env python3

import ray 
import os

num_actors = os.cpu_count() + 1 
gb_per_actor = None

# Uncomment to limit by memory.
# This is set to 100GB. My machine has 700GB RAM, so 7 actors can be created.
# The script hangs after that (as expected), as Ray won't go over the limit.
#gb_per_actor = 100 * 2**30

@ray.remote(memory=gb_per_actor)
class Actor:
    def __init__(self, rank):
        self.rank = rank

    def get_rank(self):
        return self.rank

actors = [Actor.remote(rank) for rank in range(num_actors)]
print(f'Scheduled {len(actors)} actors')

done_ranks = set()
result_refs = [actor.get_rank.remote() for actor in actors]
while result_refs:
    ready_refs, result_refs = ray.wait(result_refs, num_returns=1)
    done_ranks.add(ray.get(ready_refs[0]))
    print(f'{len(done_ranks)} complete')

With memory=100GB:

Scheduled 193 actors
1 complete
2 complete
3 complete
4 complete
5 complete
6 complete
7 complete
(autoscaler +16s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(autoscaler +16s) Warning: The following resource request cannot be scheduled right now: {'memory': 107374182400.0, 'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.

With memory=None:

Scheduled 193 actors
1 complete  
2 complete  
3 complete  
4 complete
...
191 complete
192 complete
193 complete

Thank you for your response. The section you sent me says that

an infinite number of them [actors] can run on any non-zero cpu node

which is what I experienced as well by setting n_ens > num_cpus. Why, then, should I still not have more than os.cpu_count() simulation_steps running? Admittedly, you already gave the explanation:

The CPU resource tells Ray how many CPUs the tasks of the actor require. This prevents oversubscription of Actor task calls on a machine.

which I haven’t fully understood in that case – I’m still new to the topic, so I apologize in advance in case this is fundamental knowledge.

Given the section you linked, what I experienced when further increasing n_ens, and your example, I assume that it is in theory possible to operate k actors at once even though k > num_cpus. However, in that case memory is in practice the limiting factor: if the amount of memory necessary to hold the actors is larger than the available memory, errors will occur. So n_ens is not necessarily limited by num_cpus, but obviously only num_cpus simulation steps will be performed at a time, and if n_ens is increased too far, errors will occur. Accordingly, it is better to limit the number of simulation_steps, and hence n_ens, to os.cpu_count(), but it is not strictly required?

At least, this is what I’m currently assuming based on what I read here and what I’ve been able to observe by trial and error.
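
To make the “limit the number of simulation_steps” part concrete, this is how I imagine it in code (just a sketch; ensemble_simulation_step_limited is a hypothetical variant of my method above, and the ray.wait pattern is borrowed from your example):

import os
import ray

def ensemble_simulation_step_limited(self):
    # Keep at most os.cpu_count() simulation_step calls in flight,
    # while n_ens itself stays unchanged.
    max_in_flight = os.cpu_count()
    pending, ensemble_results = [], []
    for sim_manager in self.sim_manager_list:
        if len(pending) >= max_in_flight:
            # Block until at least one running step has finished.
            done, pending = ray.wait(pending, num_returns=1)
            ensemble_results += ray.get(done)
        pending += [sim_manager.simulation_step.remote()]
    # Collect the remaining results.
    # Note: the combined list is not necessarily in ensemble order.
    ensemble_results += ray.get(pending)
    return ensemble_results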

The scheduling of actors is different from the scheduling of actor tasks. In the default case, actors may oversubscribe CPUs, but actor tasks may not.
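
A minimal sketch of that difference, using a made-up PinnedActor class: with an explicit num_cpus=1, the actor itself holds one CPU for its entire lifetime, so the number of concurrently alive actors is capped by the CPU count.

import ray

@ray.remote(num_cpus=1)  # the actor holds one CPU as long as it is alive
class PinnedActor:
    def ping(self):
        return 'ok'

ray.init(num_cpus=2)  # small local cluster to make the effect visible
actors = [PinnedActor.remote() for _ in range(4)]

# Only 2 of the 4 actors can be placed; calls on the other 2 stay pending
# until a CPU is freed.
refs = [actor.ping.remote() for actor in actors]
ready, not_ready = ray.wait(refs, num_returns=len(refs), timeout=5)
print(f'{len(ready)} actors responded, {len(not_ready)} still waiting for a CPU')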

How are situations handled, then, where the CPUs are oversubscribed? Given your answers, I would have assumed that it is not possible to set n_ens > num_cpus at all, or that the results become unpredictable. But that does not seem to be the case.

Regardless, is there a better approach than Actors to solve my problem?