How to read and update a global array from within async workers and the learner

Hi, I have an asynchronous-programming-related question (using Ray, of course).

Is it possible to have a global array (i.e. theta_params with 3 float values) that gets passed to individual async workers, and to update these params from the learner in an IMPALA setup?

In short, the workers need to read up-to-date theta_params to act in some environment, and the learner needs to update these theta_params using the async workers' experiences.

BUT what debugging suggests is that, although I’m passing a reference to this array, the async workers always read the initial theta_params (all zeros), even though the learner updates them frequently (printing them shows the values actually changing).

So it seems as if each async worker and the learner have different instances of these theta_params, even though I’ve been passing a reference from the beginning.

Is this because these are asynchronous processes and this is their expected behaviour? Or should I be looking for something else?

And in case it’s the first one, how can I achieve this “reading and updating” of a single array?

Hi LecJack,

Do you instantiate the theta_params object as a remote object and pass the reference?
Do you pass that object into your config to make it accessible to the workers?
If you do this correctly, it should work not only between asynchronous processes, but also between different nodes on a Ray cluster.
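
Roughly this kind of thing is what I have in mind (untested sketch, numpy just as an example and placeholder names):

import ray
import numpy as np

ray.init()

# Put the array into the object store once and keep the reference
theta_params = ray.put(np.zeros(3))

config = {
    "env_config": {"theta_params": theta_params},  # workers can ray.get() this reference
    # ... rest of the IMPALA config ...
}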

Do you have some code to paste?

Cheers

Hi arturn, thanks for your reply.

I took some days to try what you suggested, but still couldn’t make the updating process work.

I can share the code structure as an example, but let me know if it isn’t sufficient or clear enough:

The training process starts by calling train_rl(). The theta parameters are the ones that need to be shared and updated. I think the problem is in the updating process.

def train_rl():
    thetas = torch.zeros(2)
    thetas = ray.put(thetas)

    config = { ...
               "env_config": {"theta_params": thetas}
                "model": { "custom_model": "SomeCustomModel",
                           "custom_model_config": {'theta_params': thetas} 
                         } 
    }
    results = tune.run(ImpalaCustomTrainer,
                       config=config,
                       ...
                       )

The ImpalaCustomTrainer is defined just so I can modify the vtrace_torch_policy.py file, where I do some operations using sampled observations and other data inside the build_vtrace_loss() function.

From inside this function, I would like to modify or overwrite the original thetas, so what I do is take the original reference and try to overwrite it with the newly computed theta parameters.

Most probably this is the piece of code that’s wrong, where I try to overwrite the original thetas reference:

def build_vtrace_loss(policy, model, dist_class, train_batch):
    ...
    # Get old theta values
    thetas = ray.get(policy.config['model']['custom_model_config']['theta_params'])

    # Do some computing to get updated theta parameters

    # Update the original theta reference to a new one
    # Note: This is probably wrongly done!
    policy.config['model']['custom_model_config']['theta_params'] = ray.put(thetas)

Printing policy.config['model']['custom_model_config']['theta_params'] just after that last line of code shows how the value changes and gets updated.

But printing the theta parameters from inside the neural network model shows the same values at every time step.

Is there a way to actually perform this update on the original thetas values? Maybe a different way of structuring the code?

Hi,

Sorry for the long delay.
This does not work for two reasons:

  • Objects that you ray.put are immutable.
  • build_vtrace_loss changes its local copy of the policy.config dictionary, which holds the object ID of theta_params. This will not change the object ID in other scopes (see the small example below).
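
A tiny illustration of both points (untested, using numpy just for the example):

import ray
import numpy as np

ray.init()

ref = ray.put(np.zeros(2))            # the stored array itself is immutable
config = {"theta_params": ref}

def fake_loss(cfg):
    thetas = ray.get(cfg["theta_params"])   # read the stored values
    thetas = thetas + 1.0                   # build new values locally
    cfg["theta_params"] = ray.put(thetas)   # new object ID, but only in this local dict

fake_loss(dict(config))                     # the worker operates on its own copy of the config
print(ray.get(config["theta_params"]))      # still [0. 0.] in the original scope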

I am sorry, I have sent you in the wrong direction. What I have done in the past in a similar case is instantiate an actor:

@ray.remote
class RemoteThetaDummy:
    def __init__(self):
        self.theta = 2
    def set_theta(self, theta):
        self.theta = theta
    def get_theta(self):
        return self.theta

config['model']['custom_model_config']['theta_dummy_handle']  = RemoteThetaDummy.remote()

Does this work for you? The code is not tested.

Cheers


Hi @arturn,

No worries about the delay! I’m really thankful you’re helping me with this! (and also full of joy because it is working now 🙂)

I have a question about your last line of code: shouldn’t that be

config['model']['custom_model_config']['theta_dummy_handle'].set_theta.remote(new_theta)

?

So that when build_vtrace_loss changes its local copy of the policy.config dictionary, I still have the reference to the original RemoteThetaDummy actor and can update its value without making any copy.

It is working for me now, so thank you very much for your valuable responses @arturn!

Just for the record, here is my particular implementation, which may be useful to other people with similar needs:

In train.py, outside any other function or class:

import ray
import torch

@ray.remote
class RemoteThetaDummy:
    def __init__(self):
        self.thetas = torch.zeros(2)  # a 2-element tensor in my case

    def set_thetas(self, new_thetas):
        print("Setting new thetas...")
        print("Old thetas:", self.thetas)  # to check whether the previous update actually stuck or values are still zeros
        self.thetas = new_thetas
        print("New thetas updated to:", self.thetas)

    def get_thetas(self):
        return self.thetas

Before creating the config dictionary to pass to tune.run, I instantiate that class:

thetas = RemoteThetaDummy.remote()

And as in my second comment on this post, I pass that reference to the NN model and environment config:

def train_rl():
    thetas = RemoteThetaDummy.remote()

    config = { ...
               "env_config": {"theta_params": thetas}
                "model": { "custom_model": "SomeCustomModel",
                           "custom_model_config": {'theta_params': thetas} 
                         } 
    }
    results = tune.run(ImpalaCustomTrainer,
                       config=config,
                       ...
                       )

Now, to read those parameters from within the Neural Network model:

In __init__():

self.thetas = theta_params  # the RemoteThetaDummy actor handle coming from custom_model_config

In forward_rnn():

thetas = ray.get(self.thetas.get_thetas.remote())
print("NN thetas:", thetas) # to test if up to date theta parameters

And finally, inside build_vtrace_loss() in vtrace_torch_policy.py:

def build_vtrace_loss(policy, model, dist_class, train_batch):
    ...
    # Get old theta values
    thetas = ray.get(policy.config['model']['custom_model_config']['theta_params'].get_thetas.remote())

    # Do some computing to get updated theta parameters
    thetas = thetas + 42

    # Update the original theta class instance (actor)
    policy.config['model']['custom_model_config']['theta_params'].set_thetas.remote(thetas)
    
    # To test if updated parameters correspond to local ones
    print("Local/global thetas:", thetas, ray.get(policy.config['model']['custom_model_config']['theta_params'].get_thetas.remote())) 

Hi,

Glad I solved your issue 🙂

The last line of code was meant to go into your main executable, together with the rest, before you hand the config to tune or the Trainer. But since it is working, I suppose you have already done something similar!
