Hi there!
I am currently implementing my own project with PyTorch and Ray. I wonder how I can distribute my model to different Ray nodes. I want it to work in a similar way to torch DistributedDataParallel, which launches multiple processes and distributes the workload in a single-program, multiple-data fashion over physical hardware, for example one CPU and one GPU per process.
My setting: I have a Ray actor class, and each actor instance can produce its own training data via simulation (already achieved):
@ray.remote(num_cpus=1, num_gpus=0.5)
class MyActor:
    ...
I want to instantiate 10 such actors to do simulation and distribute my model to train on them in parallel. After each actor trains for, say, 5 epochs, I synchronize their gradients and redistribute the synced model. This workflow then repeats.
I know there is a Ray Train library out there that might help to achieve this goal. But I want to achieve this with pure Ray Core APIs somehow (mainly because I do not want to refactor my code that much). Is there any way to do this? Any suggestion is welcome! Thanks in advance.
Yes, you can achieve this with Ray’s Actor API. You can define your model as an actor and then instantiate multiple actors to train your model in parallel. Here is a simplified example:
import ray

@ray.remote
class ModelActor:
    def __init__(self, model):
        self.model = model

    def train(self, data):
        # Your training logic here
        ...
        return gradients

# Initialize Ray
ray.init()

# Create your model
model = ...

# Create actors
actors = [ModelActor.remote(model) for _ in range(10)]

# Train model on each actor
for _ in range(5):  # 5 epochs
    futures = [actor.train.remote(data) for actor in actors]
    gradients = ray.get(futures)
    # Synchronize gradients and update model
    ...
In this example, ModelActor is an actor class that wraps your model. Each actor runs in a separate process. To reserve one CPU and one GPU per actor, pass resource arguments to the decorator, for example @ray.remote(num_cpus=1, num_gpus=1); the bare @ray.remote in the example does not request a GPU. The train method is where you put your training logic. After each epoch, you collect the gradients from each actor, synchronize them, and update your model.
Please note that this is a simplified example and you might need to adjust it according to your specific needs. For example, you might need to handle the distribution and collection of training data, and the synchronization and distribution of the updated model.
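The "synchronize gradients and update model" step is left open in the example above. One way it could look is a plain element-wise average followed by an SGD update. This is only a minimal sketch: the gradient format (a dict mapping parameter names to flat lists of floats), the helper names average_gradients and sgd_step, and the learning rate are all assumptions made for illustration; real code would operate on tensors.

```python
from typing import Dict, List

# Hypothetical gradient format: parameter name -> flat list of floats.
Gradients = Dict[str, List[float]]


def average_gradients(per_actor: List[Gradients]) -> Gradients:
    """Element-wise mean of the gradients returned by each actor."""
    if not per_actor:
        raise ValueError("need gradients from at least one actor")
    n = len(per_actor)
    return {
        name: [sum(vals) / n for vals in zip(*(g[name] for g in per_actor))]
        for name in per_actor[0]
    }


def sgd_step(params: Gradients, grads: Gradients, lr: float) -> Gradients:
    """Return new parameters after one plain SGD update."""
    return {
        name: [p - lr * g for p, g in zip(params[name], grads[name])]
        for name in params
    }


# Stand-in for `gradients = ray.get(futures)` with two actors.
gradients = [
    {"w": [1.0, 2.0], "b": [0.5]},
    {"w": [3.0, 4.0], "b": [1.5]},
]
mean_grads = average_gradients(gradients)
new_params = sgd_step({"w": [0.0, 0.0], "b": [0.0]}, mean_grads, lr=0.5)
print(mean_grads)
print(new_params)
```

The updated parameters would then be sent back to every actor (for example through a hypothetical set_weights method on the actor) before the next round.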
If you find it challenging to implement this with Ray Core APIs, you might want to consider using Ray Train, which provides a simple API for distributed training and handles many of the complexities for you.
Thank you @zhz and @Hasnain_Fareed!
I am trying to implement the approach @Hasnain_Fareed suggested. Just want to make things a little bit clearer:
I notice that there is a master node that periodically collects gradients, aggregates them, and re-broadcasts the new model to the training nodes. Will the gradients reported by the training nodes go through Ray's object store in RAM, or can they take advantage of the GPU's NVLink directly for interprocess communication?
Also, can I handle the dependency directly on the training nodes without introducing a master parameter server, and do something like
# store futures from all other training nodes
actor_0:
    ray.get([actor_0_future, actor_1_future, actor_2_future])
actor_1:
    ray.get([actor_0_future, actor_1_future, actor_2_future])
actor_2:
    ray.get([actor_0_future, actor_1_future, actor_2_future])
Specifically, an actor's future here refers to its local gradient. I want Ray to help me schedule the communication between actors. Is this a valid approach?