Hyperparameter optimization on Slurm using DistributedDataParallel and mpi4py

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi all, I’m new to Ray, and I would like to use Ray Tune to find the best parameters for my neural network.

Things that work well before adding Ray Tune

I made a Python script nn.py featuring a train_and_test(args) function, which accepts parameters like {"par1": 45, "par2": 64, ...} and returns scores like {"metric0": 0.65468, "metric1": 35.1345, "metric2": 0.6125}. Let’s call this part my inner loop, which just trains one model. I’m using the Slurm scheduler and DistributedDataParallel to run it over several GPUs and nodes at once. This part works when I submit my batch script with the following details:
sbatch.sh

#SBATCH --nodes 2
#SBATCH --ntasks-per-node 8
#SBATCH --gpus-per-node 8
#SBATCH --cpus-per-task 7
[...]
srun singularity exec [... bindings] $CONTAINER run-me.sh

run-me.sh

[...]
export MASTER_ADDR=$(python /workdir/get-master.py "$SLURM_NODELIST")
export MASTER_PORT=29500
export WORLD_SIZE=$SLURM_NPROCS
export RANK=$SLURM_PROCID
export LOCAL_RANK=$SLURM_LOCALID 
python -u nn.py --hidden_dim 3 [... args]
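
(For context, get-master.py just resolves the head node from the Slurm node list; it does roughly the following. This is only a sketch, the real script may differ in details:)

# sketch of get-master.py: expand the compressed Slurm node list and print the first host,
# which then becomes MASTER_ADDR on every node
import subprocess
import sys

nodelist = sys.argv[1]  # e.g. "nid[001234-001235]"
hosts = subprocess.run(
    ["scontrol", "show", "hostnames", nodelist],
    capture_output=True, text=True, check=True,
).stdout.split()
print(hosts[0])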

nn.py

[...]
def train_and_test(args):
    [...]
    return {"a":score_a, "b":score_b, "c":score_c}

if __name__ == "__main__":
    args = get_args()
    train_and_test(args)
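
(The elided part of train_and_test is where DDP gets set up from those env vars; as a minimal sketch of that part, assuming the standard env:// rendezvous:)

# minimal sketch of the DDP setup inside train_and_test (the real code is elided above)
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup_ddp(model):
    # the default "env://" rendezvous reads MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE,
    # i.e. exactly the variables exported in run-me.sh
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])   # set from $SLURM_LOCALID
    torch.cuda.set_device(local_rank)
    return DDP(model.cuda(local_rank), device_ids=[local_rank])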

Things that I imagine could work well with Ray Tune

Now I would like to implement an outer loop to find the best parameters. Notice how I launch 8 tasks per node for the inner loop, each just calling my script with python? That works well as long as I set the right rank for DistributedDataParallel (for that I’m using the $SLURM_PROCID variable).

So my idea was something like this: let’s do it the same way, except that in one of the processes I prepare the arguments using Ray Tune, distribute them using mpi4py, and everything will be nice and I’ll live happily ever after.

I tried to implement it the following way, but I’m still getting errors.

sbatch.sh - same as before
run-me.sh - same as before (except I don’t need to specify so many args anymore)
nn_raytune.py - the new nn.py, except with Ray Tune

[...]
from mpi4py import MPI
from ray import train, tune

def train_and_test(args):
    [...]
    return {"a":score_a, "b":score_b, "c":score_c}

def trainable(config):
    args = [...] #parse config into args, typically file paths and --reset option
    result = train_and_test(args)
    train.report(result)

if __name__ == "__main__":    
    args = get_args()
    # new part here:
    comm = MPI.COMM_WORLD 
    rank = comm.rank

    if rank == 0:
        config = {"hidden_dim": tune.randing(2,4), 
                   # [...]
                 }
        tuner = tune.tuner(trainable, param_space=config, tune_config=tune.TuneConfig(num_samples=-1,  time_budget_s=600))
    else:
        tuner = None
    tuner = comm.bcast(tuner, root=0)
    analysis = tuner.fit()

What now?

Is this doable? I’m not sure whether I’m just dealing with a technical issue (the wrong version of some tool), or whether the idea itself is doable at all.
I’m trying different versions of Ray Tune (pip-installed), because that’s what people on the internet often do when they encounter this type of error:

Traceback (most recent call last):
  File "/workdir/scripts/nn_raytune.py", line 337, in <module>
    analysis = tuner.fit()
[...]
  File "/users/username/.local/lib/python3.10/site-packages/ray/tune/impl/tuner_internal.py", line 432, in converted_trainable
    return self._converted_trainable
AttributeError: 'TunerInternal' object has no attribute '_converted_trainable'. Did you mean: 'converted_trainable'?

So, might it be a version thing, or am I missing something?

Note:

I was able to run the Ray Tune version on 8 GPUs on 1 node using torchrun instead of python (with --ntasks-per-node 1 and handling the ranks inside the script).

But that worked only for 1 node at a time (running 1 process per node), and one trial per GPU (I could only get it to run with num_cpus=7, num_gpus=1 per trial).

The use case I’m after is to train one trial at a time (or two, four… if possible), but scaled over all available resources (possibly even hundreds of nodes).
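
From the docs, I imagine the “Ray-native” way to express that would be something like the sketch below (Ray Train’s TorchTrainer wrapped in a Tuner, so Ray sets up the process group instead of Slurm/mpi4py; untested on my cluster, the worker counts and the parse helper are just placeholders):

# sketch only: each trial spans 16 GPU workers (e.g. 2 nodes x 8 GPUs), Tune runs 1 trial at a time
from ray import train, tune
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker(config):
    # Ray Train sets up the torch process group, so no MASTER_ADDR/RANK handling is needed here
    args = parse_config_into_args(config)   # hypothetical helper, same parsing as in trainable()
    train.report(train_and_test(args))

trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(
        num_workers=16,
        use_gpu=True,
        resources_per_worker={"CPU": 7, "GPU": 1},
    ),
)
tuner = tune.Tuner(
    trainer,
    param_space={"train_loop_config": {"hidden_dim": tune.randint(2, 4)}},
    tune_config=tune.TuneConfig(num_samples=-1, time_budget_s=600, max_concurrent_trials=1),
)
tuner.fit()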

Update:

Started using the ray CLI (outside of the Python code). So now I have this in my script run-me.sh:

export MASTER_PORT=29500
export MASTER_ADDR=$(python /workdir/get-master.py "$SLURM_NODELIST")
export RAY_PORT=29600 
export RAY_ADDRESS=$MASTER_ADDR:$RAY_PORT
#[...]
# head-node check (assumes get-master.py prints a hostname that matches this node's hostname)
if [ "$(hostname)" == "$MASTER_ADDR" ]; then
    echo "Starting Ray on the head node..."
    ray start --head --port=$RAY_PORT
else
    # Start Ray on worker nodes and connect them to the head node
    echo "Starting Ray on a worker node and connecting to the head node..."
    ray start --address=$RAY_ADDRESS
fi
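
In nn_raytune.py the driver then attaches to that running cluster before building the Tuner, roughly like this sketch (I’m assuming ray.init(address="auto") is the right way to connect):

# sketch: the Tune driver attaches to the cluster started by `ray start` above,
# instead of spinning up its own local Ray instance
import ray
from ray import tune

ray.init(address="auto")   # connect to the head node started in run-me.sh

tuner = tune.Tuner(trainable, param_space=config,
                   tune_config=tune.TuneConfig(num_samples=-1, time_budget_s=600))
analysis = tuner.fit()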

With that, I keep ending up with

Trial trainable_e69da_00001 errored after 0 iterations at 2024-12-10 16:13:28. Total running time: 10s
Error file: /tmp/ray/session_2024-12-10_16-12-35_635990_53405/artifacts/2024-12-10_16-13-16/trainable_2024-12-10_16-13-16/driver_artifacts/trainable_e69da_00001_1_batch_size=31,epochs=20,gauss_multipliers=1_5,hidden_dim=63,learning_rate=0.0001,nfourierfeatures=20,num_l_2024-12-10_16-13-17/error.txt
2024-12-10 16:13:28,182	ERROR tune_controller.py:1331 -- Trial task failed for trial trainable_e69da_00006
Traceback (most recent call last):
  File "/users/username/.local/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
...
ray.exceptions.RayTaskError(DistNetworkError): ray::ImplicitFunc.train() (pid=62864, ip=10.253.16.116, actor_id=26a713817a13b023e52e00c702000000, repr=trainable)
 
...
    return TCPStore(
torch.distributed.DistNetworkError: The server socket has failed to listen on any local network address. The server socket has failed to bind to [::]:29500 (errno: 98 - Address already in use). The server socket has failed to bind to 0.0.0.0:29500 (errno: 98 - Address already in use).

Things I have tried:

# added some env vars for PyTorch nccl backend
export NCCL_SOCKET_IFNAME=^lo
export NCCL_DEBUG=INFO
export NCCL_P2P_DISABLE=1
# tried to set the port as reusable
import socket

def create_socket(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow address reuse
    s.bind(('', port))
    # print(f"Socket bound to port {port} with SO_REUSEADDR set.")
    s.listen(5)
    return s  # keep a reference so the socket isn't closed right away
# run the script with -m torch.distributed.launch and set rdzv-endpoint, trying both MASTER_PORT and RAY_PORT
python -m torch.distributed.launch \
    --rdzv-endpoint=$MASTER_ADDR:$MASTER_PORT \
    nn.py --hidden_dim 3 [... args]
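
One more idea still on my list (untested sketch; pick_free_port is my own helper name): let each trial grab a free MASTER_PORT instead of the fixed 29500, since the bind error suggests the trials are fighting over that one port:

# untested sketch: give every trial its own rendezvous port instead of the fixed 29500
import os
import socket
from ray import train

def pick_free_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))              # port 0 -> the OS assigns a free port
        return s.getsockname()[1]

def trainable(config):
    os.environ["MASTER_PORT"] = str(pick_free_port())   # must be set before init_process_group
    args = parse_config_into_args(config)               # same parsing as in my trainable() above
    train.report(train_and_test(args))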