Is it possible to load files from the actors instead of the head?

When I use `f = open("path_to_file", "r")` inside a function decorated with `@ray.remote`, it opens the file on the head machine. Is there a way to make it point to locations on the actors instead?

In my case, working in a Kubernetes environment where each worker node has its own /home storage, I solved the problem with the following code (the example saves and loads NumPy arrays; just extend it for the file type you want to read):

# Load packages
import ray
import os
import numpy as np

# Constants:
# NOTE: despite its name this is not a duration -- it is the number of probe
# actors spawned further down when looking for one actor per node.
MAX_TIME_OUT = 1000

# Init ray cluster:
if os.path.isdir('/home/ray'):
    # NOTE(review): the presence of '/home/ray' is presumably the marker for
    # "running inside the Ray cluster image" -- confirm for your deployment.
    # In that case attach to the cluster that is already running.
    ray.init(
        address='auto',
        ignore_reinit_error=True,
    )
else:
    # Local fallback: start a fresh Ray instance, leaving one core free.
    # os.cpu_count() may return None, and on a single-core machine
    # "count - 1" would be 0, so clamp the result to at least one CPU.
    ray.init(
        num_cpus=max(1, (os.cpu_count() or 1) - 1),
        ignore_reinit_error=True,
    )

## Get info about the node:
class NodeInfo:
    """File-system and host utilities that act on the machine running them.

    Every method works on the local file system / network stack of the
    process it executes in, so when the class is wrapped with ``ray.remote``
    the calls operate on the node hosting the actor rather than on the
    driver.
    """

    def __init__(self):
        import time
        # NOTE(review): the reason for this short pause is not visible here;
        # presumably it keeps the actor briefly busy while others are being
        # created -- confirm before removing.
        time.sleep(0.01)

    def create_path(self, path):
        """
        Create a directory (and any missing parents) in the file system.

        Does nothing if the directory already exists.
        """
        import pathlib
        pathlib.Path(path).mkdir(parents=True, exist_ok=True)

    def ip(self):
        """
        Get the IP address the local host name resolves to.
        """
        import socket
        return socket.gethostbyname(
            socket.gethostname()
        )

    def free_store(self, path):
        """
        Check the free storage of the file system containing ``path``.

        Returns
        -------
        Space available to unprivileged users, in GiB, rounded to two
        decimals.
        """
        import numpy as np
        import os
        statvfs = os.statvfs(path)
        # fragment size * blocks available to non-root users = free bytes
        free_bytes = statvfs.f_frsize * statvfs.f_bavail
        return np.round(free_bytes / 1024**3, 2)

    def set_user_permission(self):
        """
        Open up /home so that every user can read, write and traverse it.

        The exit codes of the ``chmod`` calls are not checked.
        NOTE(review): this requires ``sudo`` to be available without a
        password prompt and makes /home world-writable -- confirm that this
        is acceptable on your nodes.
        """
        import subprocess
        subprocess.call(['sudo', 'chmod', '+x', '/home'])
        subprocess.call(['sudo', 'chmod', 'ugo+rwx', '/home'])

    def save_numpy_array(self, data, file_path, dtype=np.float32):
        """
        Write an array to disk as a compressed NumPy archive.

        Parameters
        ----------
        data (array-like):
           Values to store; converted to ``dtype`` before writing
        file_path (str):
           Destination file; its parent directory is created if missing
        dtype:
           Data type the array is cast to on disk

        The array is stored under the key ``'data'`` using
        ``np.savez_compressed``, i.e. in ``.npz`` format regardless of the
        suffix of ``file_path``.
        """
        self.set_user_permission()
        self.create_path(os.path.dirname(file_path))
        with open(file_path, 'wb') as f:
            np.savez_compressed(
                f,
                # np.asarray lets plain lists/tuples be saved as well
                data=np.asarray(data).astype(dtype),
            )

    def load_numpy_array(self, file_path):
        """
        Load the array stored under the key ``'data'`` in an archive
        written by ``save_numpy_array``.
        """
        with open(file_path, 'rb') as f:
            # Indexing reads the array into memory before the file closes.
            return np.load(f)['data']

    def ls(self, path):
        """
        List files in a given path

        Parameters
        ----------
        path (str):
           Complete path to glob

        Returns
        -------
        List of the non-hidden entries directly under ``path``; empty if
        ``path`` does not exist.
        """
        import os
        import glob
        return glob.glob(os.path.join(path, '*'))

    def rm_rf(self, path):
        """
        Remove all folders and files under path, and path as well.

        Best effort, like ``rm -rf``: a missing path or any other
        file-system error is ignored.
        """
        import shutil
        import pathlib
        try:
            shutil.rmtree(pathlib.Path(path))
        except OSError:
            # Only file-system failures are swallowed; a bare ``except``
            # would also hide KeyboardInterrupt and programming errors.
            pass
        
# Stack duplicate actors:
# Spawn MAX_TIME_OUT actors and ask each one for the IP of the node it landed
# on. Placement is left to Ray's scheduler.
# NOTE(review): this presumably relies on the sheer number of actors to reach
# every node at least once; that coverage is not guaranteed -- confirm.
# Wrap the class once instead of re-wrapping it on every iteration.
node_info_actor_cls = ray.remote(NodeInfo)
actors_duplicated_lst = []
ip_duplicated_lst = []
for _ in range(MAX_TIME_OUT):
    new_actor = node_info_actor_cls.remote()
    actors_duplicated_lst.append(new_actor)
    # Store the pending result (object ref); it is resolved later in bulk.
    ip_duplicated_lst.append(new_actor.ip.remote())

# Kill duplicated actors:
# Keep the first actor seen for each distinct IP and terminate every later
# actor that reports an IP already covered.
ip_lst = []
actors_lst = []
# Resolve all pending IP lookups in one call.
ip_duplicated_lst = ray.get(ip_duplicated_lst)
for candidate, ip in zip(actors_duplicated_lst, ip_duplicated_lst):
    if ip in ip_lst:
        # This node already has a representative actor.
        ray.kill(candidate)
        continue
    actors_lst.append(candidate)
    ip_lst.append(ip)

# List workers:
# One line per remaining actor: index, node IP, free space under /home.
for i, worker in enumerate(actors_lst):
    worker_ip = ray.get(worker.ip.remote())
    free_home = ray.get(worker.free_store.remote(path='/home'))
    print(i, worker_ip, free_home)

# Simulate data for testing:
data = np.random.normal(size=(10, 10))

# Example of writing, reading, listing files, and removing a folder.
# Use at most the first two workers: indexing actors_lst[1] directly would
# raise IndexError when only one node was found (e.g. a local Ray instance).
for WORKER, actor in enumerate(actors_lst[:2]):
    # NOTE: save_numpy_array writes with np.savez_compressed, so the file is
    # an .npz archive despite the '.npy' suffix; np.load handles it anyway.
    file_path = f'/home/test_worker/test_worker_{WORKER}.npy'
    # Save data in the local path of the worker:
    ray.get(
        actor.save_numpy_array.remote(
            data=data,
            file_path=file_path,
            dtype=np.float32,
        )
    )
    # Load data from the local path of the worker:
    tmp = ray.get(
        actor.load_numpy_array.remote(
            file_path=file_path,
        )
    )
    ip = ray.get(
        actor.ip.remote()
    )
    print(
        ip, tmp.shape,
    )
    # List files in the worker path (printed so the result is visible):
    print(
        ray.get(
            actor.ls.remote(
                path='/home/test_worker',
            )
        )
    )
    # Recursively remove the folder in the worker path:
    ray.get(
        actor.rm_rf.remote(
            path='/home/test_worker',
        )
    )
    # List again to show that the folder is gone (expected: empty list):
    print(
        ray.get(
            actor.ls.remote(
                path='/home/test_worker',
            )
        )
    )

Thanks for your help. I will give it a go.

This topic was automatically closed 24 hours after the last reply. New replies are no longer allowed.