ExternalMultiAgentEnv and QMIX for remote inference over HTTP with multiple clients

hi, i’m trying to combine these pieces of rllib (ExternalMultiAgentEnv plus the policy server/client setup) in order to train QMIX on a server which receives on-policy remote updates from n independent clients, each running MATSim, a traffic simulation written in java. i have written a wrapper around MATSim so that it can be paused in order to query the server for an action. since a MATSim run is expensive and my hardware is dated, remote inference with parallel clients is the only way i expect to get results fast enough to iterate in my research.
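for reference, here’s a minimal sketch of the client-side loop each MATSim wrapper needs to speak, written against rllib’s PolicyClient (my scala client, described further down, re-creates this over HTTP; the address and the reset_matsim / step_matsim helpers are just placeholders for my wrapper, not real functions):

from ray.rllib.env.policy_client import PolicyClient

# the server address is an assumption; reset_matsim / step_matsim are hypothetical
# stand-ins for pausing/advancing the MATSim run
client = PolicyClient("http://localhost:9900", inference_mode="remote")

episode_id = client.start_episode(training_enabled=True)
obs = reset_matsim()                         # unpause MATSim and collect the first observation
done = False
while not done:
    action = client.get_action(episode_id, obs)
    obs, reward, done = step_matsim(action)  # advance MATSim using the chosen routes
    client.log_returns(episode_id, reward)
client.end_episode(episode_id, obs)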

starting with the simplest approach, we have one group containing 4 agents:

grouping = {
  "group_1": [ "agent_1", "agent_2", "agent_3", "agent_4"]
}

and one policy that all agents share:

multiagent_config = {
  "policies": {
    "main": (None, my_obs_space, my_act_space, {})
  },
  "policy_mapping_fn": lambda agent_id: "main"
}

in order to set these, i pass them along with the Spaces to the trainer via the config, as in the cart-pole example:

config = {
    # Indicate that the Trainer we setup here doesn't need an actual env.
    # Allow spaces to be determined by user (see below).
    "env": None,

    # TODO: (sven) make these settings unnecessary and get the information
    #  about the env spaces from the client.
    "observation_space": my_obs_space,
    "action_space": my_act_space,
    "multiagent": multiagent_config,

    # Use the `PolicyServerInput` to generate experiences.
    "input": _input,
    # Use n worker processes to listen on different ports.
    "num_workers": args.num_workers,
    # Disable OPE, since the rollouts are coming from online clients.
    "input_evaluation": [],
    # Create a "chatty" client/server or not.
    "callbacks": MyCallbacks if args.chatty_callbacks else None,
}

trainer = QMixTrainer(
    config=dict(config, **{
        # "num_envs_per_worker": 5,  # test with vectorization on
        # "env_config": {
        #     "avail_action": 3,
        # },
        "framework": args.framework,  # only "torch" allowed here
    })
)

big questions:

  • is this even possible?
  • if so, where does the “grouping” go in this case (since i have no “Environment”)?

thank you in advance,

rob

I think the external environment and server/client setup should work well for this use case.

what do you actually mean by “where does the grouping go”?
the policy_mapping_fn controls which policy each agent will query for actions.
if they are all mapped to the “main” policy, then all the agents should be in the same group.

thank you for the quick reply on this.

what do you actually mean by “where does the grouping go”?

just following the examples, a “grouping” argument is passed to a MultiAgentEnv using MultiAgentEnv(conf).with_agent_groups(grouping, obs_space, act_space), which i’m guessing somehow informs the QMIX trainer how to combine the per-agent value estimates when training the collaborative policy. i don’t think this “grouping” information is the same as the “policy” information: telling 4 agents to point at policy 1 is not the same as telling 4 agents with policy 1 to combine their values in training. am i understanding this correctly?

now, in my case, there is no “environment” (i.e., "env": None), since the environment is remote. so, i’m not sure how to provide the grouping information.

@robfitzgerald

You will need to add the with_agent_groups bit to your multiagent_external_env in the policy client. It will take care of grouping the observations in the way that qmix needs them.

On the policy_server side you give it what the observation and action spaces will look like after agents are grouped.

Take a look at this example: ray/two_step_game.py at master · ray-project/ray · GitHub
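Roughly, that example wraps the base env with with_agent_groups when it registers it, and the spaces it passes are the after-grouping spaces (a paraphrased sketch from memory, not the exact file; the per-agent spaces below are only illustrative):

from gym.spaces import Discrete, MultiDiscrete, Tuple
from ray.tune import register_env
from ray.rllib.examples.env.two_step_game import TwoStepGame

grouping = {"group_1": [0, 1]}

# spaces AFTER grouping: one entry per agent in the group
per_agent_obs = MultiDiscrete([2, 2, 2, 3])        # illustrative only
obs_space = Tuple([per_agent_obs, per_agent_obs])
act_space = Tuple([Discrete(2), Discrete(2)])

register_env(
    "grouped_twostep",
    lambda config: TwoStepGame(config).with_agent_groups(
        grouping, obs_space=obs_space, act_space=act_space))

The trainer config then points at "grouped_twostep" as the env. In your case that wrapping has to live on the client side, since the server has "env": None.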


oh ok, the server doesn’t have to know about the groupings! thank you for clarifying @mannyv. since i am emulating PolicyClient in Scala, i will review the code in GroupAgentsWrapper and re-create it.
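for anyone following along, here’s a simplified python sketch of my reading of the grouping step in GroupAgentsWrapper (not the actual rllib code) that i plan to port:

# collapse a per-agent dict into a per-group dict: each group becomes a list of its
# members' values (in the order given by the grouping); ungrouped agents pass through
def group_items(items: dict, grouping: dict) -> dict:
    agent_to_group = {a: g for g, agents in grouping.items() for a in agents}
    grouped = {}
    for group_id, agent_ids in grouping.items():
        if all(a in items for a in agent_ids):
            grouped[group_id] = [items[a] for a in agent_ids]
    for agent_id, value in items.items():
        if agent_id not in agent_to_group:
            grouped[agent_id] = value
    return grouped

# with the single group of 4 agents from above, per-agent observations become one list:
grouping = {"group_1": ["agent_1", "agent_2", "agent_3", "agent_4"]}
obs = {"agent_1": [0.1], "agent_2": [0.2], "agent_3": [0.3], "agent_4": [0.4]}
grouped_obs = group_items(obs, grouping)
# {"group_1": [[0.1], [0.2], [0.3], [0.4]]}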

the problem

I have been testing my end-to-end setup and I am failing with a timeout error on the server-side _send operation when sending LOG_ACTION or GET_ACTION after a successful START_EPISODE request is handled.

i’m concerned my request is malformed, but i’m finding it difficult to track down documentation or an example that clarifies what i may be missing. i’m also concerned that a worker is failing silently, or that i’m doing weird things with an experimental API and this is all just smelly user-error business.

my setup

  • ray 1.7.0, running the test server from PyCharm.
  • attempting to run QMIX with a multiagent setup and an external environment.
  • rllib script based on the cartpole server with "env": None, PolicyServerInput modified to use json instead of cloudpickle (in order to send/receive from Scala), and QMixTrainer plugged in.
  • the timeout at external_env.log_action extended (300 s in the run logged below).
  • i’m sending a test START_EPISODE request followed by a GET_ACTION or LOG_ACTION request with dummy data.
  • server and client both run on localhost on my laptop.

my cascading failure to execute a sound implementation strategy

At first I thought maybe this was because of having limited compute resources. I’m using a 2015 quad-core i7 laptop, no GPUs. But I was able to run rllib.agents.qmix.test.test_qmix.py without any issues. I’ve been testing with what I believe to be a fairly small observation + action space, so I don’t believe my issue is because of resources.

Then I thought it was maybe because the server needed my grouping information, but, that turns out to only be a concern for the client.

After that, I switched from calling GET_ACTION to LOG_ACTION, wondering if the issue was the lack of an Env on the server, and whether these on-policy updates required the client to sample the action space itself. But I get the same timeout from GET_ACTION as from LOG_ACTION.

implementation

Below I’ll describe the Env and server setup and show a sample request, then I’ll show my server log at the end. I would appreciate any guidance! Thank you RLlib community :slight_smile:

Env

import numpy as np
from gym.spaces import Box, Discrete, Tuple

# obs: (src_x, src_y, dst_x, dst_y, congestion_%)
# each observation is for a driver agent in a traffic simulation
# there are two groups of 2 such driver agents
driver_obs = Box(-np.inf, np.inf, (5,), np.float64)
obs_space = Tuple([Tuple([driver_obs, driver_obs]), Tuple([driver_obs, driver_obs])])

# act: (route_selection)
# each driver can select one of up to 10 route plans
# Q: is the action space supposed to be "flattened" like this wrt groupings?
#    QMIX borks at me when i try to do Tuple([Tuple([Discrete, ...]), ...])
act_space = Tuple([Discrete(10), Discrete(10), Discrete(10), Discrete(10)])

Server

# based on https://github.com/ray-project/ray/blob/master/rllib/examples/serving/cartpole_server.py
# PolicyServer modified to remove pickling (use json.{loads|dumps} instead of pickle.{loads|dumps})
# in order to interact with non-python client

def _input(ioctx):
    if ioctx.worker_index > 0 or ioctx.worker.num_workers == 0:
        return PolicyServerNoPickleInput(
            ioctx,
            SERVER_ADDRESS,
            SERVER_BASE_PORT + ioctx.worker_index - (1 if ioctx.worker_index > 0 else 0)
        )
    else:
        return None

# trivial multiagent configuration
multiagent = {
    "policies": {
        "main": (None, obs_space, act_space, {})
    },
    "policy_mapping_fn": lambda agent_id: "main"
}

config = {
    "env": None,
    "observation_space": obs_space,
    "action_space": act_space,
    "multiagent": multiagent,
    "input": _input,
    "num_workers": args.num_workers,
    "input_evaluation": [],
}

trainer = QMixTrainer(
    config=dict(config, **{
        "mixer": "qmix",
        "framework": "torch"
    })
)
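
after that, the server just sits in the standard serve-and-train loop from the cartpole example (sketch; checkpointing omitted):

from ray.tune.logger import pretty_print

# serve forever: PolicyServerInput feeds experiences arriving over HTTP,
# and each train() call runs QMIX updates on whatever has been collected so far
while True:
    results = trainer.train()
    print(pretty_print(results))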

Request

{
  'episode_id': 'floof', 
  'observation': {'group_1': [[0.0, 0.0, 10.0, 10.0, 0.1], [-10.0, -10.0, 0.0, 0.0, 0.7]], 'group_2': [[10.0, 10.0, 0.0, 0.0, 0.02], [0.0, 0.0, -10.0, -10.0, 0.99]]}, 
  'action': {'group_1': [0, 7], 'group_2': [8, 4]}, 
  'command': 'LOG_ACTION'
}

Logs

/Users/<user>/opt/anaconda3/envs/so-rl-server/bin/python /Users/<user>/dev/ucd/phd/projects/2019su/SOTestbed/python/rllib_server/rllib_server.py --run QMIX --num-workers 1 --k 10 --grouping-file test/assets/grouping_test.json
2021-10-15 11:39:09,762 INFO services.py:1250 -- View the Ray dashboard at http://127.0.0.1:8265
ray version 1.7.0
2021-10-15 11:39:10,596 INFO trainer.py:758 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
2021-10-15 11:39:16,134 WARNING trainer_template.py:185 -- `execution_plan` functions should accept `trainer`, `workers`, and `config` as args!
2021-10-15 11:39:16,134 WARNING util.py:57 -- Install gputil for GPU system monitoring.
beginning training loop with policies, observation space, action space:
{'policies': {'main': (None, Tuple(Tuple(Box(-inf, inf, (5,), float64), Box(-inf, inf, (5,), float64)), Tuple(Box(-inf, inf, (5,), float64), Box(-inf, inf, (5,), float64))), Tuple(Discrete(10), Discrete(10), Discrete(10), Discrete(10)), {})}, 'policy_mapping_fn': <function create_so_routing_multiagent_config.<locals>.<lambda> at 0x1787da790>}
Tuple(Tuple(Box(-inf, inf, (5,), float64), Box(-inf, inf, (5,), float64)), Tuple(Box(-inf, inf, (5,), float64), Box(-inf, inf, (5,), float64)))
Tuple(Discrete(10), Discrete(10), Discrete(10), Discrete(10))
(pid=19091) /Users/<user>/opt/anaconda3/envs/so-rl-server/lib/python3.9/site-packages/ray/rllib/agents/qmix/qmix_policy.py:453: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at  ../torch/csrc/utils/tensor_numpy.cpp:180.)
(pid=19091)   k: torch.as_tensor(v, device=self.device)
(pid=19091) 127.0.0.1 - - [15/Oct/2021 11:39:36] "POST / HTTP/1.1" 200 -
(RolloutWorker pid=19091) received payload:
(RolloutWorker pid=19091) {'episode_id': 'floof', 'training_enabled': True, 'command': 'START_EPISODE'}
(pid=19091) 2021-10-15 11:39:36,088     WARNING deprecation.py:38 -- DeprecationWarning: `policy_mapping_fn(agent_id)` has been deprecated. Use `policy_mapping_fn(agent_id, episode, worker, **kwargs)` instead. This will raise an error in the future!
(RolloutWorker pid=19091) received payload:
(RolloutWorker pid=19091) {'episode_id': 'floof', 'observation': {'group_1': [[0.0, 0.0, 10.0, 10.0, 0.1], [-10.0, -10.0, 0.0, 0.0, 0.7]], 'group_2': [[10.0, 10.0, 0.0, 0.0, 0.02], [0.0, 0.0, -10.0, -10.0, 0.99]]}, 'action': {'group_1': [0, 7], 'group_2': [8, 4]}, 'command': 'LOG_ACTION'}
(pid=19091) 127.0.0.1 - - [15/Oct/2021 11:44:36] code 500, message Traceback (most recent call last):
(pid=19091)   File "/Users/<user>/dev/ucd/phd/projects/2019su/SOTestbed/python/rllib/env/policy_server_no_pickle.py", line 173, in do_POST
(pid=19091)     response = self.execute_command(parsed_input)
(pid=19091)   File "/Users/<user>/dev/ucd/phd/projects/2019su/SOTestbed/python/rllib/env/policy_server_no_pickle.py", line 211, in execute_command
(pid=19091)     child_rollout_worker.env.log_action(
(pid=19091)   File "/Users/<user>/opt/anaconda3/envs/so-rl-server/lib/python3.9/site-packages/ray/rllib/env/external_multi_agent_env.py", line 114, in log_action
(pid=19091)     episode.log_action(observation_dict, action_dict)
(pid=19091)   File "/Users/<user>/opt/anaconda3/envs/so-rl-server/lib/python3.9/site-packages/ray/rllib/env/external_env.py", line 233, in log_action
(pid=19091)     self.action_queue.get(True, timeout=300.0)
(pid=19091)   File "/Users/<user>/opt/anaconda3/envs/so-rl-server/lib/python3.9/queue.py", line 179, in get
(pid=19091)     raise Empty
(pid=19091) _queue.Empty
(pid=19091) 
(pid=19091) 127.0.0.1 - - [15/Oct/2021 11:44:36] "POST / HTTP/1.1" 500 -

OK! i continued digging around. as pointed out here, the queue.Empty error surfaces when the observation space is not correct (which makes sense in hindsight: log_action blocks on the ExternalEnv action queue, and if the observation doesn’t match the configured space, the sampler presumably never produces an action to put on that queue, so the get() times out with queue.Empty).

in particular, my observation space was a Tuple([Tuple([Box, Box]), Tuple([Box, Box])]), which covers both groups, but a multiagent observation Space should only describe the right-hand side of a single entry in the multiagent observation dictionary, i.e. one group’s observation. in my case, this should have been Tuple([Box, Box]).

same problem with my action space: it should have been Tuple([Discrete(10), Discrete(10)]).
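so, concretely, the corrected per-group spaces (the space of one group’s observation/action on the right-hand side of the multiagent dict) end up looking like:

import numpy as np
from gym.spaces import Box, Discrete, Tuple

# space of ONE group's observation: one Box per driver in that group
driver_obs = Box(-np.inf, np.inf, (5,), np.float64)
obs_space = Tuple([driver_obs, driver_obs])

# space of ONE group's action: one route choice per driver in that group
act_space = Tuple([Discrete(10), Discrete(10)])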

this left me confused as to how we could have heterogeneous agent groupings, but i’m guessing that’s where the multiagent policy mapping section of the trainer config comes into play.