When I use f=open("path_to_file", "r") within a function decorated by @ray.remote, it opens the file on the head machine. Is there a way I can make it point to locations on the actors?
In my case, working in a Kubernetes environment where each worker node has its own /home
storage, I solved the problem with the following code (an example for saving/loading numpy arrays — just extend it for the file types you want to read):
# Load packages
import ray
import os
import numpy as np
# Constants:
MAX_TIME_OUT = 1000

# Init ray cluster: connect to an existing cluster when running on a Ray
# node (detected via the /home/ray directory), otherwise start a local
# cluster using all but one CPU.
if os.path.isdir('/home/ray'):
    ray.init(address='auto', ignore_reinit_error=True)
else:
    ray.init(num_cpus=os.cpu_count() - 1, ignore_reinit_error=True)
## Get info about the node:
class NodeInfo(object):
    """Filesystem helper intended to run as a Ray actor on a worker node.

    Wrap with ``ray.remote(NodeInfo).remote()`` so every method executes on
    the node hosting the actor and operates on that node's *local* disk,
    rather than on the head machine.
    """

    def __init__(self):
        # Short pause so the actor is fully placed before the first call.
        import time
        time.sleep(0.01)

    def create_path(self, path):
        """Create *path* (including any missing parents) on this node."""
        import pathlib
        pathlib.Path(path).mkdir(parents=True, exist_ok=True)

    def ip(self):
        """Return this node's IP address as a string."""
        import socket
        return socket.gethostbyname(socket.gethostname())

    def free_store(self, path):
        """Return the free storage under *path* in GiB, rounded to 2 decimals."""
        import numpy as np
        import os
        statvfs = os.statvfs(path)
        return np.round(statvfs.f_frsize * statvfs.f_bavail / 1024**3, 2)

    def set_user_permission(self):
        """Best-effort ``chmod`` of /home so all users can read/write it.

        Silently skipped when ``sudo`` is not installed (e.g. minimal
        containers or CI images) — previously this raised
        ``FileNotFoundError`` from ``subprocess.call``.
        """
        import shutil
        import subprocess
        if shutil.which('sudo') is None:
            return
        subprocess.call(['sudo', 'chmod', '+x', '/home'])
        subprocess.call(['sudo', 'chmod', 'ugo+rwx', '/home'])

    def save_numpy_array(self, data, file_path, dtype=np.float32):
        """Write *data* to *file_path* on this node as a compressed archive.

        Parent directories are created as needed; the array is cast to
        *dtype* and stored under the key ``'data'`` (npz format, regardless
        of the file extension used).
        """
        self.set_user_permission()
        self.create_path(os.path.dirname(file_path))
        with open(file_path, 'wb') as f:
            np.savez_compressed(f, data=data.astype(dtype))

    def load_numpy_array(self, file_path):
        """Load the array stored under key ``'data'`` from *file_path*."""
        with open(file_path, 'rb') as f:
            return np.load(f)['data']

    def ls(self, path):
        """List entries directly under *path* on this node.

        Parameters
        ----------
        path (str):
            Complete path to glob.
        """
        import glob
        import os
        return glob.glob(os.path.join(path, '*'))

    def rm_rf(self, path):
        """Remove *path* and everything under it; a missing path is a no-op."""
        import shutil
        # Narrowed from a bare ``except``: only filesystem errors (missing
        # path, permissions) are swallowed, not programming errors.
        try:
            shutil.rmtree(path)
        except OSError:
            pass
# Stack duplicate actors: spawn MAX_TIME_OUT NodeInfo actors and record
# the IP of the node each one landed on.
actors_duplicated_lst = []
ip_duplicated_lst = []
for _ in range(MAX_TIME_OUT):
    actor = ray.remote(NodeInfo).remote()
    actors_duplicated_lst.append(actor)
    ip_duplicated_lst.append(actor.ip.remote())

# Kill duplicated actors: keep exactly one actor per distinct node IP.
ip_lst = []
actors_lst = []
ip_duplicated_lst = ray.get(ip_duplicated_lst)
for actor, ip in zip(actors_duplicated_lst, ip_duplicated_lst):
    if ip in ip_lst:
        ray.kill(actor)
    else:
        actors_lst.append(actor)
        ip_lst.append(ip)
# List workers: print each actor's index, node IP, and free /home storage (GiB).
for idx, actor in enumerate(actors_lst):
    node_ip = ray.get(actor.ip.remote())
    free_gb = ray.get(actor.free_store.remote(path='/home'))
    print(idx, node_ip, free_gb)
# Simulate data for testing:
data = np.random.normal(size=(10, 10))

# Example of writing, reading, listing files, and removing a folder:
for WORKER in [0, 1]:
    worker = actors_lst[WORKER]
    target = f'/home/test_worker/test_worker_{WORKER}.npy'

    # Save data in the local path of the worker:
    ray.get(
        worker.save_numpy_array.remote(
            data=data,
            file_path=target,
            dtype=np.float32,
        )
    )

    # Load data back from the worker's local path:
    tmp = ray.get(worker.load_numpy_array.remote(file_path=target))
    ip = ray.get(worker.ip.remote())
    print(ip, tmp.shape)

    # List files in the worker's path:
    ray.get(worker.ls.remote(path='/home/test_worker'))

    # Recursively remove the folder on the worker:
    ray.get(worker.rm_rf.remote(path='/home/test_worker'))

    # List again to confirm the folder is gone:
    ray.get(worker.ls.remote(path='/home/test_worker'))
Thanks for your help — I'll give it a go.
This topic was automatically closed 24 hours after the last reply. New replies are no longer allowed.