Crash in Ray connector pipeline v2

I am experiencing crashes on multiple machines after 4-5 days of training, on Ray 2.30.0, with the following stack trace:

2024-07-12 19:48:19,937    ERROR tune_controller.py:1331 -- Trial task failed for trial bb20cc69
Traceback (most recent call last):
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/_private/worker.py", line 2630, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/_private/worker.py", line 863, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(KeyError): ray::ImplicitFunc.train() (pid=1665315, ip=192.168.0.4, actor_id=d7e98aaece62544336a6306801000000, repr=single_agent_train)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/air/_internal/util.py", line 98, in run
    self._ret = self._target(*self._args, **self._kwargs)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py", line 45, in <lambda>
    training_func=lambda: self._trainable_func(self.config),
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py", line 248, in _trainable_func
    output = fn()
  File "/home/mfioravanti/example/.venv/bin/impl/learn.py", line 110, in single_agent_train
    train.report(model.train())
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 328, in train
    result = self.step()
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/algorithms/algorithm.py", line 867, in step
    ) = self._run_one_training_iteration_and_evaluation_in_parallel()
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/algorithms/algorithm.py", line 3239, in _run_one_training_iteration_and_evaluation_in_parallel
    evaluation_results = self._run_one_evaluation(
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/algorithms/algorithm.py", line 3183, in _run_one_evaluation
    eval_results = self.evaluate(parallel_train_future=parallel_train_future)
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/algorithms/algorithm.py", line 988, in evaluate
    self.evaluation_workers.sync_env_runner_states(
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/env/env_runner_group.py", line 484, in sync_env_runner_states
    self.foreach_worker(
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/env/env_runner_group.py", line 836, in foreach_worker
    local_result = [func(self.local_worker())]
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/env/env_runner_group.py", line 468, in _update
    _env_runner._env_to_module.set_state(
  File "/home/mfioravanti/example/.venv/lib/python3.10/site-packages/ray/rllib/connectors/connector_pipeline_v2.py", line 231, in set_state
    raise KeyError(f"No state found in `state` for connector piece: {key}!")
KeyError: 'No state found in `state` for connector piece: 000_AddObservationsFromEpisodesToBatch!'

I tried to understand what is going on, but as far as I can tell there was no other issue before this one, and it seems non-deterministic. Is there a known way I can at least work around it?

Since it is just the evaluation crashing, I would be fine with disabling evaluation in Ray Tune entirely.
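My understanding is that setting evaluation_interval to None on the AlgorithmConfig turns off the periodic evaluation step (and with it the sync_env_runner_states call on the evaluation workers). A rough sketch of what I have in mind; the env name is a placeholder:

from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    .environment("MyEnv-v0")  # placeholder for the actual environment
    .env_runners(num_env_runners=2)
    # Assumption: evaluation_interval=None disables periodic evaluation,
    # so the crashing evaluate() path is never entered.
    .evaluation(evaluation_interval=None)
)

Is that the right switch, or is there a cleaner way to skip evaluation under Tune?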

I’m also seeing this same error on Ray 2.32.0.

I’m running multi-agent training with a custom env. The game is a “continuing task”, i.e. the goal for each agent is to maximize the cumulative reward over an indefinite time horizon. So I never “terminate”, but I do “truncate” if 1) the agent acts out-of-bounds in a non-recoverable way, or 2) after some amount of time, to roll up for weight updates.
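Concretely, the truncation rule looks roughly like this (the helper names are simplified placeholders for my real checks):

def _should_truncate(self, agent_id: str) -> bool:
    # 1) the agent acted out-of-bounds in a non-recoverable way, or
    # 2) the rollout horizon for the next weight update has elapsed.
    return self._acted_out_of_bounds(agent_id) or self._rollout_horizon_reached()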

When I run it with num_env_runners=0, it works great. However, if I run it with num_env_runners greater than 0, it runs for a while and then breaks:

Traceback (most recent call last):
  File "~/workspace/policy_trainers/multi_agent_rllib.py", line 108, in <module>
    run_train()
  File "~/workspace/policy_trainers/multi_agent_rllib.py", line 98, in run_train
    result = algo.train()
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 328, in train
    result = self.step()
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/rllib/algorithms/algorithm.py", line 875, in step
    self.workers.sync_env_runner_states(
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/rllib/env/env_runner_group.py", line 516, in sync_env_runner_states
    self.foreach_worker(
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/rllib/env/env_runner_group.py", line 885, in foreach_worker
    local_result = [func(self.local_worker())]
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/rllib/env/env_runner_group.py", line 513, in _update
    _env_runner.set_state(ray.get(ref_env_runner_states))
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/rllib/env/multi_agent_env_runner.py", line 685, in set_state
    self._env_to_module.set_state(state["env_to_module_connector"])
  File "~/workspace/.venv/lib/python3.10/site-packages/ray/rllib/connectors/connector_pipeline_v2.py", line 231, in set_state
    raise KeyError(f"No state found in `state` for connector piece: {key}!")
KeyError: 'No state found in `state` for connector piece: 000_AddObservationsFromEpisodesToBatch!'

For reference, here’s the error in context:

@override(ConnectorV2)
def set_state(self, state: Dict[str, Any]) -> None:
    for i, connector in enumerate(self.connectors):
        key = f"{i:03d}_{type(connector).__name__}"
        if key not in state:
            raise KeyError(f"No state found in `state` for connector piece: {key}!")
        connector.set_state(state[key])

When I add a debug statement in connector_pipeline_v2.py where the error is thrown, I can see that state is an empty dictionary. If I move the debug statement outside the for loop, so it hits earlier in training, I can see 000_AddObservationsFromEpisodesToBatch in the keys.
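As a stopgap I have been considering monkey-patching around the check so that an empty state becomes a no-op instead of killing the run. A rough sketch (this is a workaround, not a fix, and it silently skips the connector-state sync whenever the incoming dict is empty):

from ray.rllib.connectors.connector_pipeline_v2 import ConnectorPipelineV2

_original_set_state = ConnectorPipelineV2.set_state

def _tolerant_set_state(self, state):
    # If the incoming connector state is empty (the situation that triggers
    # the KeyError), skip the sync; otherwise defer to the original method.
    if not state:
        return
    _original_set_state(self, state)

ConnectorPipelineV2.set_state = _tolerant_set_state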

I’m wondering whether this has to do with the output of step() in my env. Here’s the signature:

def step(
    self, action_dict: dict[str, np.ndarray]
) -> tuple[dict[str, np.ndarray], dict[str, float], dict[str, bool], dict[str, bool], dict[str, dict]]:

and the return values:

return observations, step_rewards, terminateds, truncateds, info

Right now those dictionaries have keys for each key in action_dict.keys(), not each key in self._agent_ids. I think this is the correct pattern, but maybe it’s a hint as to where the error is coming from?
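For concreteness, here is a minimal sketch of that return pattern (the helpers are placeholders for my real logic); the “__all__” entries are what RLlib’s MultiAgentEnv API uses to flag episode-wide termination/truncation:

def step(self, action_dict):
    observations, step_rewards, infos = {}, {}, {}
    terminateds, truncateds = {}, {}
    for agent_id in action_dict:
        observations[agent_id] = self._observe(agent_id)        # placeholder helper
        step_rewards[agent_id] = self._reward(agent_id)          # placeholder helper
        terminateds[agent_id] = False                            # continuing task: never terminate
        truncateds[agent_id] = self._should_truncate(agent_id)   # see sketch above
        infos[agent_id] = {}
    # "__all__" signals episode-wide termination/truncation to RLlib.
    terminateds["__all__"] = False
    truncateds["__all__"] = all(truncateds[a] for a in action_dict)
    return observations, step_rewards, terminateds, truncateds, infos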