My Ray program stops learning when using distributed compute

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

Dear all,

I’m trying out Ray and RLlib. I think it is a very promising and powerful library. I like it very much!
I created a custom environment, a custom model and even a custom exploration class.
I also have a callback which prints something on every on_sample_end().

They seem to work: my program runs fine and, given enough training iterations, I can see it learn and reach the maximum score.
The rough, simplified program is shown below, including the exact config I use (it works with that config).

The next step is trying to distribute the compute over multiple workers (my reason for getting familiar with Ray). For this I add the following ‘distribute compute config’ to the script:

# === Settings for Rollout Worker processes ===
config_simple["num_workers"] = 2
config_simple["num_envs_per_worker"] = 1
config_simple["create_env_on_driver"] = False
config_simple["rollout_fragment_length"] = 200
config_simple["batch_mode"] = "complete_episodes"
config_simple["train_batch_size"] = 400

# === Resource Settings ===
config_simple["num_gpus"] = 0  # For driver, later I will use GPU.
config_simple["num_cpus_per_worker"] = 1
config_simple["num_gpus_per_worker"] = 0

I cannot get this to work. I have two problems which block me from proceeding further.

I can see my model training from the print() statement in the training iteration loop.
However, when I evaluate the model, it always produces the exact same (bad) results. They look like the results of evaluating a completely untrained agent.
If I run many more training iterations, the results start to change a bit, but the performance is still much worse than with the same number of training iterations without the distribute compute config above.
In the same amount of time it never reaches the maximum score, even though far more CPU clock cycles have been used.

Clearly I’m doing or understanding something wrong. Can someone help me figure out what?

The print from my callback always appears about 5 times.
In my config I have config_simple["rollout_fragment_length"] = 200 and config_simple["train_batch_size"] = 400, so I would expect 2 rollouts to be enough.
Why are there more than 2?
When I run this without the distribute compute config above, I get many prints per iteration (200 or 400). I expected 1, since 200 should be the default value for both of these settings. The on_learn_on_batch() callback fires once per iteration, as expected, in both cases.
So again, I clearly misunderstand something. Can someone enlighten me? Thank you! :slight_smile:
(This is of less importance than the first point, but maybe these observations are related somehow.)

My program follows these lines, this program runs fine and produces results as expected until the above config is added, then results become unexpected (I expected not much change).
(I hope this is enough information and I don’t have to post all of my custom model/environment/exploration classes. They seem to work fine when the above config is not mentioned.):

# IMPORTS
... # All the imports


# EXPERIMENT CONFIG
n_iter = 5  # Number of training iterations.


# RAY CONFIG
config_simple = DEFAULT_CONFIG.copy()   # from ray.rllib.agents.dqn.simple_q import DEFAULT_CONFIG

config_simple['seed'] = 1
config_simple["model"] = {"custom_model": "my_custom_model_v0",
                          "custom_model_config": {"config_arg_1": some_value,
                                                  "config_arg_2": some_value}}
												  
# === Environment Settings ===
config_simple["env"] = "stack-v0"

# === Debug Settings ===
config_simple["log_level"] = "WARN"

# === Deep Learning Framework Settings ===
config_simple["framework"] = "torch"

# === Exploration Settings ===
config_simple["explore"] = True
config_simple["exploration_config"] = {  # The Exploration class to use
    "type": MyCustomExploration,
    # Config for the Exploration class' constructor:
    "initial_epsilon": 1.0,
    "final_epsilon": 0.02,
    "epsilon_timesteps": 5000,
}

# === API deprecations/simplifications/changes ===
config_simple["_disable_preprocessor_api"] = True

# === Evaluation settings ===
config_simple["evaluation_interval"] = n_iter
config_simple["evaluation_duration"] = 1

# === Callback settings ===
config_simple["callbacks"] = MyCallback


# START RAY
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus, ignore_reinit_error=True)


# REGISTER STUFF
# environment
register_env("my_custom_env-v0", lambda config: MyCustomEnvironment())

# model
ModelCatalog.register_custom_model("my_custom_model_v0", MyCustomModelV0)


# CREATE AGENT
agent = SimpleQTrainer(config=config_simple,
                       env="my_custom_env-v0")


# TRAIN
status = "{:2d} reward {:6.2f} ; {:6.2f} ; {:6.2f} len {:4.2f}"
for n in range(n_iter):
    result = agent.train()
    print(status.format(n + 1,
                        result["episode_reward_min"],
                        result["episode_reward_mean"],
                        result["episode_reward_max"],
                        result["episode_len_mean"]))
agent.stop()


# EVALUATE
agent.evaluate()
env = MyCustomEnvironment()
sum_reward = 0

# Run episode
state = env.reset()
while True:
    # `explore` is a bool defined elsewhere (not shown in this simplified program).
    action = agent.compute_single_action(state, explore=explore)
    state, reward, done, info = env.step(action)
    sum_reward += reward
    if done:
        break


# SHOW OUTPUT	
print(sum_reward)

Hi @Stefan-1313,

If you print the results of agent.evaluate() do they look similar to the training results? You probably also want to run more than one episode in evaluation.

@Stefan-1313,

Another thing to be aware of, which is not immediately obvious, is that iterations are measured in new steps sampled. So if you double the workers, you will collect twice as many samples per iteration but have half as many updates per iteration. To train for the same number of policy updates, you need to either double the number of iterations trained or double timesteps_per_iteration in your config.
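To make the arithmetic concrete, here is a minimal sketch of that relationship. It is not RLlib code: the function name updates_per_iteration is hypothetical, and it assumes a simplified model in which each sampling round gathers one fragment per environment from every worker and is followed by exactly one policy update.

```python
import math


def updates_per_iteration(num_workers, rollout_fragment_length,
                          timesteps_per_iteration, num_envs_per_worker=1):
    """Estimate policy updates per training iteration (simplified model).

    Assumed model, not RLlib code: each sampling round gathers one fragment
    per environment from every worker and is followed by exactly one update;
    the iteration ends once timesteps_per_iteration steps have been sampled.
    """
    # With num_workers=0 the driver samples itself, so count it as one sampler.
    samplers = max(num_workers, 1)
    steps_per_round = samplers * num_envs_per_worker * rollout_fragment_length
    return math.ceil(timesteps_per_iteration / steps_per_round)


# Same fragment length and iteration size, increasing worker counts.
for workers in (0, 1, 2, 4):
    print(workers, updates_per_iteration(workers, 4, 1000))
```

Under this model, going from 1 to 2 workers halves the updates per iteration, and doubling timesteps_per_iteration restores the original count.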

Thank you mannyv! Your comment made me look at the config once again.

I’ve read this: Training APIs — Ray 1.13.0
It seems that the default values mentioned there are incorrect. When looking at the source code I found the actual default values are different.

So I changed my ‘distributed compute config’ to this:

# === Settings for Rollout Worker processes ===
config_simple["num_workers"] = 0
config_simple["num_envs_per_worker"] = 1
config_simple["create_env_on_driver"] = False
config_simple["rollout_fragment_length"] = 4
config_simple["batch_mode"] = "complete_episodes"
config_simple["train_batch_size"] = 32
config_simple["timesteps_per_iteration"] = 1000

# === Resource Settings ===
config_simple["num_gpus"] = 0  # For driver, later I will use GPU.
config_simple["num_cpus_per_worker"] = 1
config_simple["num_gpus_per_worker"] = 0

With it I can reproduce the same results as without the distribute compute config. However, num_workers is 0, so there is still no parallelisation.

I hope you don’t mind me asking you to clarify a couple of things.

You suggest multiplying my timesteps_per_iteration by the number of workers. However, the total workload then increases with the number of workers, so the benefits of parallelized compute are gone.
How do I keep the number of updates per iteration equal while also keeping the total number of timesteps (about) equal? By dividing the rollout_fragment_length by the number of workers?

If dividing the rollout_fragment_length by the number of workers is indeed correct, then I can only have a maximum of 4 workers.
My machine can run about 12 workers simultaneously. Is there another parameter to adjust to keep the number of updates per iteration equal while also keeping the total number of timesteps (about) equal?

What is the difference between train_batch_size and replay_batch_size? I did not explicitly configure a replay buffer, but I expect the simple Q learner to use one. Is the replay_batch_size then not the same as train_batch_size?
Would simply doubling the train_batch_size (and/or replay_batch_size) when the number of workers doubles be sufficient? I get worse performance when I try this.

Do any of my suggestions above (like lowering rollout_fragment_length) hurt the expected computational performance/efficiency?

I tried these things and I manage to get steady training performance. However, I’d like to improve my understanding of the parameters a little more.

@Stefan-1313, there is a worthwhile discussion with @mannyv about this in another thread. It might help you understand this a little better.

@Lars_Simon_Zehnder Thank you! I would very much like to read more about this.
I tried to read it but the link seems to be broken? Perhaps you made a mistake :slightly_smiling_face:, mind checking it?

@Stefan-1313 good catch! I corrected the link

@Lars_Simon_Zehnder thank you for the link. I may have forgotten about it.

@Stefan-1313 one difference between that description and your configuration is that you have set config_simple["batch_mode"] = "complete_episodes".

With that setting the rollout worker collects entire episodes, which means that rollout_fragment_length expresses a lower bound on the number of samples collected. For example, if your rollout_fragment_length is 4 but the episode lasts 20 steps, then 20 transitions are added to the replay buffer, not 4. On the other hand, if the first episode lasted only one step, then the rollout worker would run at least one more episode before it returned samples.
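As a rough illustration of the two batch modes, here is a small sketch. The helper steps_collected is hypothetical and models only the counting described above, not the actual rollout worker. For "truncate_episodes" it assumes that a fragment is cut at exactly rollout_fragment_length steps and that the episode continues in the next fragment.

```python
def steps_collected(episode_lengths, rollout_fragment_length, batch_mode):
    """Count the steps one sampling call returns (simplified model).

    episode_lengths lists the lengths of the episodes the environment would
    produce, in order. This is an illustration, not RLlib's implementation.
    """
    if batch_mode == "truncate_episodes":
        # Assumed: the fragment is cut at exactly rollout_fragment_length
        # steps and the episode continues in the next fragment.
        return rollout_fragment_length
    if batch_mode != "complete_episodes":
        raise ValueError(f"unknown batch_mode: {batch_mode!r}")

    total = 0
    for length in episode_lengths:
        total += length  # episodes are never cut in this mode
        if total >= rollout_fragment_length:
            return total  # lower bound reached, return whole episodes
    raise ValueError("not enough episodes to fill one fragment")


print(steps_collected([20], 4, "complete_episodes"))
print(steps_collected([1, 20], 4, "complete_episodes"))
print(steps_collected([20], 4, "truncate_episodes"))
```

The first two calls correspond to the 20-step episode and the one-step first episode from the example above.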

Is there a reason you need to roll out complete episodes?

Hi Mannyv,

Lars already thanked you for your extremely detailed answer here: Read Tune console output from Simple Q - #5 by mannyv. But I would like to thank you again. It is extremely clarifying. Funny how I struggle with almost the same concepts as @Lars_Simon_Zehnder :).

Regarding complete_episodes:
There is no specific reason I need this. I was afraid the agent would only learn the first rollout_fragment_length steps perfectly and couldn’t see further into the episodes.
Such an implementation would of course make little sense.

Is this how it works: after a worker has sampled rollout_fragment_length steps and starts a new fragment, does it continue the episode where the previous fragment stopped? So in this example, at the second iteration, will it continue at step 5? Then I can just as well use truncate_episodes.

So I tried using config_simple["batch_mode"] = "truncate_episodes". I can confirm this even improves the learning in my case (quite a lot)! I don’t exactly understand why it makes such a difference, though. Do you have any thoughts on it?

Hi @mannyv,

Although all the other questions are of interest, the most important question I still have right now is:
Is it possible to increase the number of policy updates without increasing steps collected?

Increasing the number of workers increases sample collection (which is great), but it decreases the number of policy updates, which is a problem. The solution of collecting twice as many samples voids the gain of faster sample collection. You also don’t (or barely) profit from the additional samples collected.

I cannot find such a parameter, but I am looking for something like num_train_batches_after_sample_collection. This parameter would simply control how many policy updates are done each time a policy update is performed.

This hypothetical parameter now seems to be always 1 but if it could be increased with the number of workers, it is then possible to keep the policy updates and steps collected ratio constant. Then, it is much simpler to leverage the parallel processing advantages.
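To illustrate what I mean, here is a toy sketch of the counting. It is not RLlib code: run_iteration and its updates_per_round argument are made up and only stand in for the hypothetical parameter, assuming that every sampling round collects one fragment from each worker.

```python
def run_iteration(num_workers, rollout_fragment_length,
                  timesteps_per_iteration, updates_per_round=1):
    """Toy model of one training iteration; returns (steps, updates).

    updates_per_round stands in for the hypothetical parameter
    num_train_batches_after_sample_collection. Assumes num_workers >= 1.
    """
    steps_sampled = 0
    updates = 0
    while steps_sampled < timesteps_per_iteration:
        # One sampling round: every worker returns one fragment.
        steps_sampled += num_workers * rollout_fragment_length
        # Then train on updates_per_round batches instead of just one.
        updates += updates_per_round
    return steps_sampled, updates


print(run_iteration(1, 4, 1000))                       # baseline, one worker
print(run_iteration(2, 4, 1000))                       # two workers, one update per round
print(run_iteration(2, 4, 1000, updates_per_round=2))  # two workers, scaled updates
```

In this toy model, setting updates_per_round equal to the number of workers gives the same number of updates for the same number of sampled steps as the single-worker baseline.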

But maybe you know another way :slight_smile: .