I have a pre-processing step in the pipeline in which a worker downloads the data from cloud storage, then loads it from disk and processes it in chunks. I would like to parallelize the load + process step; however, when running in a cluster setting, the jobs created with @ray.remote get submitted to other nodes of the cluster that do not have the necessary data on disk, resulting in FileNotFoundError.
I wonder if it is possible to somehow restrict the jobs created by a worker so that they are only accepted by workers on the same node? Thanks in advance!
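Roughly, the load + process step looks like this (simplified sketch; the path and the per-chunk processing are just placeholders):

import glob
import ray

ray.init(address="auto")

@ray.remote
def process_chunk(path):
    # This task can be scheduled on any node in the cluster; nodes that
    # did not run the download step don't have `path` on their local
    # disk, so open() raises FileNotFoundError there.
    with open(path, "rb") as f:
        return len(f.read())  # stand-in for the real chunk processing

# The download step has already written the chunks to this node's disk;
# "/tmp/data" is just an illustrative location.
chunk_paths = glob.glob("/tmp/data/*")
results = ray.get([process_chunk.remote(p) for p in chunk_paths])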
import ray
import socket

ray.init()

pg = ray.util.placement_group([{"CPU": 2}])

@ray.remote
def f1():
    print(socket.gethostbyname(socket.gethostname()))  # print IP address

@ray.remote
def f2():
    print(socket.gethostbyname(socket.gethostname()))  # print IP address

# These two tasks will be scheduled in the placement group, which means
# they'll be on the same machine.
f1.options(placement_group=pg).remote()
f2.options(placement_group=pg).remote()
The above creates a placement group with 2 CPU resources. The placement group will be scheduled on some machine (kind of like an actor) that has sufficient resources. As long as the placement group is alive, it will “occupy” those resources and prevent them from being used by other tasks/actors.
You can schedule tasks/actors in that placement group and they will all be colocated.
Note that you can do much more with placement groups. For example, you can create multiple “bundles” that get spread across multiple machines or get packed together.
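A minimal sketch of the multi-bundle case, using the same placement_group option as above (the two-bundle layout and the task are just for illustration):

import socket
import ray

ray.init()

# Two bundles of 1 CPU each. With strategy="SPREAD" Ray tries to place
# the bundles on different nodes; the default "PACK" tries to keep them
# on the same node.
pg = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}], strategy="SPREAD")
ray.get(pg.ready())  # block until the bundles have been reserved

@ray.remote(num_cpus=1)
def where_am_i():
    return socket.gethostbyname(socket.gethostname())

# Pin each task to a specific bundle of the placement group.
print(ray.get([
    where_am_i.options(
        placement_group=pg, placement_group_bundle_index=i
    ).remote()
    for i in range(2)
]))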
That is a great use case for custom resources!
My solution ended up being exactly that: I specified a custom resource on the head node of the cluster in the YAML config
and then restricted which actors can use it by
decorating them with @ray.remote(resources={"file": 1}). This ensured that those actors can only be scheduled on the head node, which is exactly what I needed.
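In case it is useful to someone else, the relevant bits look roughly like this (the "file" resource name is from my setup, the Loader actor and path are illustrative, and the resource itself is declared on the head node via the YAML config or ray start --head --resources='{"file": 1}'):

import ray

# Connect to the existing cluster; the head node advertises the custom
# "file" resource declared in the cluster config.
ray.init(address="auto")

@ray.remote(resources={"file": 1})
class Loader:
    # Requiring the "file" resource pins this actor to the head node,
    # the only node that has the raw data on disk.
    def load(self, path):
        with open(path, "rb") as f:
            return f.read()

loader = Loader.remote()
# Note: with a quantity of 1 only one such actor can hold the resource
# at a time; a fractional amount (e.g. 0.01) allows several.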
Thanks for the pointer about placement groups! I knew about them from the docs but never realized how they can be used in practice. In fact, I think this solution is better suited to my original question than the custom-resources one suggested by @sangcho. However, since I have changed the processing pipeline to be more centralized, where all of the raw data lives on the head node and only the model training is done by the workers, using custom resources turned out to be easier in the end. Either way, thank you for your response, and I look forward to using the placement groups feature next time!
There are a number of things I do not understand about this solution:
- Whenever a placement group needs more than 1 CPU, as in your example, why could it not use two nodes with one CPU each to fulfill that requirement? I thought that to actually guarantee placement on a single node, the STRICT_PACK placement strategy would have to be specified (see the sketch after these questions)?
- How would it be possible to ensure that the node chosen for the placement group is the SAME node as the one starting the actor, i.e. the one running the actor.remote() call? Or, more generally, is it possible to place actors on a SPECIFIC node (where specific means the same node as some actor or function for which I have a remote handle)?
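For the first point, this is what I assumed would be required to actually guarantee a single node (sketch):

import ray

ray.init()

# Explicitly request STRICT_PACK so that all bundles must end up on
# one node, rather than relying on the default behaviour.
pg = ray.util.placement_group([{"CPU": 2}], strategy="STRICT_PACK")
ray.get(pg.ready())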
Sadly, the documentation does not mention this anywhere, and the API reference on the website is incomplete, so the (probably) relevant functions and their documentation blurbs are missing from it as well.