How do Tasks get scheduled on a Head Node with CPU=0?

Hello! I am new here and currently evaluating Ray as a possible replacement for our current batch scheduling solution. I want to make sure I understand how Ray schedules tasks.

How do tasks get scheduled onto worker nodes when the head node has CPU=0? (Assume Ray clients always connect to the head node via `ray.init(<head node address>)`.)

From the Ray 1.0 Architecture Whitepaper…

### Scheduling policy

A raylet always attempts to grant a resource request using local resources first. When there are no local resources available, there are three other possibilities:

1. Another node has enough resources, according to the (possibly stale) information published by the GCS. The raylet will spill the request back to that node's raylet.
2. No node currently has enough resources. The task is queued locally until resources on the local or a remote node become available again.
3. The request is infeasible: no node in the cluster has the requested resources (e.g., a GPU task on a CPU-only cluster). The task stays queued until a node that can satisfy the request joins the cluster.
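The policy above can be sketched as a toy model. This is plain Python for illustration only, not Ray's actual implementation; the node names, resource dicts, and function name `try_schedule` are all invented:

```python
def try_schedule(task_demand, local_node, cluster_view):
    """Toy model of a raylet's scheduling decision.

    task_demand:  dict of resource name -> amount, e.g. {"CPU": 1}
    local_node:   name of the node whose raylet received the request
    cluster_view: dict of node name -> available resources (possibly stale,
                  as published by the GCS)
    """
    def fits(avail):
        return all(avail.get(res, 0) >= amt for res, amt in task_demand.items())

    # 1. Always try to grant the request with local resources first.
    if fits(cluster_view[local_node]):
        return ("run_locally", local_node)

    # 2. Spillback: another node appears to have enough resources.
    for node, avail in cluster_view.items():
        if node != local_node and fits(avail):
            return ("spillback", node)

    # 3. No node currently has enough resources: queue locally and retry later.
    return ("queue_locally", local_node)


# A head node with CPU=0 can never grant a CPU request locally, so CPU tasks
# submitted there are spilled back to a worker node.
cluster = {"head": {"CPU": 0}, "worker-A": {"CPU": 4}}
print(try_schedule({"CPU": 1}, "head", cluster))      # ("spillback", "worker-A")
print(try_schedule({"CPU": 1}, "worker-A", cluster))  # ("run_locally", "worker-A")
```

In this toy view, "CPU=0 on the head node" simply means step 1 can never succeed there for CPU tasks, so every CPU request falls through to spillback or local queueing.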

So does this mean that all tasks go through the head node's raylet, and that this raylet will always try to find another node with enough resources and return it to the client, so the client can retry the request on that worker node (assuming there is a worker with enough resources)?

Hi @Victor_Yap, thank you for checking out Ray! :grinning_face_with_smiling_eyes:

> So does this mean that all tasks go through the head node's raylet, and that this raylet will always try to find another node with enough resources and return it to the client, so the client can retry the request on that worker node?

That is correct! There is one exception that isn't mentioned in the Ray architecture whitepaper: locality-aware scheduling. Instead of always attempting to schedule a task locally, Ray tries to schedule it onto the node that already holds the most of the task's dependencies. Think of it as "first try to execute the task where its dependencies already live, since this results in less data transfer".

Concretely: if a task `g` is submitted on the head node with a task argument `arr` that is stored in node A's object store (and not in the head node's own object store), the task is sent to node A's raylet instead of the local head node's raylet.

For example:

```python
import numpy as np
import ray

ray.init()  # in this scenario, connected to a head node with CPU=0

@ray.remote
def foo():
    return np.random.rand(10000)

@ray.remote
def bar(arr):
    return np.mean(arr)

# foo is first submitted to the head node's raylet, but is eventually executed
# on node A, not the head node, since the head node has CPU=0. foo's output is
# therefore stored in node A's object store.
f = foo.remote()

# bar is submitted directly to node A's raylet and executed on node A, since
# its only task dependency (the output of f) is stored on node A.
g = bar.remote(f)

print(ray.get(g))
```
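The locality-aware node choice itself can be sketched with a small toy function. Again, this is plain Python for illustration, not Ray internals; the function name `pick_node_by_locality` and the object-location table are invented. The idea is simply that the node holding the largest total size of a task's arguments wins:

```python
def pick_node_by_locality(arg_sizes, object_locations, default_node):
    """Toy model: choose the node holding the most bytes of a task's arguments.

    arg_sizes:        dict of object ref -> size in bytes
    object_locations: dict of object ref -> node where that object lives
    default_node:     fallback (e.g. the submitting node) when no argument
                      locations are known
    """
    # Tally how many bytes of the task's arguments live on each node.
    local_bytes = {}
    for ref, size in arg_sizes.items():
        node = object_locations.get(ref)
        if node is not None:
            local_bytes[node] = local_bytes.get(node, 0) + size

    if not local_bytes:
        return default_node
    return max(local_bytes, key=local_bytes.get)


# bar(f) has a single argument, stored on node A, so node A is chosen even
# though the task was submitted from the head node.
sizes = {"f_output": 80_000}
locations = {"f_output": "node-A"}
print(pick_node_by_locality(sizes, locations, "head"))  # node-A
```

Tasks with no arguments (like `foo` above) have no locality signal, which is why they fall back to the normal policy: try the submitting node first, then spill back when it has CPU=0.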