Multiple environments, expensive resets and "remote_worker_envs": True

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hello. A question about scaling environments. When I have multiple environments ("num_workers" > 1) with different step times (mean_env_wait_ms) and independent, expensive reset functions, surely it makes sense to use "remote_worker_envs": True? The docs here say that the environments will then be run in parallel and that inference will not necessarily be batched. Is this the same as saying that the environments run asynchronously and "sample as fast as they can", so to speak? What happens when one environment is stuck in an expensive reset and does not step for a long time? With "remote_worker_envs": False, will this expensive reset block the rest of the environments?

So far I have only tested with "remote_worker_envs": False, as it is the default, but performance really took a hit. I will try "remote_worker_envs": True, but I also want to make sure that I understand the setting correctly. Model inference time is negligible, so no worries there.

Thanks for the help,

With some more digging, it seems that "remote_worker_envs" is only usable with vectorized environments, which we are not using:

ValueError: Remote envs only make sense to use if num_envs > 1 (i.e. environment vectorization is enabled).

The question then becomes which settings are needed to ensure that environments that are expensive to both step and reset run in parallel, because that did not seem to work for us. One environment essentially blocked for quite a long time, and that was reflected in the tune/agent_timesteps_total metric.

Is it the num_workers > 1 param (for e.g. PPO) that should be set to ensure that multiple environments are created and stepped asynchronously? There is also a num_rollout_workers param mentioned in the docs. local_mode in ray.init was set to False, so that should not have been an issue.
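
For what it's worth, my current assumption (please correct me if wrong) is that num_rollout_workers is simply the newer name for num_workers in the AlgorithmConfig API, so these two sketches should be equivalent:

from ray.rllib.algorithms.ppo import PPOConfig

# Assumption on my part: num_rollout_workers is the newer name for num_workers.
config_new = PPOConfig().rollouts(num_rollout_workers=2)

# Old-style dict key, as we currently set it via update_from_dict:
config_old = PPOConfig().update_from_dict({"num_workers": 2})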

I created a minimal runnable example that is close to what we have. Some environments take different times to step and reset. Running this example will show a plot similar to our actual plot:
[Screenshot: plot produced by the minimal example, similar to our actual plot]

From what I have read up on now, the stepping is done in parallel, but one train batch expects an equal number of samples from each environment. This means that a slow environment limits the total sampling speed, so to speak. Is there any option for us to avoid this? Ideally, the environments would sample as quickly as they can, up to train_batch_size, with no regard to which environment a sample comes from.
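
For reference, a sketch of the batching-related settings involved here (assuming the Ray 2.x config API; values are placeholders), in case it helps frame the question:

from ray.rllib.algorithms.ppo import PPOConfig

# Placeholder values; whether any of these settings lets the fast environments
# contribute more samples per train batch is exactly what I am asking.
sketch_config = (
    PPOConfig()
    .rollouts(
        rollout_fragment_length=10,      # steps each worker collects per sample() call
        batch_mode="truncate_episodes",  # alternative: "complete_episodes"
    )
    .training(train_batch_size=50)
)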

If we instead tried to use num_envs_per_worker and remote_worker_envs, would this solve the issue?

import ray
from ray import tune
from ray.tune.registry import register_env

from ray.rllib.agents import ppo
from ray.rllib.algorithms.ppo import PPOConfig

from typing import Any, Tuple, Union
import gymnasium as gym
from gymnasium import spaces
import time

class BadEnv(gym.Env):
    def __init__(self, env_config) -> None:
        self.label = env_config["label"]
        self.step_delay = env_config["step_delay"]
        self.reset_delay = env_config["reset_delay"]
        self.timesteps = env_config["timesteps"]
        self.current_timestep = 0
        # every 13th/19th reset iteration the step/reset delay is increased (see step/reset below)
        self.reset_iteration = 0

        # does not matter
        self.action_space = spaces.Discrete(10)
        self.observation_space = spaces.Box(0, 1, (10,))
    
    def step(self, action: Any) -> Tuple[Any, float, bool, bool, dict]:
        if self.reset_iteration % 13 == 0:
            print("step in {} increased".format(self.label))
            time.sleep(1 + self.step_delay * 20)
        else:
            print("step in {}".format(self.label))
            time.sleep(1 + self.step_delay) 

        self.current_timestep += 1
        return self.observation_space.sample(), 0, False, self.current_timestep >= self.timesteps, {}
    
    def reset(self, *, seed: Union[int, None] = None, options: Union[dict, None] = None) -> Tuple[Any, dict]:
        self.reset_iteration += 1
        if self.reset_iteration % 19 == 0:
            print("reset in {} increased".format(self.label))
            time.sleep(1 + self.reset_delay * 20)
        else:
            print("reset in {}".format(self.label))
            time.sleep(1 + self.reset_delay) 

        self.current_timestep = 0
        return self.observation_space.sample(), {}

def env_creator(config):
    worker_index = config.worker_index
    # pick this worker's env config (remote workers are indexed from 1, hence the -1)
    env_config = config["environments"][worker_index-1]
    return BadEnv(env_config)
register_env("bad_env", env_creator)

config = {
    "environments": [ 
        { "label": "step_0.5_reset_0", "step_delay": 0.5, "reset_delay": 0, "timesteps": 10 },
        { "label": "step_0_reset_10", "step_delay": 0, "reset_delay": 10, "timesteps": 10 },
    ]
}

custom_ppo_config = {
    "num_workers": len(config["environments"]),
    "train_batch_size": 50,
    "sgd_minibatch_size": 5
}
algo_training_config = PPOConfig()
algo_training_config = algo_training_config.update_from_dict(custom_ppo_config) # algo specifics
trainer = ppo.PPOTrainer

train_config = {
    "env": "bad_env",
    "env_config": config, # this includes multiple environments but are chosen in env_creator
    # is this necessary?
    #"num_rollout_workers": n_environments, 
    "framework": "torch",
}

final_config = algo_training_config.update_from_dict(train_config)
ray.init(local_mode=False)
tune.run(
    trainer,
    config=final_config,
    local_dir="~/ray_results_syncing",
)

You've almost got it: num_workers will use Ray to spin up parallel workers across a cluster, whereas num_envs_per_worker only specifies the number of environments to use per worker. If you set remote_worker_envs=True, we parallelize the environments in each worker using Ray so that they behave as a single vector env; in total you'll then sample from num_workers * num_envs_per_worker envs. If you're on a single instance, I would recommend setting num_workers to as many cores as you have, with num_envs_per_worker=1.
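
As a concrete sketch of the scaling (placeholder values):

from ray.rllib.algorithms.ppo import PPOConfig

# 4 rollout workers, each vectorizing 2 sub-envs that run as their own Ray actors
# -> 4 * 2 = 8 environments are sampled from in total.
config = (
    PPOConfig()
    .rollouts(
        num_rollout_workers=4,
        num_envs_per_worker=2,
        remote_worker_envs=True,
    )
)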

This isn’t entirely accurate; we sample in a round-robin manner over environments in calls to worker.sample.
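
Roughly like this (a simplified sketch of the idea, not RLlib's actual code): the sub-envs are polled in turn, so one env stuck in a long reset stalls the whole round.

# Simplified sketch of round-robin polling over sub-envs (not RLlib's actual code).
def sample_round_robin(envs, policy, batch_size):
    batch = []
    obs = [env.reset()[0] for env in envs]
    while len(batch) < batch_size:
        for i, env in enumerate(envs):  # poll every sub-env in turn
            action = policy(obs[i])
            next_obs, reward, terminated, truncated, _info = env.step(action)
            batch.append((obs[i], action, reward))
            obs[i] = next_obs
            if terminated or truncated:
                # A blocking reset here stalls the entire round, so the other
                # (fast) envs cannot contribute samples in the meantime.
                obs[i], _ = env.reset()
    return batch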

Thank you for your answer. Having num_envs_per_worker=1 seems best and, as the docs say, it reduces overhead. But the problem remains that environments with costly resets block sample gathering from the other, available environments.

When running the example (see below), we have 2 environments, one of which takes 50 s to reset. Sampling very clearly stops and blocks once the slow environment resets (edit: see the see-saw effect in the orange plot below). Ideally, sampling would continue from all environments that can produce samples. Is there any way of achieving this? Running only the environment with no reset delay gives much better performance (edit: the blue plot, where only the environment with no reset delay is used).

[Image: orange is the run with the two envs, blue is the run with only the no-reset-delay env]

When you mention a round-robin manner, does this mean that sampling is done "equally" from all environments, based on train_batch_size?

If we instead use remote_worker_envs=True with num_workers=1 and handle the environments via num_envs_per_worker=n_environments, would this change the round-robin sampling? This is perhaps not a common problem, but the docs mention StarCraft with regard to remote_worker_envs. It is perhaps difficult to say, but I feel that the StarCraft comment relates to the reset and step delays that different environments might encounter.

Thank you once again,

import ray
from ray import tune
from ray.tune.registry import register_env

from ray.rllib.agents import ppo
from ray.rllib.algorithms.ppo import PPOConfig

from typing import Any, Tuple, Union
import gymnasium as gym
from gymnasium import spaces
import time

class BadEnv(gym.Env):
    def __init__(self, env_config) -> None:
        self.label = env_config["label"]
        self.reset_delay = env_config["reset_delay"]
        self.timesteps = env_config["timesteps"]
        self.current_timestep = 0
        self.reset_iteration = 0

        # does not matter
        self.action_space = spaces.Discrete(10)
        self.observation_space = spaces.Box(0, 1, (10,))
    
    def step(self, action: Any) -> Tuple[Any, float, bool, bool, dict]:
        print("step in {}".format(self.label))
        self.current_timestep += 1
        return self.observation_space.sample(), 0, False, self.current_timestep >= self.timesteps, {}
    
    def reset(self, *, seed: Union[int, None] = None, options: Union[dict, None] = None) -> Tuple[Any, dict]:
        self.reset_iteration += 1
        if self.reset_iteration > 3: # skip the delay during RLlib's initial setup resets
            print("reset in {}".format(self.label))
            time.sleep(self.reset_delay)

        self.current_timestep = 0
        return self.observation_space.sample(), {}

def env_creator(config):
    worker_index = config.worker_index
    # pick this worker's env config (remote workers are indexed from 1, hence the -1)
    env_config = config["environments"][worker_index-1]
    return BadEnv(env_config)
register_env("bad_env", env_creator)

config = {
    "environments": [ 
        { "label": "reset_50", "reset_delay": 50, "timesteps": 100 },
        { "label": "reset_0", "reset_delay": 0, "timesteps": 500 },
    ]
}

custom_ppo_config = {
    "num_workers": len(config["environments"]),
    "train_batch_size": 64,
    "sgd_minibatch_size": 5
}
algo_training_config = PPOConfig()
algo_training_config = algo_training_config.update_from_dict(custom_ppo_config) # algo specifics
trainer = ppo.PPOTrainer

train_config = {
    "env": "bad_env",
    "env_config": config, # this includes multiple environments but are chosen in env_creator
    # is this necessary?
    #"num_rollout_workers": n_environments, 
    "framework": "torch",
}

final_config = algo_training_config.update_from_dict(train_config)
ray.init(local_mode=True)
tune.run(
    trainer,
    config=final_config,
    local_dir="~/ray_results_syncing",
)

I continued with the tests, and with remote_worker_envs=True and num_envs_per_worker=n_environments the environments sample and reset with no issues or blocking. In the example, the changes are switching to vector_index in env_creator and using only one worker.

[Image: green is the same plot as in the first image above, while gray is this version]

I'm still interested in how this works internally and what differs compared to the non-remote version.
A future question is how LSTMs work when using remote_worker_envs and how sequences of samples generated by a single environment are kept together. For example, if an extremely slow environment contributes only a few samples to each training batch, the cohesive sequence would be short (and perhaps not very usable). If anyone has experience with this, it would be great to hear it.
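
For context, these are the model settings I have in mind; my assumption is that max_seq_len is what controls how the per-env sample sequences are chunked for the LSTM:

# Standard RLlib model config keys (values are placeholders); how these interact
# with remote_worker_envs and very slow environments is exactly my question.
lstm_model_config = {
    "use_lstm": True,
    "lstm_cell_size": 64,
    "max_seq_len": 20,  # sequences longer than this are chunked for training
}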

def env_creator(config):
    vector_index = config.vector_index
    env_config = config["environments"][vector_index]
    return BadEnv(env_config)

custom_ppo_config = {
    "num_workers": 1,
    "train_batch_size": 64,
    "sgd_minibatch_size": 5
}

train_config = {
    "env": "bad_env",
    "env_config": config, # this includes multiple environments but are chosen in env_creator
    "framework": "torch",
    "num_envs_per_worker": len(config["environments"]),
    "remote_worker_envs": True
}