Offline tutorial: TypeError: must be Tensor, not numpy.ndarray

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hello, I’m following the tutorial for offline data (Working with offline data — Ray 2.42.0) and I’m stuck at training: the input is not cast to the tensor type.

I checked that observation and action are lists of floats and that reward is a float.
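For reference, this is roughly how I checked it (assuming dataset is the ray.data dataset created in the script below):

sample = dataset.take(1)[0]
# The raw columns come back as plain Python types from the Databricks read.
print(type(sample["obs"]), type(sample["obs"][0]))        # list, float
print(type(sample["actions"]), type(sample["rewards"]))   # list, float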

I wanted to be able to use RLlib directly with ray.data.read_databricks_tables, like:

dataset = ray.data.read_databricks_tables(... args ...)
config = (
    BCConfig()
    ...
    .offline_data(input_=dataset, input_schema=schema_for_columns_to_obs_action_rewards) 
    ...
)

However, that doesn’t seem possible, and it seems reasonable to have to convert the data to episodes first.

So I tried the code below.

Versions / Dependencies

Running on
ray : 2.42.0
databricks : DBR 16.0
python : 3.12.3

Reproduction script

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

try:
    shutdown_ray_cluster()
except Exception:
    pass

ray.shutdown()
setup_ray_cluster(max_worker_nodes=1, num_cpus_per_worker=8)
ray.init()

import gymnasium as gym
from gymnasium.spaces import Box  # needed for the action/observation spaces below
import msgpack
import msgpack_numpy as mnp

from collections import defaultdict

from ray.rllib.env.single_agent_episode import SingleAgentEpisode

import os
api_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
os.environ["DATABRICKS_TOKEN"] = api_token

folder_where_to_write = "/Volumes/some/folder/test_ray_convertion"
dataset = ray.data.read_databricks_tables(
    warehouse_id="some_id",
    catalog="some_catalog",
    schema="some_schema",
    query = """
    SELECT 
        some.column AS actions,
        other_column AS obs,
        another_column AS rewards,
        column AS next_obs,
        column AS infos,
        column AS done
    FROM some_table
"""
)

eps_obs = []
eps_actions = []
eps_rewards = []
extra_model_outputs = defaultdict(list)
episodes = []

action_space = Box(low=-1, high=1, shape=(4,), dtype=float)
observation_space = Box(low=0, high=5, shape=(2,), dtype=float)

episode = SingleAgentEpisode(action_space=action_space, observation_space=observation_space)
episode.add_env_reset(observation=[0,0])

for i, row in enumerate(dataset.sort(["some_key"]).iter_rows(prefetch_batches=10)):
    if not row["done"]:
        episode.add_env_step(
            observation=row["obs"],
            action=row["actions"],
            reward=row["rewards"],
            terminated=False
        )
    else:
        episode.add_env_step(
            observation=row["next_obs"],
            action=row["actions"],
            reward=row["rewards"],
            terminated=True
        )
        episodes.append(msgpack.packb(episode.get_state(), default=mnp.encode))
        episode = SingleAgentEpisode()
        episode.add_env_reset(observation=[0,0])

    if len(episodes) > 5:
        episodes_ds = ray.data.from_items(episodes)
        episodes_ds.write_parquet(
            f"{folder_where_to_write}/file-{i}".zfill(6),
            compression="gzip",
        )
        del episodes_ds
        episodes.clear()

if len(episodes) > 0:
    episodes_ds = ray.data.from_items(episodes)
    episodes_ds.write_parquet(
        f"{folder_where_to_write}/file-{i}".zfill(6),
        compression="gzip",
    )
    del episodes_ds
    episodes.clear()

Then the training:

from gymnasium import Env
from gymnasium.spaces import Box

from ray import train, tune
from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.utils.metrics import (
    ENV_RUNNER_RESULTS,
    EPISODE_RETURN_MEAN,
    EVALUATION_RESULTS,
)

class DummyEnv(Env):
    def __init__(self, config):
        super().__init__()
        self.action_space = Box(low=-1, high=1, shape=(4,), dtype=float)
        self.observation_space = Box(low=0, high=5, shape=(2,), dtype=float)
    
    def reset(self, seed=None, options=None):
        observation = self.observation_space.sample()
        return observation
        # return torch.from_numpy(observation).float() # No success
    
    # def step(self, action):
    #     return self.observation_space.sample(), 1, True, {}

def env_creator(env_config):
    return DummyEnv(env_config)

register("DummyEnv", env_creator)
tune.register_env("DummyEnv", env_creator)

config = (
    BCConfig()
    .environment(
        env=DummyEnv,
        observation_space=Box(low=0, high=1, shape=(2,), dtype=float),
        action_space=Box(low=-1, high=1, shape=(4,), dtype=float),
    )
    .offline_data(
        input_="parquet/file/with/the/episodes",
        dataset_num_iters_per_learner=100,
        input_read_episodes=True
    )
    .evaluation(
        evaluation_interval=3,
        evaluation_num_env_runners=1,
        evaluation_duration=5,
        evaluation_parallel_to_training=True,
    )
)

metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"

tuner = tune.Tuner(
    "BC",
    param_space=config,
    run_config=train.RunConfig(
        # name="docs_rllib_offline_bc",
        stop={metric: 450.0},
        checkpoint_config=train.CheckpointConfig(
            checkpoint_frequency=0,
            checkpoint_at_end=True,
        ),
        verbose=2,
    )
)
analysis = tuner.fit()

However, I end up with:

File "/databricks/python/lib/python3.12/site-packages/torch/nn/modules/linear.py", line 117, in forward
    return F.linear(input, self.weight, self.bias)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: linear(): argument 'input' (position 1) must be Tensor, not numpy.ndarray

I’ve seen that I could write an OfflinePreLearner to convert the data, but that seems like too much…

Thanks in advance for any help
Have a good day :slight_smile:

Hi ierezell, welcome to the Ray community!

The issue you’re running into is caused by a mismatch between the expected input format for PyTorch models and the data format you’re providing. The error message suggests that PyTorch’s linear function expects a torch.Tensor, but your code is supplying a numpy.ndarray instead.

You need to ensure that any data passed into the model is properly converted to a PyTorch tensor. One way to do this is by modifying the DummyEnv class so that observations are returned as tensors.

Maybe you can try this out?

import torch
from gymnasium import Env
from gymnasium.spaces import Box

class DummyEnv(Env):
    def __init__(self, config):
        super().__init__()
        self.action_space = Box(low=-1, high=1, shape=(4,), dtype=float)
        self.observation_space = Box(low=0, high=5, shape=(2,), dtype=float)
    
    def reset(self, seed=None, options=None):
        observation = self.observation_space.sample()
        return torch.tensor(observation, dtype=torch.float32), {}  # Convert obs to tensor; empty info dict

    def step(self, action):
        observation = self.observation_space.sample()
        # obs, reward, terminated, truncated, info (obs converted to tensor)
        return torch.tensor(observation, dtype=torch.float32), 1, True, False, {}

Here’s some more reading that might be helpful, let me know if this works for you!
Docs:

Hi @christina, thanks for the fast and amazing support! :smiley: Really appreciated.

I’ve tried modifying the DummyEnv (and I just retried copy-pasting your solution) but I still get the same error (TypeError: linear(): argument 'input' (position 1) must be Tensor, not numpy.ndarray).

If it helps, here is what I have in the parquet folder; msgpack.unpackb(df.take(1)[0]["item"]) gives:

{'id_': '865415c21b464d7b81deffe3057502db',
 'agent_id': None,
 'module_id': None,
 'multi_agent_episode_id': None,
 'observations': {'data': [[0, 0], [0, 0]],
  'lookback': 0,
  'finalized': False,
  'space': None},
 'actions': {'data': [[2579.833984375,
    -2651.8525390625,
    7.480941295623779,
    0.0]],
  'lookback': 0,
  'finalized': False,
  'space': None},
 'rewards': {'data': [0.0],
  'lookback': 0,
  'finalized': False,
  'space': {'space': 'box',
   'low': 'eJyb7BfqGxDJyFDGUK2eklqcXKRupaBuk2airqOgnpZfVFKUmBefX5SSChJ3S8wpTgWKF2ckFqQC+RqaOgq1ChQBLgaGhv8AupYcug==',
   'high': 'eJyb7BfqGxDJyFDGUK2eklqcXKRupaBuk2airqOgnpZfVFKUmBefX5SSChJ3S8wpTgWKF2ckFqQC+RqaOgq1ChQBLgaGhnoAuhYcOg==',
   'shape': [],
   'dtype': '<f4'}},
 'infos': {'data': [{}, {}], 'lookback': 0, 'finalized': False, 'space': None},
 'extra_model_outputs': None,
 'is_terminated': True,
 'is_truncated': False,
 't_started': 0,
 't': 1,
 '_observation_space': None,
 '_action_space': None,
 '_start_time': 3141.493850426,
 '_last_step_time': 3141.494328592,
 '_temporary_timestep_data': None}

Thanks for your help :slight_smile:

Hmm, ok, so it looks like we are still not passing in the correct object type.

The error message you’re getting suggests that somewhere in your code, an input intended for a PyTorch model still hasn’t been converted to a torch.Tensor. You may need to check not only where the data is provided, but also further along in your data pipeline or training loop.

There are a few things we can still check to debug:

  1. Check data transformation: Make sure that not just the initial data, but all transformed data that eventually enters the model, is converted to tensors. This includes data fetched from the parquet files or processed within the training loop (see the decode sketch right after this list).
  2. Custom pre-processing: You might need a custom pre-processing step, like a small validation function, before the data reaches the model, especially since the data in the parquet file uses nested structures.
  3. Debugging step: Insert print statements or use a debugging tool to trace the type of the data right before it gets passed into the model’s linear layer; that will show where it still needs converting to a tensor. Ray has a great debugger you can check out, but a regular debugger helps too (the forward-hook sketch further below is one way to do this).
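For point 1, here’s a minimal sketch of what I mean (it assumes the parquet column is named "item", as in your msgpack.unpackb call, and that df is the dataset read back from your folder):

import msgpack
import msgpack_numpy as mnp

# Decode the first stored row and look at the type of the observation buffer.
# A plain Python list of lists here (instead of a numpy array) would explain
# the numpy/tensor mismatch further down the pipeline.
state = msgpack.unpackb(df.take(1)[0]["item"], object_hook=mnp.decode)
print(type(state["observations"]["data"]))
print(type(state["actions"]["data"]), type(state["rewards"]["data"]))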

You can also add an explicit type check that raises a clear error, like this:

# Near or within the training loop, check the data type
def some_training_function(data):
    if not isinstance(data, torch.Tensor):
        raise TypeError(f"Expected data to be torch.Tensor but got {type(data)}")

Are there any other places in your code where you’re passing data in that might be causing this error? :thinking:

Hello Christina :slight_smile:

Yes I understand the error.

However, I don’t have any custom/personal code (except the DummyEnv); the only thing I’m trying to do is load my data and use the premade RLlib algorithms.

I’ve checked the example (with CartPole) and its data looks like the following:

{'id_': '8a936b560f134ddaaee02abf1747c9a1',
 'agent_id': None,
 'module_id': None,
 'multi_agent_episode_id': None,
 'observations': {'data': array(
         [
              [ 2.84415539e-02, -1.40919909e-02, -1.42102195e-02, -1.49316313e-02],
              [ 2.81597134e-02, -2.09007293e-01, -1.45088527e-02,  2.73234159e-01],
              ....., ......, .....
         ], dtype=float32),
  .... and much more data

With this data I’m able to launch the training.

My data has the same structure, but I have lists of lists instead of numpy arrays for the observations data.

So it seems the problem lies in the fact that RLlib implicitly expects np.array and not list (RLlib could cast it, though… I could raise an issue on GitHub), so I tried to change the code that transforms my tabular data into SingleAgentEpisodes:

if not row["done"]:
        eps_obs.append(row["obs"] if row["obs"] else [0,0])
        eps_actions.append(row["actions"] if row["actions"] else [0,0,0,0])
        eps_rewards.append(row["rewards"] if row["rewards"] else 0.0)
    else:
        eps_obs.append(row["next_obs"]  if row["next_obs"] else [0,0])
        eps_actions.append(row["actions"] if row["actions"] else [0,0,0,0])
        eps_rewards.append(row["rewards"] if row["rewards"] else 0.0)

        episode = SingleAgentEpisode(
            action_space=action_space, 
            observation_space=observation_space, 
            observations=np.array(eps_obs), 
            rewards=np.array(eps_rewards),
            actions=np.array(eps_actions),
            terminated=True, 
        )

However, observations cannot be an np.array, as that makes the code crash:

rllib/env/single_agent_episode.py:273
infos = infos or [{} for _ in range(len(observations or []))]

--> ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
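Just to illustrate the failure outside of RLlib (a minimal sketch, with an arbitrary array):

import numpy as np

observations = np.array([[0.0, 0.0], [0.1, 0.2]])
# Same pattern as single_agent_episode.py:273: a multi-element array has no
# unambiguous truth value, so `observations or []` raises the ValueError above.
infos = [{} for _ in range(len(observations or []))]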

Could you help me find the correct way to convert my data to SingleAgentEpisode?

Ok, so I think I finally got it, but… I guess you would have to update the docs… or maybe add examples?

For posterity:

Writing the data as lists (as in the official doc) does not work properly, but using add_env_reset and add_env_step is okay.

The DummyEnv does not need PyTorch; it even complains if the observations are not numpy.

class DummyEnv(Env):
    def __init__(self, config):
        super().__init__()
        self.action_space = Box(low=-1, high=1, shape=(4,), dtype=float)
        self.observation_space = Box(low=0, high=5, shape=(2,), dtype=float)
    
    def reset(self, seed=None, options=None):
        observation = self.observation_space.sample()
        info = {"lol": "yolo"}
        return observation, info
    
    def step(self, action):
        observation = self.observation_space.sample()
        reward = 1 
        truncated = False
        terminated = False
        info = {"lol": "yolo"}
        return observation, reward, terminated, truncated, info
# rest of code ... 

for i, row in enumerate(dataset.sort(["some_key"]).iter_rows()):
    if not row["done"]:
        # Not sure np.array is needed as we cast all to a big np.array at the end but...
        episode.add_env_step(
            observation=np.array(row["obs"]) if row["obs"] else np.array([0.0, 0.0], dtype=np.float32),
            action=np.array(row["actions"]) if row["actions"] else np.array([0.0, 0.0, 0.0, 0.0], dtype=np.float32),
            reward=np.array(row["rewards"]) if row["rewards"] else np.array(0.0, dtype=np.float32),
            terminated=False
        )
    else:
        episode.add_env_step( # Version 1
            observation=np.array(row["next_obs"])  if row["next_obs"] else np.array([0.0,0.0], dtype=np.float32),
            action=np.array(row["actions"]) if row["actions"] else np.array([0.0, 0.0, 0.0, 0.0], dtype=np.float32),
            reward=np.array(row["rewards"]) if row["rewards"] else np.array(0.0, dtype=np.float32),
            terminated=True
        )

        # THOSE LINES ARE HACKISH BUT MAKE IT WORK. 
        # Else we get a list[np.array, np.array] and training fails
        episode.observations.data = np.array(episode.observations.data)
        episode.rewards.data = np.array(episode.rewards.data)
        episode.actions.data = np.array(episode.actions.data)

        episodes.append(msgpack.packb(episode.get_state(), default=mnp.encode))

        episode = SingleAgentEpisode()
        episode.add_env_reset(observation=np.array([0.0,0.0], dtype=np.float32))

# Rest of code to write episodes
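For completeness, a quick sanity check (a sketch; it assumes the episodes were written under folder_where_to_write and that from_items stored each packed episode under the default "item" column): read one row back and confirm the observation buffer now round-trips as a numpy array rather than a list of lists.

# Read one written row back and verify the observation buffer is now an ndarray.
check_ds = ray.data.read_parquet(folder_where_to_write)  # or one of the file-XXXXXX subfolders
state = msgpack.unpackb(check_ds.take(1)[0]["item"], object_hook=mnp.decode)
print(type(state["observations"]["data"]))  # expect <class 'numpy.ndarray'>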

I think adding examples or updating the docs would be a great idea. I can put this on my to-do list later. Thank you for letting me know what worked! :slight_smile: Alternatively, if you would like to make a pull request to the docs yourself, you totally can, since Ray is open source. You can check out our Github if you’d like to try it out sometime.
