Ray creates a worker (process) for every remote class instance

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am creating a computer simulation with 10,000 agents, each implemented as an instance of a remote class. My box has 256 GB of memory and 32 virtual CPUs.
I define a remote class:
@ray.remote  # also tried @ray.remote(num_cpus=0) and @ray.remote(num_cpus=1)
class agent:
    ...

and then create agents:
agent_classes = [agent.remote() for m in range(10000)]

Getting these warnings:

(scheduler +53s) Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.

2023-03-31 15:32:31,606 WARNING worker.py:1851 -- WARNING: 430 PYTHON worker processes have been started on node: 17160aa4e6e6a918f697980ee8384253d5c2971790883a69c2572b7d with address: 10.9.165.111. This could be a result of using a large number of actors, or due to tasks blocked in ray.get() calls (see https://github.com/ray-project/ray/issues/3644 for some discussion of workarounds).

and then the Python kernel dies.
Ray creates 32 worker processes:
jovyan 105319 101946 3 15:19 ? 00:00:05 ray::agent
jovyan 105320 101946 3 15:19 ? 00:00:04 ray::agent
jovyan 105321 101946 3 15:19 ? 00:00:04 ray::agent

I understand that Ray is trying to create one worker process for every instance of the remote class. I hoped that Ray would instead queue execution of calls to the agents on a pool of available workers (for example 32), similar to how it schedules calls to a remote function on a pool of available workers. I benchmarked parallel invocations of a remote function and it scales up very well.
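The benchmark was roughly along these lines (a minimal sketch rather than the exact code; fake_work and the sleep duration are just stand-ins for one agent-sized unit of work):

import time
import ray

ray.init()

@ray.remote
def fake_work(i):
    # Stand-in for one agent-sized unit of work.
    time.sleep(0.01)
    return i

start = time.time()
results = ray.get([fake_work.remote(i) for i in range(10000)])
print(f"10000 remote calls finished in {time.time() - start:.1f}s")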
How could I change my remote class so it does not require an individual worker process for every instance?

If I understand your use case correctly, agents are simple containers of state, and you want to use your 32 CPUs to update the states of all 10,000 agents.
In that case, I'd suggest you restructure your program a bit:

  • Create 10K agent instances that are plain Python objects.
  • Create 10K remote tasks, one to update each agent, with each task requiring 1 CPU.
  • Fire off all 10K tasks at once. Since your cluster only has 32 CPUs, the tasks will be queued and executed 32 at a time just fine.

In summary, something like this:

import ray
from typing import Any

class Agent:
    state: Any = None

@ray.remote(num_cpus=1)
def update_agent(agent):
    # Change the agent's state and return the updated copy.
    # (The task works on a serialized copy, so the new state
    # must be returned for the caller to see it.)
    ...
    return agent

agents = [Agent() for _ in range(10000)]
agents = ray.get([update_agent.remote(agent) for agent in agents])

Right, I was thinking of this as a stop-gap solution. But here is the twist:
An agent's state is pretty big, and I just want it to stay pinned on the machine where the agent is initially placed. For example, we could consider the parameter server pattern (Parameter Server — Ray 2.3.1 docs). So the agent is a remote-class parameter server, and the worker is a remote class assigned to processing this agent. Is there a way to ensure that corresponding workers and agents are collocated on the same thread (or at least on the same host)?

I’ve read that Ray tries to schedule remote function execution where the data is located. So if I do something like
agent_references = [ray.put(agent) for agent in agents]

and then do
agent_futures = [update_agent.remote(agent_reference) for agent_reference in agent_references]

that would work, correct?
But the question is, how do I distribute the placement of the agent objects (agent_references) evenly between machines in the cluster?
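One thing I'm considering (just a sketch; make_agent is a hypothetical helper and I have not verified the locality behavior): create each agent inside a small remote task scheduled with the "SPREAD" strategy, so the returned objects should end up in different nodes' object stores.

import ray

# Agent and update_agent as defined earlier in the thread.

@ray.remote(scheduling_strategy="SPREAD")  # ask Ray to spread these tasks across nodes
def make_agent():
    # The returned agent is stored in the object store of the node that ran
    # this task (very small returns may be inlined back to the caller).
    return Agent()

agent_references = [make_agent.remote() for _ in range(10000)]

# Locality-aware scheduling should then prefer running update_agent on the
# node that already holds each agent's data.
agent_futures = [update_agent.remote(ref) for ref in agent_references]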

how many machines does your cluster have?
and how big is the state of each agent we are talking about here?

As a rule of thumb, actors are stateful processing units; think of them as micro-services.
They are backed by a distributed data layer (in Ray's case, the object store), etc.

Actors are expensive. Ray needs to constantly communicate with every single one of them, monitor them, collect metrics and stats, etc.
So the real bottleneck here is not running 10K remote calls on these actors, but simply having all these 10K actors.

If you don't want the agents to move, and you are using multiple machines to advance them, maybe you can create multiple simulation envs, one on each node, each owning part of the 10K agents. You then tell each env to process its local agents.

Thank you, I think this is a great idea! My agents do not directly interact with each other. They all advance a single step, and then a central coordinator collects their results (which are a small fraction of their states) and distributes new assignments.
How do I implement this? By "you can create multiple simulated envs", do you mean an RLlib multi-agent environment? Would it help?
Alternatively, I could create, say, 30 instances of a worker remote class, and each instance would own and advance a share of the 10,000 agents and take care of communication with the central coordinator. Is that what you are suggesting?

Yeah, exactly.
If you have 30 remote classes, each one only needs to own about 333 agents to cover the 10K total.
You then instruct these classes by firing all the remote calls at once, and they should get properly queued.
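A minimal sketch of that pattern (AgentShard, step(), and the assignment plumbing are illustrative names rather than anything from your code; the SPREAD strategy is just one way to place one shard per node):

import ray

ray.init()

@ray.remote(num_cpus=1, scheduling_strategy="SPREAD")  # spread shards across nodes
class AgentShard:
    """Owns a slice of the agents and advances them locally."""

    def __init__(self, agents):
        self.agents = agents  # plain Python objects, pinned inside this actor

    def step(self, assignments):
        # Advance every local agent one step and return only the small
        # fraction of state the coordinator needs.
        results = []
        for agent, assignment in zip(self.agents, assignments):
            ...  # update agent.state using assignment
            results.append(agent.state)
        return results

num_shards = 30
all_agents = [Agent() for _ in range(10000)]  # Agent as defined earlier
shards = [AgentShard.remote(all_agents[i::num_shards]) for i in range(num_shards)]

# One simulation step: fire all 30 calls at once; each shard advances its own
# agents in parallel, and the agents never leave the node their shard lives on.
assignments = [[None] * len(all_agents[i::num_shards]) for i in range(num_shards)]
step_results = ray.get(
    [shard.step.remote(assignments[i]) for i, shard in enumerate(shards)]
)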