With some more digging it seems that “remote_worker_envs” is only usable with vectorized environments, which we are not using:
ValueError: Remote envs only make sense to use if num_envs > 1 (i.e. environment vectorization is enabled).
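For reference, this is roughly the kind of configuration that seems to trigger that check, as far as I can tell (a sketch only, the values are just illustrative):

```python
from ray.rllib.algorithms.ppo import PPOConfig

# As I understand it, remote_worker_envs only applies once each worker
# vectorizes more than one sub-environment; with a single env per worker,
# building the algorithm fails validation with the ValueError above.
fails_validation = PPOConfig().rollouts(num_envs_per_worker=1, remote_worker_envs=True)
passes_validation = PPOConfig().rollouts(num_envs_per_worker=2, remote_worker_envs=True)
```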
The question then becomes: what settings are needed to ensure that environments that are expensive to both step and reset are run in parallel? It does not seem to have worked for us; one environment was essentially blocking for quite a long time, and that was reflected in the tune/agent_timesteps_total metric.
Is num_workers > 1 the param that should be set for e.g. PPO to ensure that multiple environments are created and stepped asynchronously? There is also a num_rollout_workers param mentioned in the docs. local_mode in ray.init was set to False, so that should not have been an issue.
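As far as I can tell these two names refer to the same setting, with num_rollout_workers being the newer spelling on the config object. A sketch of the two ways of setting it, assuming a recent Ray 2.x API:

```python
from ray.rllib.algorithms.ppo import PPOConfig

# Legacy dict key, as used in the example below
dict_style = PPOConfig().update_from_dict({"num_workers": 2})

# Newer builder-style API
builder_style = PPOConfig().rollouts(num_rollout_workers=2)
```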
I created a minimal runnable example that is close to what we have: the environments take different amounts of time to step and reset. Running this example produces a plot similar to our actual one.
From what I have read up on now, the stepping is done in parallel, but one train batch expects an equal amount of samples from each environment. This means that a single slow environment drags down the total sampling speed, so to speak. Is there any option for us to avoid this? Ideally the environments would sample as quickly as they can, up to train_batch_size, with no regard to which environment the samples come from.
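If I understand the docs correctly, each rollout worker contributes a fixed fragment per train batch, so the batch is only complete once the slowest worker has delivered its share. A rough sketch of the arithmetic as I understand it (the "auto" fragment-length behaviour is my reading of the docs, not something verified here):

```python
# Assumed relationship:
#   train_batch_size ~= num_rollout_workers * num_envs_per_worker * rollout_fragment_length
num_rollout_workers = 2
num_envs_per_worker = 1
train_batch_size = 50

# With rollout_fragment_length="auto", each worker is asked for roughly this
# many timesteps per batch, so the slow environment must still deliver its
# 25 samples before a train batch can be assembled.
rollout_fragment_length = train_batch_size // (num_rollout_workers * num_envs_per_worker)
print(rollout_fragment_length)  # -> 25
```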
If we instead tried to use num_envs_per_worker and remote_worker_envs, would this solve the issue? (I put a sketch of what I mean after the example below.)
```python
import ray
from ray import tune
from ray.tune.registry import register_env
from ray.rllib.algorithms.ppo import PPO, PPOConfig
from typing import Any, Tuple, Union
import gymnasium as gym
from gymnasium import spaces
import time

class BadEnv(gym.Env):
    def __init__(self, env_config) -> None:
        self.label = env_config["label"]
        self.step_delay = env_config["step_delay"]
        self.reset_delay = env_config["reset_delay"]
        self.timesteps = env_config["timesteps"]
        self.current_timestep = 0
        # every so many resets, the step/reset delays are increased (see below)
        self.reset_iteration = 0
        # the spaces themselves do not matter for this example
        self.action_space = spaces.Discrete(10)
        self.observation_space = spaces.Box(0, 1, (10,))

    def step(self, action: Any) -> Tuple[Any, float, bool, bool, dict]:
        if self.reset_iteration % 13 == 0:
            # every 13th episode, stepping is much slower
            print("step in {} increased".format(self.label))
            time.sleep(1 + self.step_delay * 20)
        else:
            print("step in {}".format(self.label))
            time.sleep(1 + self.step_delay)
        self.current_timestep += 1
        return self.observation_space.sample(), 0, False, self.current_timestep >= self.timesteps, {}

    def reset(self, *, seed: Union[int, None] = None, options: Union[dict, None] = None) -> Tuple[Any, dict]:
        self.reset_iteration += 1
        if self.reset_iteration % 19 == 0:
            # every 19th reset is much slower
            print("reset in {} increased".format(self.label))
            time.sleep(1 + self.reset_delay * 20)
        else:
            print("reset in {}".format(self.label))
            time.sleep(1 + self.reset_delay)
        self.current_timestep = 0
        return self.observation_space.sample(), {}

def env_creator(config):
    # worker_index starts at 1 for remote rollout workers (0 is the local worker),
    # so subtract 1 to pick this worker's env config
    worker_index = config.worker_index
    env_config = config["environments"][worker_index - 1]
    return BadEnv(env_config)


register_env("bad_env", env_creator)

config = {
    "environments": [
        {"label": "step_0.5_reset_0", "step_delay": 0.5, "reset_delay": 0, "timesteps": 10},
        {"label": "step_0_reset_10", "step_delay": 0, "reset_delay": 10, "timesteps": 10},
    ]
}

custom_ppo_config = {
    "num_workers": len(config["environments"]),
    "train_batch_size": 50,
    "sgd_minibatch_size": 5,
}

algo_training_config = PPOConfig()
algo_training_config = algo_training_config.update_from_dict(custom_ppo_config)  # algo specifics

trainer = PPO

train_config = {
    "env": "bad_env",
    "env_config": config,  # includes multiple env configs; the right one is chosen in env_creator
    # is this necessary?
    # "num_rollout_workers": n_environments,
    "framework": "torch",
}
final_config = algo_training_config.update_from_dict(train_config)

ray.init(local_mode=False)
tune.run(
    trainer,
    config=final_config,
    local_dir="~/ray_results_syncing",
)
```
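And for completeness, this is roughly what I had in mind for the num_envs_per_worker / remote_worker_envs variant mentioned above. It reuses BadEnv and config from the example and is only a sketch of what I think the API looks like (in particular, picking the sub-env config from vector_index is my assumption from the docs, not something we have verified):

```python
from ray.rllib.algorithms.ppo import PPOConfig

def vectorized_env_creator(env_context):
    # With num_envs_per_worker > 1, each sub-environment gets its own
    # vector_index on the env context, so the per-env settings can be
    # picked from that instead of from worker_index.
    env_config = env_context["environments"][env_context.vector_index]
    return BadEnv(env_config)

register_env("bad_env_vectorized", vectorized_env_creator)

vectorized_config = (
    PPOConfig()
    .environment(env="bad_env_vectorized", env_config=config)
    .rollouts(
        num_rollout_workers=1,
        num_envs_per_worker=len(config["environments"]),
        remote_worker_envs=True,  # each sub-env runs in its own remote actor
    )
    .framework("torch")
    .training(train_batch_size=50, sgd_minibatch_size=5)
)
```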