What happened + What you expected to happen
I trained an RL agent using Ray Tune on my local PC, and all of the results, including checkpoints, were saved in a directory named “RL_LTR”.
Next, I uploaded this directory to Colab and attempted to restore the agent using the tune.Tuner.restore
method. However, when I tried to test my agent, I encountered the following error message:
Path not found: (<pyarrow._fs.LocalFileSystem object at 0x7f22d0608e30>, /mainfs/scratch/sb5e19/RL_LTR/TD3_TRAIN/TD3_TRAIN/TD3_RankingEnv_ce7f8b78_1_AlgorithmConfig__prior_exploration_config=None,disable_action_flattening=False,disable_execution_plan_ap_2023-10-19_18-22-51/checkpoint_000019)
It appears that the checkpoints cannot be loaded on a system other than the one where the model was trained: the restored checkpoint still refers to the absolute path from the original filesystem (/mainfs/scratch/...), which does not exist on Colab.
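For reference, the restore on Colab boils down to the minimal sketch below (the /content/RL_LTR path is only an assumed example of where I copied the directory, not my exact path); the error is raised when the best checkpoint is turned back into an Algorithm:

from ray import tune
from ray.rllib.algorithms.algorithm import Algorithm

# Restore the tuning run from the experiment directory copied to Colab.
restored = tune.Tuner.restore(
    "/content/RL_LTR/TD3_TRAIN/TD3_TRAIN",  # assumed destination of the copied "RL_LTR" folder
    trainable="TD3",
)
results = restored.get_results()

# The "Path not found" error is raised here: the checkpoint recorded in the
# trial metadata still points at the absolute path on the training machine.
best_checkpoint = results.get_best_result().checkpoint
agent = Algorithm.from_checkpoint(best_checkpoint)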
Reproduction script
from pathlib import Path
from typing import Any

from ray import tune
from ray.rllib.algorithms.algorithm import Algorithm
from ray.train import CheckpointConfig, FailureConfig, RunConfig  # ray.air in older Ray versions
from ray.tune import TuneConfig
from ray.tune.registry import register_env
from ray.tune.search import ConcurrencyLimiter


class DRLlibv2:
    def __init__(
        self,
        trainable: str | Any,
        params: dict,
        train_env=None,
        run_name: str = "tune_run",
        local_dir: str = "tune_results",
        search_alg=None,
        concurrent_trials: int = 0,
        num_samples: int = 0,
        scheduler_=None,
        # num_cpus: float | int = 2,
        dataframe_save: str = "tune.csv",
        metric: str = "episode_reward_mean",
        mode: str | list[str] = "max",
        max_failures: int = 0,
        training_iterations: int = 100,
        checkpoint_num_to_keep: None | int = None,
        checkpoint_freq: int = 0,
        reuse_actors: bool = True,
    ):
        self.params = params
        # if train_env is not None:
        #     register_env(self.params['env'], lambda env_config: train_env(env_config))
        self.train_env = train_env
        self.run_name = run_name
        self.local_dir = local_dir
        self.search_alg = search_alg
        if concurrent_trials != 0:
            self.search_alg = ConcurrencyLimiter(
                self.search_alg, max_concurrent=concurrent_trials
            )
        self.scheduler_ = scheduler_
        self.num_samples = num_samples
        self.trainable = trainable
        if isinstance(self.trainable, str):
            self.trainable = self.trainable.upper()
        # self.num_cpus = num_cpus
        self.dataframe_save = dataframe_save
        self.metric = metric
        self.mode = mode
        self.max_failures = max_failures
        self.training_iterations = training_iterations
        self.checkpoint_freq = checkpoint_freq
        self.checkpoint_num_to_keep = checkpoint_num_to_keep
        self.reuse_actors = reuse_actors

    def train_tune_model(self):
        """
        Tune and train the model.
        Returns the results object.
        """
        # if ray.is_initialized():
        #     ray.shutdown()
        # ray.init(num_cpus=self.num_cpus, num_gpus=self.params['num_gpus'], ignore_reinit_error=True)
        if self.train_env is not None:
            register_env(self.params['env'], lambda env_config: self.train_env)
        tuner = tune.Tuner(
            self.trainable,
            param_space=self.params,
            tune_config=TuneConfig(
                search_alg=self.search_alg,
                scheduler=self.scheduler_,
                num_samples=self.num_samples,
                # metric=self.metric,
                # mode=self.mode,
                **({'metric': self.metric, 'mode': self.mode} if self.scheduler_ is None else {}),
                reuse_actors=self.reuse_actors,
            ),
            run_config=RunConfig(
                name=self.run_name,
                storage_path=self.local_dir,
                failure_config=FailureConfig(
                    max_failures=self.max_failures, fail_fast=False
                ),
                stop={"training_iteration": self.training_iterations},
                checkpoint_config=CheckpointConfig(
                    num_to_keep=self.checkpoint_num_to_keep,
                    checkpoint_score_attribute=self.metric,
                    checkpoint_score_order=self.mode,
                    checkpoint_frequency=self.checkpoint_freq,
                    checkpoint_at_end=True,
                ),
                verbose=3,  # verbosity: 0 = silent, 1 = default, 2 = verbose, 3 = detailed
            ),
        )
        self.results = tuner.fit()
        if self.search_alg is not None:
            self.search_alg.save_to_dir(self.local_dir)
        # ray.shutdown()
        return self.results

    def infer_results(self, to_dataframe: str = None, mode: str = "a"):
        """
        Get tune results in a dataframe and the best result object.
        """
        results_df = self.results.get_dataframe()
        if to_dataframe is None:
            to_dataframe = self.dataframe_save
        results_df.to_csv(to_dataframe, mode=mode)
        best_result = self.results.get_best_result()
        # best_metric = best_result.metrics
        # best_checkpoint = best_result.checkpoint
        # best_trial_dir = best_result.log_dir
        return results_df, best_result

    def restore_agent(
        self,
        checkpoint_path: str = "",
        restore_search: bool = False,
        resume_unfinished: bool = True,
        resume_errored: bool = False,
        restart_errored: bool = False,
    ):
        """
        Restore errored or stopped trials.
        """
        # if restore_search:
        #     self.search_alg = self.search_alg.restore_from_dir(self.local_dir)
        if checkpoint_path == "":
            checkpoint_path = self.results.get_best_result().checkpoint._local_path
        restored_agent = tune.Tuner.restore(
            checkpoint_path,
            trainable=self.trainable,
            param_space=self.params,
            restart_errored=restart_errored,
            resume_unfinished=resume_unfinished,
            resume_errored=resume_errored,
        )
        print(restored_agent)
        self.results = restored_agent.get_results()
        if self.search_alg is not None:
            self.search_alg.save_to_dir(self.local_dir)
        return self.results

    def get_test_agent(self, test_env_name: str = None, test_env=None, checkpoint=None):
        """
        Get test agent.
        """
        # if test_env is not None:
        #     register_env(test_env_name, lambda config: [test_env])
        if checkpoint is None:
            checkpoint = self.results.get_best_result().checkpoint
        # The "Path not found" error above is raised on this call on Colab.
        testing_agent = Algorithm.from_checkpoint(checkpoint)
        # testing_agent.config['env'] = test_env_name
        return testing_agent
local_dir = Path.cwd()/"TD3_TRAIN"
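
# train_config (the TD3 AlgorithmConfig) and search_alg are built earlier in
# the notebook and are omitted here for brevity.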
drl_agent = DRLlibv2(
    trainable="TD3",
    # train_env=RankingEnv,
    # num_cpus=num_cpus,
    run_name="TD3_TRAIN",
    local_dir=local_dir,
    params=train_config.to_dict(),
    num_samples=1,  # number of hyperparameter configurations to sample
    # training_iterations=5,
    checkpoint_freq=5,
    # scheduler_=scheduler_,
    search_alg=search_alg,
    metric="episode_reward_mean",
    mode="max",
    # callbacks=[wandb_callback]
)

results = drl_agent.restore_agent((local_dir / "TD3_TRAIN").as_posix())
test_agent = drl_agent.get_test_agent()