How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I have built a custom environment with Gymnasium that packs ellipse shapes into a circle until the circle is fully covered. I'm trying to train a single agent with PPO to find the optimal packing. However, I'm quite new to RL and have never used Ray RLlib before. My environment has a Box observation space representing a 1024x1024 image and a Tuple action space containing three Discrete spaces, so I've had to build my own CNN, based on the tiny_atari_cnn_rlm.py example.
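For reference, the spaces look roughly like this (a minimal sketch rather than my actual env code; the Discrete sizes 827, 827 and 9 match the comments in the module below, and the 3-channel uint8 image shape is an assumption based on my RGB rendering):
import numpy as np
import gymnasium as gym
# Rough sketch of my env's observation and action spaces (not the real implementation).
observation_space = gym.spaces.Box(low=0, high=255, shape=(1024, 1024, 3), dtype=np.uint8)
action_space = gym.spaces.Tuple((gym.spaces.Discrete(827), gym.spaces.Discrete(827), gym.spaces.Discrete(9)))
This is the module: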
from typing import Any, Dict, Optional
import ray
import os
import gymnasium as gym
from ray.rllib.core.columns import Columns
from ray.rllib.core.learner.utils import make_target_network
from ray.rllib.core.rl_module.apis import (
TargetNetworkAPI,
ValueFunctionAPI,
TARGET_NETWORK_ACTION_DIST_INPUTS,
)
from ray.rllib.core.rl_module.torch import TorchRLModule
from ray.rllib.models.torch.misc import (
normc_initializer,
same_padding,
valid_padding,
)
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.typing import TensorType
torch, nn = try_import_torch()
class CustomCNN(TorchRLModule, ValueFunctionAPI, TargetNetworkAPI):
@override(TorchRLModule)
def setup(self):
"""Use this method to create all the model components that you require.
Feel free to access the following useful properties in this class:
- `self.model_config`: The config dict for this RLModule class,
which should contain flexible settings, for example: {"hiddens": [256, 256]}.
- `self.observation|action_space`: The observation and action space that
this RLModule is subject to. Note that the observation space might not be the
exact space from your env, but that it might have already gone through
preprocessing through a connector pipeline (for example, flattening,
frame-stacking, mean/std-filtering, etc..).
"""
# Get the CNN stack config from this RLModule's `model_config` dict:
conv_filters = self.model_config.get("conv_filters")
# Default CNN stack with 3 layers:
if conv_filters is None:
conv_filters = [
[32, 5, 4, "same"],
[64, 5, 4, "same"],
[128, 3, 4, "same"],
[256, 3, 4, "same"],
[512, 3, 2, "same"],
[1024, 3, 2, "same"],
]
# Build the CNN layers.
layers = []
# Add user-specified hidden convolutional layers first
width, height, in_depth = self.observation_space.shape
#(f' width: {width}, height: {height}, in_depth: {in_depth}')
in_size = [width, height]
for filter_specs in conv_filters:
if len(filter_specs) == 4:
out_depth, kernel_size, strides, padding = filter_specs
else:
out_depth, kernel_size, strides = filter_specs
padding = "same"
# Pad like in tensorflow's SAME mode.
if padding == "same":
padding_size, out_size = same_padding(in_size, kernel_size, strides)
layers.append(nn.ZeroPad2d(padding_size))
# No actual padding is performed for "valid" mode, but we will still
# compute the output size (input for the next layer).
else:
out_size = valid_padding(in_size, kernel_size, strides)
#print(f'in_depth: {in_depth} out_depth: {out_depth}, kernel_size: {kernel_size}, strides: {strides},')
layer = nn.Conv2d(in_depth, out_depth, kernel_size, strides, bias=True)
# Initialize CNN layer kernel and bias.
nn.init.xavier_uniform_(layer.weight)
nn.init.zeros_(layer.bias)
layers.append(layer)
# Activation.
layers.append(nn.ReLU())
in_size = out_size
in_depth = out_depth
#print(f'in_size: {in_size} in_depth: {in_depth}')
self._base_cnn_stack = nn.Sequential(*layers)
# Define separate logits layers for each discrete action space
if isinstance(self.action_space, gym.spaces.Tuple):
discrete_action_sizes = [
space.n if isinstance(space, gym.spaces.Discrete) else 0
for space in self.action_space.spaces
]
num_discrete_actions_1 = discrete_action_sizes[0] # 827
num_discrete_actions_2 = discrete_action_sizes[1] # 827
num_discrete_actions_3 = discrete_action_sizes[2] # 9
#print(f'num_discrete_actions_1:{num_discrete_actions_1}, num_discrete_actions_2:{num_discrete_actions_2}, num_discrete_actions_3:{num_discrete_actions_3}')
# Three separate 1×1 conv layers for different logits
self._logits1 = nn.Conv2d(in_depth, num_discrete_actions_1, kernel_size=1, stride=1, bias=True)
self._logits2 = nn.Conv2d(in_depth, num_discrete_actions_2, kernel_size=1, stride=1, bias=True)
self._logits3 = nn.Conv2d(in_depth, num_discrete_actions_3, kernel_size=1, stride=1, bias=True)
# Initialize each logits layer
for logits_layer in [self._logits1, self._logits2, self._logits3]:
nn.init.xavier_uniform_(logits_layer.weight)
nn.init.zeros_(logits_layer.bias)
# Wrap them in separate sequential blocks if padding is needed
self._logits1 = nn.Sequential(nn.ZeroPad2d(same_padding(in_size, 1, 1)[0]), self._logits1)
self._logits2 = nn.Sequential(nn.ZeroPad2d(same_padding(in_size, 1, 1)[0]), self._logits2)
self._logits3 = nn.Sequential(nn.ZeroPad2d(same_padding(in_size, 1, 1)[0]), self._logits3)
# Value function head (as in the original example).
self._values = nn.Linear(in_depth, 1)
normc_initializer(0.01)(self._values.weight)
@override(TorchRLModule)
def _forward(self, batch, **kwargs):
# Compute the basic 1D feature tensor (inputs to policy- and value-heads).
_, logits = self._compute_embeddings_and_logits(batch)
# Return features and logits as ACTION_DIST_INPUTS (categorical distribution).
return {
Columns.ACTION_DIST_INPUTS: logits,
}
@override(TorchRLModule)
def _forward_train(self, batch, **kwargs):
# Compute the basic 1D feature tensor (inputs to policy- and value-heads).
embeddings, logits = self._compute_embeddings_and_logits(batch)
# Return features and logits as ACTION_DIST_INPUTS (categorical distribution).
return {
Columns.ACTION_DIST_INPUTS: logits,
Columns.EMBEDDINGS: embeddings,
}
@override(TargetNetworkAPI)
def make_target_networks(self) -> None:
# Create a target network for the base CNN stack.
self._target_base_cnn_stack = make_target_network(self._base_cnn_stack)
# Create target networks for each individual logit layer
self._target_logits1 = make_target_network(self._logits1)
self._target_logits2 = make_target_network(self._logits2)
self._target_logits3 = make_target_network(self._logits3)
@override(TargetNetworkAPI)
def get_target_network_pairs(self):
return [
# Pair for the CNN base layers
(self._base_cnn_stack, self._target_base_cnn_stack),
# Pairs for the individual logit layers
(self._logits1, self._target_logits1),
(self._logits2, self._target_logits2),
(self._logits3, self._target_logits3),
]
@override(TargetNetworkAPI)
def forward_target(self, batch, **kw):
# Normalize and permute the observations for CNN input
obs = batch[Columns.OBS].float() / 255.0 # Convert uint8 to float32 and normalize
obs = obs.permute(0, 3, 1, 2) # Change shape for CNN
# Pass the observations through the base CNN stack to get embeddings
embeddings = self._target_base_cnn_stack(obs)
#print(embeddings.shape)
# Compute the logits from each individual logit layer
logits1 = self._target_logits1(embeddings)
logits2 = self._target_logits2(embeddings)
logits3 = self._target_logits3(embeddings)
# Concatenate the logits from each layer
logits = torch.cat([logits1, logits2, logits3], dim=1)  # concatenate along the channel dim, matching _compute_embeddings_and_logits
# Return the logits as the action distribution inputs
return {TARGET_NETWORK_ACTION_DIST_INPUTS: torch.squeeze(logits, dim=[-1, -2])}
# We implement this RLModule as a ValueFunctionAPI RLModule, so it can be used
# by value-based methods like PPO or IMPALA.
@override(ValueFunctionAPI)
def compute_values(
self,
batch: Dict[str, Any],
embeddings: Optional[Any] = None,
) -> TensorType:
# Features not provided -> We need to compute them first.
if embeddings is None:
obs = batch[Columns.OBS].float() / 255.0
embeddings = self._base_cnn_stack(obs.permute(0, 3, 1, 2))
embeddings = torch.squeeze(embeddings, dim=[-1, -2])
return self._values(embeddings).squeeze(-1)
def _compute_embeddings_and_logits(self, batch):
obs = batch[Columns.OBS].float() / 255.0 # Convert uint8 to float32 and normalize
obs = obs.permute(0, 3, 1, 2) # Change shape for CNN
embeddings = self._base_cnn_stack(obs)
# Now compute the logits for each discrete action space
logits1 = self._logits1(embeddings)
logits2 = self._logits2(embeddings)
logits3 = self._logits3(embeddings)
#print(f"logits1 shape: {logits1.shape}")
#print(f"logits2 shape: {logits2.shape}")
#print(f"logits3 shape: {logits3.shape}")
logits = torch.cat((logits1, logits2, logits3), dim=1)
#print(f"logits shape: {logits.shape}")
#print(f"embeddings shape: {embeddings.shape}")
return (
torch.squeeze(embeddings, dim=[-1, -2]),
torch.squeeze(logits, dim=[-1, -2]),
)
This is the code I use to train the agent:
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.env.single_agent_env_runner import SingleAgentEnvRunner
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
from pprint import pprint
from ray.rllib.core.models.catalog import Catalog
from ray.tune.logger import JsonLoggerCallback, CSVLoggerCallback, TBXLoggerCallback
from ray.rllib.examples.envs.env_rendering_and_recording import EnvRenderCallback
env = EllipsePackingRGB()  # my custom Gymnasium environment (defined elsewhere)
class CustomCatalog(Catalog):
def build(self):
return {
"model": CustomCNN(
observation_space=self.observation_space,
action_space=self.action_space,
model_config=self.model_config,
)
}
spec = RLModuleSpec(
module_class=CustomCNN,
catalog_class=CustomCatalog,
inference_only=False,
observation_space=env.observation_space,
action_space=env.action_space,
model_config={
"conv_filters": [
[32, [5, 5], 4, "same"],
[64, [5, 5], 2, "same"],
[128, [3, 3], 2, "same"],
[256, [3, 3], 2, "same"],
[512, [3, 3], 2, "same"],
],
"fcnet_hiddens": [512],
"fcnet_activation": "relu",
},
)
#rl_module = spec.build()
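# (A quick standalone sanity check I can run after uncommenting spec.build() above,
# kept commented out here: push a random uint8 batch through the module. The
# (2, 1024, 1024, 3) shape is an assumption matching my env's Box observation space.)
# dummy_obs = torch.randint(0, 256, (2, 1024, 1024, 3), dtype=torch.uint8)
# out = rl_module._forward({Columns.OBS: dummy_obs})
# print(out[Columns.ACTION_DIST_INPUTS].shape)
# # With a conv stack that downsamples 1024x1024 all the way to 1x1 spatially (like
# # the 6-layer stack in the config below), I'd expect this to be (2, 827 + 827 + 9).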
config = (
PPOConfig()
.environment(EllipsePackingRGB)
.framework("torch")
.api_stack(
enable_rl_module_and_learner=True,
enable_env_runner_and_connector_v2=True
)
.learners(
num_learners=1,
num_gpus_per_learner=0,
num_cpus_per_learner=1,
)
.env_runners(
env_runner_cls=SingleAgentEnvRunner,
num_env_runners=1,
sample_timeout_s=50,
rollout_fragment_length=10,
num_cpus_per_env_runner=1,
num_gpus_per_env_runner=0,
)
.rl_module(
# We need to explicitly specify the RLModule to use and the
# catalog needed to build it.
rl_module_spec=RLModuleSpec(
module_class=CustomCNN,
catalog_class=CustomCatalog,
inference_only=False,
observation_space=env.observation_space,
action_space=env.action_space,
model_config={
"conv_filters": [
[32, [5, 5], 4, "same"],
[64, [5, 5], 4, "same"],
[128, [3, 3], 4, "same"],
[256, [3, 3], 4, "same"],
[512, [3, 3], 2, "same"],
[1024, [3, 3], 2, "same"],
],
"fcnet_hiddens": [512],
"fcnet_activation": "relu",
},
),
)
)
algo = config.build_algo()
print(algo.get_module())
mean_ppo = []
for i in range(10):
result = algo.train()
result.pop("config")
pprint(result)
print(f"Step {i}")
print(result.keys()) # Shows all available keys in the result dictionary
print(f"Episode reward mean: {result['episode_reward_mean']}")
if i % 5 == 0:
checkpoint_dir = algo.save_to_path()
print(f"Checkpoint saved in directory {checkpoint_dir}")
I have print statements for each step in my environment, so I can see output like this: step: 4, reward: -2.738932606817179, terminated: False, truncated: False, info: {'covered_pixels': np.int64(22705), 'total_pixels': 393657, 'fill_percentage': np.float64(5.767711484871348)}. Termination is reached a few times and the environment resets, but training never completes a single iteration of the loop; it crashes with the following error:
{
"name": "OSError",
"message": "Unknown error",
"stack": "---------------------------------------------------------------------------
OSError Traceback (most recent call last)
Cell In[13], line 104
99 mean_ppo = []
102 for i in range(10):
--> 104 result = algo.train()
105 result.pop(\"config\")
106 pprint(result)
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\tune\\trainable\\trainable.py:331, in Trainable.train(self)
329 except Exception as e:
330 skipped = skip_exceptions(e)
--> 331 raise skipped from exception_cause(skipped)
333 assert isinstance(result, dict), \"step() needs to return a dict.\"
335 # We do not modify internal state nor update this result if duplicate.
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\tune\\trainable\\trainable.py:328, in Trainable.train(self)
326 start = time.time()
327 try:
--> 328 result = self.step()
329 except Exception as e:
330 skipped = skip_exceptions(e)
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\rllib\\algorithms\\algorithm.py:999, in Algorithm.step(self)
994 # - No evaluation necessary, just run the next training iteration.
995 # - We have to evaluate in this training iteration, but no parallelism ->
996 # evaluate after the training iteration is entirely done.
997 else:
998 if self.config.enable_env_runner_and_connector_v2:
--> 999 train_results, train_iter_ctx = self._run_one_training_iteration()
1000 else:
1001 (
1002 train_results,
1003 train_iter_ctx,
1004 ) = self._run_one_training_iteration_old_api_stack()
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\rllib\\algorithms\\algorithm.py:3350, in Algorithm._run_one_training_iteration(self)
3348 # Try to train one step.
3349 with self.metrics.log_time((TIMERS, TRAINING_STEP_TIMER)):
-> 3350 training_step_return_value = self.training_step()
3351 has_run_once = True
3353 # On the new API stack, results should NOT be returned anymore as
3354 # a dict, but purely logged through the `MetricsLogger` API. This
3355 # way, we make sure to never miss a single stats/counter/timer
3356 # when calling `self.training_step()` more than once within the same
3357 # iteration.
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\rllib\\algorithms\\ppo\\ppo.py:428, in PPO.training_step(self)
426 # Perform a learner update step on the collected episodes.
427 with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)):
--> 428 learner_results = self.learner_group.update_from_episodes(
429 episodes=episodes,
430 timesteps={
431 NUM_ENV_STEPS_SAMPLED_LIFETIME: (
432 self.metrics.peek(
433 (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME)
434 )
435 ),
436 },
437 num_epochs=self.config.num_epochs,
438 minibatch_size=self.config.minibatch_size,
439 shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
440 )
441 self.metrics.merge_and_log_n_dicts(learner_results, key=LEARNER_RESULTS)
443 # Update weights - after learning on the local worker - on all remote
444 # workers.
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\rllib\\core\\learner\\learner_group.py:327, in LearnerGroup.update_from_episodes(self, episodes, timesteps, async_update, return_state, num_epochs, minibatch_size, shuffle_batch_per_epoch, **kwargs)
272 def update_from_episodes(
273 self,
274 episodes: List[EpisodeType],
(...)
283 **kwargs,
284 ) -> Union[Dict[str, Any], List[Dict[str, Any]], List[List[Dict[str, Any]]]]:
285 \"\"\"Performs gradient based update(s) on the Learner(s), based on given episodes.
286
287 Args:
(...)
325 call to async_update that is ready.
326 \"\"\"
--> 327 return self._update(
328 episodes=episodes,
329 timesteps=timesteps,
330 async_update=async_update,
331 return_state=return_state,
332 num_epochs=num_epochs,
333 minibatch_size=minibatch_size,
334 shuffle_batch_per_epoch=shuffle_batch_per_epoch,
335 **kwargs,
336 )
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\rllib\\core\\learner\\learner_group.py:602, in LearnerGroup._update(self, batch, episodes, timesteps, async_update, return_state, num_epochs, num_iters, minibatch_size, shuffle_batch_per_epoch, **kwargs)
598 results = self._get_async_results(tags_to_get)
600 else:
601 results = self._get_results(
--> 602 self._worker_manager.foreach_actor(partials)
603 )
605 return results
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\rllib\\utils\\actor_manager.py:450, in FaultTolerantActorManager.foreach_actor(self, func, healthy_only, remote_actor_ids, timeout_seconds, return_obj_refs, mark_healthy)
445 func, remote_actor_ids = self._filter_func_and_remote_actor_id_by_state(
446 func, remote_actor_ids
447 )
449 # Send out remote requests.
--> 450 remote_calls = self._call_actors(
451 func=func,
452 remote_actor_ids=remote_actor_ids,
453 )
455 # Collect remote request results (if available given timeout and/or errors).
456 _, remote_results = self._fetch_result(
457 remote_actor_ids=remote_actor_ids,
458 remote_calls=remote_calls,
(...)
462 mark_healthy=mark_healthy,
463 )
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\rllib\\utils\\actor_manager.py:734, in FaultTolerantActorManager._call_actors(self, func, remote_actor_ids)
730 remote_actor_ids = self.actor_ids()
732 if isinstance(func, list):
733 calls = [
--> 734 self._actors[i].apply.remote(f) for i, f in zip(remote_actor_ids, func)
735 ]
736 else:
737 calls = [self._actors[i].apply.remote(func) for i in remote_actor_ids]
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\actor.py:206, in ActorMethod.remote(self, *args, **kwargs)
205 def remote(self, *args, **kwargs):
--> 206 return self._remote(args, kwargs)
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\_private\\auto_init_hook.py:21, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
18 @wraps(fn)
19 def auto_init_wrapper(*args, **kwargs):
20 auto_init_ray()
---> 21 return fn(*args, **kwargs)
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\util\\tracing\\tracing_helper.py:422, in _tracing_actor_method_invocation.<locals>._start_span(self, args, kwargs, *_args, **_kwargs)
420 if kwargs is not None:
421 assert \"_ray_trace_ctx\" not in kwargs
--> 422 return method(self, args, kwargs, *_args, **_kwargs)
424 class_name = (
425 self._actor_ref()._ray_actor_creation_function_descriptor.class_name
426 )
427 method_name = self._method_name
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\actor.py:366, in ActorMethod._remote(self, args, kwargs, name, num_returns, max_task_retries, retry_exceptions, concurrency_group, _generator_backpressure_num_objects, enable_task_events)
363 if self._decorator is not None:
364 invocation = self._decorator(invocation)
--> 366 return invocation(args, kwargs)
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\actor.py:347, in ActorMethod._remote.<locals>.invocation(args, kwargs)
344 if actor is None:
345 raise RuntimeError(\"Lost reference to actor\")
--> 347 return actor._actor_method_call(
348 self._method_name,
349 args=args,
350 kwargs=kwargs,
351 name=name,
352 num_returns=num_returns,
353 max_task_retries=max_task_retries,
354 retry_exceptions=retry_exceptions,
355 concurrency_group_name=concurrency_group,
356 generator_backpressure_num_objects=(
357 _generator_backpressure_num_objects
358 ),
359 enable_task_events=enable_task_events,
360 )
File c:\\Users\\**\\AppData\\Local\\anaconda3\\envs\\myenvrl\\Lib\\site-packages\\ray\\actor.py:1503, in ActorHandle._actor_method_call(self, method_name, args, kwargs, name, num_returns, max_task_retries, retry_exceptions, concurrency_group_name, generator_backpressure_num_objects, enable_task_events)
1500 if generator_backpressure_num_objects is None:
1501 generator_backpressure_num_objects = -1
-> 1503 object_refs = worker.core_worker.submit_actor_task(
1504 self._ray_actor_language,
1505 self._ray_actor_id,
1506 function_descriptor,
1507 list_args,
1508 name,
1509 num_returns,
1510 max_task_retries,
1511 retry_exceptions,
1512 retry_exception_allowlist,
1513 self._ray_actor_method_cpus,
1514 concurrency_group_name if concurrency_group_name is not None else b\"\",
1515 generator_backpressure_num_objects,
1516 enable_task_events,
1517 )
1519 if num_returns == STREAMING_GENERATOR_RETURN:
1520 # Streaming generator will return a single ref
1521 # that is for the generator task.
1522 assert len(object_refs) == 1
File python\\ray\\_raylet.pyx:3972, in ray._raylet.CoreWorker.submit_actor_task()
File python\\ray\\_raylet.pyx:3977, in ray._raylet.CoreWorker.submit_actor_task()
File python\\ray\\_raylet.pyx:859, in ray._raylet.prepare_args_and_increment_put_refs()
File python\\ray\\_raylet.pyx:850, in ray._raylet.prepare_args_and_increment_put_refs()
File python\\ray\\_raylet.pyx:947, in ray._raylet.prepare_args_internal()
File python\\ray\\_raylet.pyx:3432, in ray._raylet.CoreWorker.put_serialized_object_and_increment_local_ref()
File python\\ray\\_raylet.pyx:3234, in ray._raylet.CoreWorker._create_put_buffer()
File python\\ray\\includes/common.pxi:93, in ray._raylet.check_status()
OSError: Unknown error"
}
Any help or guidance would be much appreciated!