I created a cluster with one head node and 2 worker nodes,
and followed the tensorflow_mnist_example for distributed training, but what confuses me is that only the head node runs the training process.
I printed the TF_CONFIG env variable, and only the head node's IP address appears in it.
My TensorFlow training task executes only on the head node; no training tasks are scheduled on the worker nodes.
cluster info:
head node: 10.68.134.13
worker1 node: 10.88.112.16
worker2 node: 10.88.112.17
The log output is as follows:
Connecting to existing Ray cluster at address: 10.68.134.13:6379...
Connected to Ray cluster. View the dashboard at 10.68.134.13:8265
2024-01-19 02:46:43.053895: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> 10.68.134.13:53431, 1 -> 10.68.134.13:56466, 2 -> 10.68.134.13:49974, 3 -> 10.68.134.13:58255} [repeated 3x across cluster]
I also printed the TF_CONFIG env variable; this config was not set by me.
Hi @yydai, it looks like multiple workers are being scheduled, but the problem is that all of them are on the head node.
What does your cluster setup look like? (number of nodes, number of GPUs on each node)
What does your ScalingConfig look like?
If you have multiple GPUs on the head node, then Ray might schedule all of the workers on the head node (assigning 1 GPU per worker unless you specify more).
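For reference, a ScalingConfig that pins one GPU to each worker might look like the sketch below (a minimal illustration assuming a recent Ray 2.x API; the worker count and resource values are made up, not taken from your setup):

```python
from ray.train import ScalingConfig

# Illustrative only: 2 workers, 1 GPU each. If the head node alone has
# 2+ GPUs, Ray may satisfy both requests there and place every worker
# on the head node.
scaling_config = ScalingConfig(
    num_workers=2,
    use_gpu=True,
    resources_per_worker={"GPU": 1},
)
```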
You're right: when num_workers is small (the Ray demo defaults to 2), all workers were indeed scheduled onto one node. When I increased the number of workers to 40, they started to be scheduled onto the other worker nodes as well. And my ScalingConfig is
To understand this, I took a look at the Ray code and found that we can observe how workers are scheduled and grouped across machines with the following code:
import ray
from ray.train._internal.worker_group import WorkerGroup

ray.init("auto")
wg = WorkerGroup(num_workers=50)
print([w.metadata.hostname for w in wg.workers])
I also found where my other question, about TF_CONFIG, is answered: it is set by the following Ray Train code:
def _setup_tensorflow_environment(worker_addresses: List[str], index: int):
    """Set up distributed Tensorflow training information.

    This function should be called on each worker.

    Args:
        worker_addresses: Addresses of all the workers.
        index: Index (i.e. world rank) of the current worker.
    """
    tf_config = {
        "cluster": {"worker": worker_addresses},
        "task": {"type": "worker", "index": index},
    }
    os.environ["TF_CONFIG"] = json.dumps(tf_config)
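To see what this produces, here is a standalone sketch that mirrors the same logic using only json and os (the setup_tf_config name and the worker addresses are made up for illustration):

```python
import json
import os


def setup_tf_config(worker_addresses, index):
    # Same structure Ray Train writes: the full worker address list under
    # "cluster", plus this worker's own rank under "task".
    tf_config = {
        "cluster": {"worker": worker_addresses},
        "task": {"type": "worker", "index": index},
    }
    os.environ["TF_CONFIG"] = json.dumps(tf_config)


# Example: the rank-0 worker in a two-worker cluster (addresses illustrative).
setup_tf_config(["10.68.134.13:53431", "10.88.112.16:56466"], 0)
print(os.environ["TF_CONFIG"])
```

TensorFlow's MultiWorkerMirroredStrategy reads exactly this variable at startup, which is why each worker must see the full address list but its own distinct index.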
When using the above configuration, the program seems to get stuck and runs very slowly. Is this normal? When I changed it to SPREAD, execution completed quickly.
@yydai The strict spread strategy isn't feasible in this case, since it would require you to have 50 nodes, with each worker placed on a separate node. SPREAD is a softer strategy. However, you will usually want to keep the default PACK strategy, since collective calls are faster when there are fewer inter-node connections.
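If you do want to bias workers across nodes, the placement strategy can be passed on the ScalingConfig. A sketch, assuming a recent Ray 2.x API (the worker count is illustrative):

```python
from ray.train import ScalingConfig

# SPREAD is best-effort: it tries to place workers on different nodes but
# falls back to co-location when it must, unlike STRICT_SPREAD, which fails
# if one-worker-per-node cannot be satisfied. PACK (the default) co-locates
# workers to minimize inter-node traffic for collective calls.
scaling_config = ScalingConfig(
    num_workers=40,
    placement_strategy="SPREAD",
)
```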