Trainers with policy client/server(s) on a single machine lock at .train(): is multi-threading explicitly needed?

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

I set up a policy client/server training architecture for a multi-agent environment running on an external simulator. N client/server pairs are instantiated, one per agent, and all of them run on the same machine (for now). After the first server is instantiated, the system deadlocks on the .train() call at the end of its setup, outputting just “{}”; if I remove that call, the code runs fine.

    def policy_Server(self):
        cppu_number = self.cppu_name[-1:]
        self.LOCAL_PORT = self.SERVER_BASE_PORT + int(cppu_number)*10

        def _input(io_ctx):
            # Create a PolicyServerInput.
            if self.num_workers == 0:
                self.local_address = self.LOCAL_PORT
                self.server = PolicyServerInput(io_ctx, self.SERVER_ADDRESS, self.LOCAL_PORT)
                self.logger.info(f'Server for {self.cppu_name} @ {self.LOCAL_PORT} initialized')
                return self.server
            # No InputReader (PolicyServerInput) needed.
            else:
                self.server = None
                self.logger.info(f'Server for {self.cppu_name} not initialized')
                return self.server
        
        config_file = {
            # Indicate that the Trainer we setup here doesn't need an actual env.
            # Allow spaces to be determined by user (see below).
            "env": None,

            # Retrieve the env's spaces from the agent.
            "observation_space": self.observation_space,
            "action_space": self.action_space,

            # Use the `PolicyServerInput` to generate experiences.
            "input": (lambda io_ctx : _input(io_ctx)),

            "input_evaluation": [],
            
            # Use n worker processes to listen on different ports.
            "num_workers": self.num_workers, 
            # Disable OPE, since the rollouts are coming from online clients.
            # "off_policy_estimation_methods": {}, 
            # Set to INFO so we'll see the server's actual address:port.
            "log_level": "INFO",
            # Other settings for trainer
            "train_batch_size": 256,
            "rollout_fragment_length": 20,
            "framework": "tf",
        }

        self.algorithm = get_trainer_class("PPO")(config=config_file)

    def policy_Client(self):
        try:
            self.client = PolicyClient("http://localhost:" + str(self.LOCAL_PORT), inference_mode=self.inference_mode)
            self.logger.info(f'Client for {self.cppu_name} connected @ {str(self.LOCAL_PORT)}')
        except Exception:
            self.logger.info(f'Client for {self.cppu_name} connection failed @ {str(self.LOCAL_PORT)}')

class Agent():
    def __init__(self):
        self.policy_Server()
        self.policy_Client()
        while True:
            # Perform one iteration of training the policy with PPO
            result = self.algorithm.train()
            self.logger.info('Training enabled')

I was wondering whether the issue lies in the lack of connection/communication, or whether I should explicitly create a thread for each client and each server before instantiating them. Looking at the CartPole and Unity examples, I noticed that clients and servers are usually launched in two separate shell instances, while in my project they are not.

I believe you need to launch the server and the client as different processes. I use client-server almost every day now, and I’ve used it these three ways:

  1. Open up two terminals. Launch the client in one and the server in the other. I typically do this when debugging or when I need interactivity.
    a. I often have multiple clients, so I have a bash script that I run for launching all the clients from a single terminal.
  2. Launch a Python script that creates a subprocess for the client (a rough sketch of this option is shown below). I don’t do this anymore, but it does work.
  3. Create a bash script that launches the server and the client as different processes. I typically do this when I am running at full scale.

I can provide examples of all three if you’re interested. But basically, once you can do the first one, it’s just a short step to the other two.
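For the second option, here is a minimal sketch (not an exact script) of a Python launcher that starts the server and a couple of clients as separate OS processes with subprocess. The script names cartpole_server.py and cartpole_client.py are placeholders standing in for the RLlib cartpole client/server example scripts; substitute your own files, client count, and startup delay.

    import subprocess
    import time

    # Start the policy server in its own process.
    server_proc = subprocess.Popen(["python", "cartpole_server.py"])

    # Give the server time to bind its port before the clients try to connect.
    time.sleep(30)

    # Start one client process per agent / simulator instance.
    client_procs = [
        subprocess.Popen(["python", "cartpole_client.py"])
        for _ in range(2)
    ]

    # Wait for the clients to finish, then shut the server down.
    for proc in client_procs:
        proc.wait()
    server_proc.terminate()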

Thank you for your reply.
Could you please provide the examples of the three options you mentioned?
Do you also think that a multithreading solution could work? Have you tried it that way?

Sorry for the delay, I was on vacation. Here’s an example using a bash script with Slurm to launch the server on one compute node and multiple clients on other compute nodes. I put this together by combining the cartpole client/server example with the Ray Slurm example.

#!/bin/bash
#SBATCH --job-name=rllib-cartpole-test
#SBATCH --nodes=2
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --time=00:30:00
#SBATCH --exclusive
#SBATCH --no-kill
#SBATCH --output="slurm-%j.out"

# Run with sbatch client_server.sh
source python/virtual/environment/for/this/run/activate

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

# If we detect a space character in the head node IP, we'll
# convert it to an IPv4 address. This step is optional.
if [[ "$head_node_ip" == *" "* ]]; then
  IFS=' ' read -ra ADDR <<<"$head_node_ip"
  if [[ ${#ADDR[0]} -gt 16 ]]; then
    head_node_ip=${ADDR[1]}
  else
    head_node_ip=${ADDR[0]}
  fi
  echo "IPV6 address detected. We split the IPV4 address as $head_node_ip"
fi

port=9990
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 -w "$head_node" --output="slurm-%j-HEAD.out" \
  python3 -u ./corridor_server.py --env CppCorridor --ip-head $ip_head &

# Optional: give the server time to start before launching the clients.
sleep 180

# number of nodes other than the head node
echo "SLURM JOB NUM NODES " $SLURM_JOB_NUM_NODES
worker_num=$((SLURM_JOB_NUM_NODES - 1))

for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    echo "Starting WORKER $i at $node_i"
    srun --nodes=1 --ntasks=1 -w "$node_i" --output="slurm-%j-$node_i.out" \
      python3 -u ./corridor_client.py --env CppCorridor --ip-head $ip_head &
    sleep 5
done

wait
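
As mentioned above, corridor_server.py and corridor_client.py were put together from the cartpole client/server example. For reference, a rough sketch of what such a client script might look like is below; the --ip-head argument and the env handling are assumptions based on the bash script above, not the exact code from this run.

    import argparse

    import gym
    from ray.rllib.env.policy_client import PolicyClient

    parser = argparse.ArgumentParser()
    parser.add_argument("--env", type=str, default="CartPole-v0")
    parser.add_argument("--ip-head", type=str, required=True,
                        help="host:port the policy server listens on, e.g. 10.0.0.1:9990")
    args = parser.parse_args()

    if __name__ == "__main__":
        # Replace gym.make() with however your external-simulator env is built.
        env = gym.make(args.env)
        client = PolicyClient("http://" + args.ip_head, inference_mode="remote")

        obs = env.reset()
        episode_id = client.start_episode()
        total_reward = 0.0
        while True:
            # Ask the policy server for an action.
            action = client.get_action(episode_id, obs)
            obs, reward, done, info = env.step(action)
            total_reward += reward
            # Report the reward back so the server can build training batches.
            client.log_returns(episode_id, reward, info=info)
            if done:
                print(f"Episode finished, total reward: {total_reward}")
                client.end_episode(episode_id, obs)
                obs = env.reset()
                episode_id = client.start_episode()
                total_reward = 0.0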