Understanding Ray actors

Hi, I have a question about accessing attributes on Ray actors. To access an attribute on an actor, you need to define a method for it, like get_counter in the following example.

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

# Create an actor from this class.
counter = Counter.remote()
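The attribute is then read by calling that method on the actor handle and fetching the result with ray.get, e.g.:

# Call methods on the actor with .remote(); each call returns an object ref.
counter.increment.remote()

# Read the attribute through the getter method.
print(ray.get(counter.get_counter.remote()))  # 1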

However, my use case is this: I want to get an attribute on an argument that is passed to my class. Any idea why this is not working?

# my_module.py
import ray

@ray.remote
class MyClass:
    def __init__(self, dependent: DependentClass):
        self.dependent = dependent

    def update_dependent(self):
        self.dependent.attribute = 5

    def get_dependent_attribute(self):
        return self.dependent.attribute

And in my driver script:

import ray
from my_module import MyClass, DependentClass

my_remote_class = MyClass(dependent=DependentClass())

result_ref = my_remote_class.update_dependent.remote()
ray.get(result_ref)

attribute_ref = my_remote_class.get_dependent_attribute.remote()
attribute = ray.get(attribute_ref)

print(attribute)  # The attribute is not updated 

@Y_C

try this:


import ray

class Dependent():
    def __init__(self):
        self.attribute = 0

@ray.remote
class Actor:
    def __init__(self, dependent: Dependent):
        self.dependent = dependent
    def update_dependent(self):
        self.dependent.attribute = 5
    def get_dependent_attribute(self):
        return self.dependent.attribute

if __name__ == "__main__":
    cls = Dependent()
    actor = Actor.remote(cls)
    actor.update_dependent.remote()
    print(ray.get(actor.get_dependent_attribute.remote()))
2023-06-16 09:12:58,152 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
5

Actors have to be instantiated with Class_name.remote(), which was missing in your code.
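In other words, using the names from your snippet (a minimal contrast sketch):

# Raises TypeError: a class decorated with @ray.remote cannot be called directly.
# my_remote_class = MyClass(dependent=DependentClass())

# Correct: create an actor handle with .remote(), then invoke methods remotely.
my_remote_class = MyClass.remote(dependent=DependentClass())
ray.get(my_remote_class.update_dependent.remote())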

Alternatively, you could pass an argument to update with:

import ray

class Dependent():
    def __init__(self):
        self.attribute = 0

@ray.remote
class Actor:
    def __init__(self, dependent: Dependent):
        self.dependent = dependent
    def update_dependent(self, n):
        self.dependent.attribute = self.dependent.attribute + n
    def get_dependent_attribute(self):
        return self.dependent.attribute

if __name__ == "__main__":
    cls = Dependent()
    actor = Actor.remote(cls)
    for i in range(5):
        actor.update_dependent.remote(i)
        print(ray.get(actor.get_dependent_attribute.remote()))
2023-06-16 09:32:12,314 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
0
1
3
6
10

HTH
Jules

@Y_C Just curious, what’s the use case that requires a dependent class to keep track of an attribute? Actors are designed to be stateful, and as such they can be written to provide a stateful service.
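For example, the state can live on the actor itself rather than on a wrapped dependent object; a minimal sketch (StatefulCounter is just an illustrative name):

import ray

@ray.remote
class StatefulCounter:
    def __init__(self):
        # The actor process keeps this state across method calls.
        self.value = 0

    def update(self, value):
        self.value = value

    def get(self):
        return self.value

counter = StatefulCounter.remote()
counter.update.remote(5)
print(ray.get(counter.get.remote()))  # 5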

HTH
Cheers
Jules

Thanks for your reply. Actually, I made a typo in the question and missed the .remote() part; it does exist in my script.

And I found out that my problem is something else entirely. In the driver script I write some metadata to Google Cloud Storage; in the Ray log I can confirm that the metadata is a valid dict, yet the file that gets written is empty. If I execute these lines manually on the Ray head node, by ssh-ing into that pod, the correct data is written. The script was run by submitting a Ray job:

    with smart_open(metadata_write_path, 'w') as f:
        json.dump(metadata, f)
        print(f'metadata: {metadata} written to: {metadata_write_path}')
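A possible sanity check here (a minimal sketch, assuming smart_open resolves the same GCS path and credentials inside the job) would be to read the object back right after writing it:

    with smart_open(metadata_write_path, 'r') as f:
        print(f'read back from {metadata_write_path}: {f.read()}')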

My use case is an ML platform where we have a unified interface for users running training locally and in the cloud. We swap in the cloud dependency classes when running in the cloud, while locally users can use their own dependencies, e.g. loading data from a local file instead of from the cloud.
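The dependency swap is roughly like this (a minimal sketch; LocalDataLoader and make_data_loader are illustrative names, not the actual platform code):

class CloudDataLoader:
    def load(self, path):
        ...  # read training data from cloud storage

class LocalDataLoader:
    def load(self, path):
        ...  # read training data from a local file

def make_data_loader(run_in_cloud: bool):
    # The platform injects the cloud dependency; locally users pick their own.
    return CloudDataLoader() if run_in_cloud else LocalDataLoader()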

My driver script is something like this:

def run_remote(data_load_path: str, model_load_path: Optional[str],
               model_write_path: str,
               metadata_write_path: str, num_cpu: int, args: tuple[str]) -> None:

    if not ray.is_initialized():
        ray.init()

    RemoteTrainingPipeline = ray.remote(TrainingPipeline)
    remote_training_instance = RemoteTrainingPipeline.options(num_cpus=3, max_task_retries=3, max_restarts=3).remote(
        data_loader=CloudDataLoader(),
        trainer=CustomTrainer(kwargs),
        model_loader=CloudModelLoader(),
        writer=CloudModelWriter())
    training_result = remote_training_instance.start_training.remote(data_load_path=data_load_path,
                                                                     model_load_path=model_load_path,
                                                                     model_write_path=model_write_path)

    metadata = ray.get(training_result)  # This is a blocking call to wait for the actor method to complete
    print(f'ray driver: metadata: {metadata}')
    with smart_open(metadata_write_path, 'w') as f:
        json.dump(metadata, f)
        print(f'metadata: {metadata} written to: {metadata_write_path}')

if __name__ == "__main__":
    run_remote(...)
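(For context, ray.remote(TrainingPipeline) wraps the existing class as an actor class, equivalent to decorating it with @ray.remote; a minimal sketch with a stubbed pipeline, not the real one:)

import ray

class TrainingPipeline:
    def start_training(self, **paths):
        return {'status': 'done', **paths}

# Equivalent to decorating TrainingPipeline with @ray.remote.
RemoteTrainingPipeline = ray.remote(TrainingPipeline)
pipeline = RemoteTrainingPipeline.remote()
print(ray.get(pipeline.start_training.remote(data_load_path='/tmp/data')))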

@Y_C From the code included, it’s hard to discern what the actor RemoteTrainingPipeline is doing and in what order, if any, data_loader, trainer, model_loader, and writer run. It’s also unclear how the resulting metadata dictionary is created or populated and returned from start_training.remote.

Here is an example that shows the attributes being changed. I tried to simulate your use-case for local mode.

import ray

class DataLoader():
    def __init__(self, val):
        self.attribute = val

class ModelLoader():
    def __init__(self, path):
        self.attribute = path

class Trainer():
    def __init__(self, **kwargs):
        self.attribute = kwargs
       
    def update_trainer_state(self, flag):
        self.attribute["status"] = flag



@ray.remote
class Actor:
    def __init__(self, dloader, mloader, trainer):
        self.dloader = dloader
        self.trainer = trainer
        self.mloader = mloader

    def update_trainer(self, status):
        self.trainer.update_trainer_state(status)

    def get_attributes(self):
        results = {"trainer": self.trainer.attribute,
                   "loader":  self.dloader.attribute,
                   "mloader": self.mloader.attribute     
        }
                   
        return results

if __name__ == "__main__":
    model_cls = ModelLoader("/model/v1")
    data_cls  = DataLoader("pytorch")
    train_cls = Trainer(status="START")
    actor = Actor.remote(data_cls, model_cls, train_cls)

    results = ray.get(actor.get_attributes.remote())
    print(results)

    # change status 
    actor.update_trainer.remote("DONE")

    results = ray.get(actor.get_attributes.remote())
    print(results)
2023-06-16 13:37:24,772 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
{'trainer': {'status': 'START'}, 'loader': 'pytorch', 'mloader': '/model/v1'}
{'trainer': {'status': 'DONE'}, 'loader': 'pytorch', 'mloader': '/model/v1'}

Thanks so much for the reply. I wasn’t very clear in the previous comment. Actually, my original question was never an issue; I mistook it for the root cause of an empty file. After adding extensive logging, I found that the attribute on the dependent class was updated with no problem. I thought it was not updated because when I wrote that attribute to a file on Google Cloud Storage, the file came out empty. Apparently there is some issue there, and I still couldn’t figure it out.

Actually, I couldn’t find any easy way to move files from the head node to the machine that submitted the Ray job. It would help me a lot if there were an easy way to do this. Even being able to attach metadata to the Ray job that the job submitter can read back would be extremely useful.

The only resource I could find is this, but it turns out it’s only for VM clusters, not for Kubernetes:

ray rsync_down cluster.yaml '/path/on/cluster' '/local/path'
ray rsync_up cluster.yaml '/local/path' '/path/on/cluster'

Do you have any recommendation on this? Thanks.

@Jules_Damji do you mind having a look at my other related question, Add metadata to a ray job? It’s about metadata of a Ray job. Thanks so much!