Loading offline tabular data leads to observations of type string

1. Severity of the issue:
High: Completely blocks me.

2. Environment:

  • Ray version: 2.44.1
  • Python version: 3.12.9
  • OS: Linux

3. What happened vs. what you expected:
I saved tabular offline data, generated by a pretrained PPO agent, in .parquet files.

  • Expected that loading the data from parquet would give me something like this:
# Column              Type
# ------              ----
# eps_id              string
# agent_id            null
# module_id           null
# obs                 numpy.ndarray(shape=(4,), dtype=float)
# actions             int32
# rewards             double
# new_obs             numpy.ndarray(shape=(4,), dtype=float)
# terminateds         bool
# truncateds          bool
# action_dist_inputs  numpy.ndarray(shape=(2,), dtype=float)
# action_logp         float
# weights_seq_no      int64

similar to what the offline data tutorial shows (Working with offline data — Ray 2.44.1).

  • Actual result after loading looks like this:
# Column              Type
# ------              ----
# eps_id              string
# agent_id            null
# module_id           null
# obs                 string
# actions             int32
# rewards             double
# new_obs             string
# terminateds         bool
# truncateds          bool
# action_dist_inputs  numpy.ndarray(shape=(2,), dtype=float)
# action_logp         float
# weights_seq_no      int64

I have a PyTorch model that I want to train with this offline RL data. However, in this form I can't use the data any further, because obs and new_obs still seem to be serialized rather than being NumPy arrays as expected. I tried to find out how to deserialize the strings to NumPy myself, but unfortunately I wasn't successful.

4. Source code:

Saving tabular data:

from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core import (
    COMPONENT_RL_MODULE,
)
from ray.rllib.core.rl_module import RLModuleSpec

# Set up a path for the tabular data records.
output_write_episodes = False
data_path = "tmp/offline_episodes/" if output_write_episodes else "tmp/offline_tabular/"

# Configure the algorithm for recording.
config = (
    PPOConfig()
    # The environment needs to be specified.
    .environment(
        env="CartPole-v1",
    )
    # Make sure to sample complete episodes because
    # you want to record RLlib's episode objects.
    .env_runners(
        batch_mode="complete_episodes",
    )
    # Set up 5 evaluation `EnvRunners` for recording.
    # Sample 50 episodes in each evaluation rollout.
    .evaluation(
        evaluation_num_env_runners=5,
        evaluation_duration=50,
    )
    # Use the checkpointed expert policy from the preceding PPO training.
    # Note, we have to use the same `model_config` as
    # the one with which the expert policy was trained, otherwise
    # the module state can't be loaded.
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[32],
            fcnet_activation="linear",
            # Share encoder layers between value network
            # and policy.
            vf_share_layers=True,
        ),
    )
    # Define the output path and format. In this example you
    # want to store tabular data (not RLlib's episode objects).
    .offline_data(
        output_write_episodes=output_write_episodes,
        output=data_path,
    )
)

# This is a checkpoint of a pretrained PPO agent with an average episode length of 500.
best_checkpoint = '/ray_results/docs_rllib_offline_pretrain_ppo/PPO_CartPole-v1_36f51_00000_0_2025-02-23_18-25-45/checkpoint_000112'

# Build the algorithm.
algo = config.build()
# Load the PPO-trained `RLModule` to use in recording.
algo.restore_from_path(
    best_checkpoint,
    # Load only the `RLModule` component here.
    component=COMPONENT_RL_MODULE,
)

# Run 10 evaluation rounds (one every 10th loop iteration) and record the data.
for i in range(100):
    print(f"Iteration {i + 1}")
    if i % 10 == 0:
        res_eval = algo.evaluate()
        print(res_eval)

# Stop the algorithm. Note, this is important when defining
# `output_max_rows_per_file`. Otherwise, remaining episodes in the
# `EnvRunner`s' buffers aren't written to disk.
algo.stop()

Loading the .parquet file:

from ray import data
tabular_data_path = "tmp/offline_tabular/"

# Read the tabular data into a Ray dataset.
ds = data.read_parquet(tabular_data_path)
# Now, print its schema.
print("Tabular data schema of expert experiences:\n")
print(ds.schema())

Hi @DennisRTUB, thanks for raising this issue. This is a great finding, and I guess more users will run into this.

Before storing, episodes are encoded using msgpack_numpy. To decode them when getting a batch from the dataset, use something like:

from typing import List

# Import `msgpack` and `msgpack_numpy` for decoding.
import msgpack
import msgpack_numpy as mnp

from ray.rllib.env.single_agent_episode import SingleAgentEpisode

# Read the episodes from the batch and decode them.
episodes: List[SingleAgentEpisode] = [
    SingleAgentEpisode.from_state(
        msgpack.unpackb(state, object_hook=mnp.decode)
    )
    for state in batch["item"]
]
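
Here, each state is one row of the episode dataset (column "item"), i.e. one msgpack-encoded SingleAgentEpisode state. As a sketch (assuming you recorded episodes with output_write_episodes=True into tmp/offline_episodes/), batch could come from iterating the dataset like this:

from ray import data

# Read the recorded episode data (not the tabular data) into a Ray dataset.
episode_ds = data.read_parquet("tmp/offline_episodes/")

# Iterate in batches; each batch exposes the packed episode states under "item".
for batch in episode_ds.iter_batches(batch_size=10):
    # Decode `batch["item"]` into `SingleAgentEpisode`s as shown above.
    ...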

Let me know if this helped.

Hello Lars,

thanks for your answer! Unfortunately, there are still some open questions on my side:

I first tried to apply the function you mentioned directly to my dataset. However, note that my observations are encoded as strings looking like this BCJNGGhAjwAAAAAAAAAdigAAAFKABZWEAAEA8hqME251bXB5Ll9jb3JlLm51bWVyaWOUjAtfZnJvbWJ1ZmZlcpSTlCiWEC8A8QUA6840vdz1ar7bOyM9uZmwPpSMBUEA8RaUjAVkdHlwZZSTlIwCZjSUiYiHlFKUKEsDjAE8lE5OTkr/////BQDwA0sAdJRiSwSFlIwBQ5R0lFKULgAAAAA=. Creating a batch and directly applying the function you mentioned like this

import msgpack
import msgpack_numpy as mnp

for batch in ds.iter_batches(batch_size=10):
    decoded_obs = [msgpack.unpackb(obs, object_hook=mnp.decode) for obs in batch["obs"]]

leads to the error

decoded_obs = [msgpack.unpackb(item, object_hook=mnp.decode) for item in batch]
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 177, in msgpack._cmsgpack.unpackb
  File "msgpack/_unpacker.pyx", line 125, in msgpack._cmsgpack.get_data_from_buffer
TypeError: a bytes-like object is required, not 'str'

Therefore, it seems like I need to decode from string to binary at this step. I then assumed that the string-to-binary encoding/decoding is done via base64 and tested

import base64

import msgpack
import msgpack_numpy as mnp

for batch in ds.iter_batches(batch_size=10):
    for obs in batch["obs"]:
        # Decode the base64 string to bytes, then try to unpack with msgpack.
        binary_obs = base64.b64decode(obs)
        decoded_obs = msgpack.unpackb(binary_obs, object_hook=mnp.decode)

which results in the error

decoded_obs = msgpack.unpackb(binary_obs, object_hook=mnp.decode)   
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "msgpack/_unpacker.pyx", line 201, in msgpack._cmsgpack.unpackb
msgpack.exceptions.ExtraData: unpack(b) received extra data.

So the string can be decoded via base64, but msgpack.unpackb still doesn't work. Unfortunately, I couldn't figure out how to decode from string to binary and from binary to a NumPy array, and I'm still stuck.

I would be really grateful if you could give me a minimal code example of how you would obtain a batch from the dataset (obtained from ray.data.read_parquet) and decode the string to binary in such a way that the binary can be decoded with msgpack.unpackb or another function. Also, what is state supposed to contain in your example code: a row of tabular data from the dataframe, corresponding to one trajectory?

Unfortunately, I still couldn't solve this problem. Does anyone have an idea of how to decode these strings to binary such that msgpack-numpy can be applied to them?

@DennisRTUB, I am sorry that you still encounter problems, and thank you for your patience. I think I might have found the cause of this behavior:

The default for AlgorithmConfig.offline_data(output_compress_columns=...) is [Columns.OBS] (as is the one for input_compress_columns). Therefore, the observation data is compressed with LZ4 and encoded via base64. Two options here:

  1. When writing data, use output_compress_columns=[] (see the config sketch after this list).
  2. Use our unpack_if_needed method (in combination with our from_jsonable_if_needed method, if your space is composite - which is NOT the case for CartPole-v1).
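
For the first option, the offline_data() part of the recording config above could be extended like this (just a sketch of the relevant part; the data has to be re-recorded afterwards):

from ray.rllib.algorithms.ppo import PPOConfig

data_path = "tmp/offline_tabular/"

config = (
    PPOConfig()
    .environment(env="CartPole-v1")
    .offline_data(
        output_write_episodes=False,
        output=data_path,
        # Don't compress any columns; obs and new_obs are then stored
        # as plain NumPy arrays in the parquet files.
        output_compress_columns=[],
    )
)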

For the second alternative, load the methods via

from ray.rllib.utils.compression import unpack_if_needed
from ray.rllib.utils.spaces.space_utils import from_jsonable_if_needed

from_jsonable_if_needed(unpack_if_needed(obs), observation_space)
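
For example, a minimal sketch for decoding the obs and new_obs columns of the tabular dataset read above (assuming CartPole-v1, i.e. a simple Box observation space, so from_jsonable_if_needed isn't needed):

import numpy as np

from ray import data
from ray.rllib.utils.compression import unpack_if_needed

# Read the recorded tabular data into a Ray dataset.
ds = data.read_parquet("tmp/offline_tabular/")

for batch in ds.iter_batches(batch_size=10):
    # Each entry in obs/new_obs is a base64-encoded, LZ4-compressed string;
    # `unpack_if_needed` reverses both steps and returns the NumPy array.
    obs = np.stack([unpack_if_needed(o) for o in batch["obs"]])
    new_obs = np.stack([unpack_if_needed(o) for o in batch["new_obs"]])
    print(obs.shape)  # For example, (10, 4) for CartPole-v1.
    break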

Again, apologies for misleading you above. As a note:

  1. If using episodes as output: encoded via msgpack_numpy.
  2. If using tabular data as output: compressed via LZ4 and encoded via base64 (for composite spaces, also converted to JSON before).

Hope this helps now.