How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
Hello, I'm following the offline data tutorial (Working with offline data — Ray 2.42.0) and I'm stuck at training time, where the input is not cast to a tensor type.
I checked that observation and action are lists of floats and that reward is a float (see the quick type check after the read_databricks_tables call below).
I wanted to be able to use RLlib directly with ray.data.read_databricks_tables, like:
dataset = ray.data.read_databricks_tables(... args ...)
config = (
    BCConfig()
    ...
    .offline_data(input_=dataset, input_schema=schema_for_columns_to_obs_action_rewards)
    ...
)
However, that doesn't seem to be possible, and it seems reasonable to have to convert the data to episodes first.
So I tried the code below.
Versions / Dependencies
Running on
ray : 2.42.0
databricks : DBR 16.0
python : 3.12.3
Reproduction script
import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
try:
    shutdown_ray_cluster()
except Exception:
    pass
ray.shutdown()
setup_ray_cluster(max_worker_nodes=1, num_cpus_per_worker=8)
ray.init()
import gymnasium as gym
from gymnasium.spaces import Box  # Box is used below for the spaces but was missing from the imports
import msgpack
import msgpack_numpy as mnp
from collections import defaultdict
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
import os
api_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
os.environ["DATABRICKS_TOKEN"] = api_token
folder_where_to_write = "/Volumes/some/folder/test_ray_convertion"
dataset = ray.data.read_databricks_tables(
    warehouse_id="some_id",
    catalog="some_catalog",
    schema="some_schema",
    query="""
        SELECT
            some.column AS actions,
            other_column AS obs,
            another_column AS rewards,
            column AS next_obs,
            column AS infos,
            column AS done
        FROM some_table
    """,
)
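# For reference, this is how I checked the row types mentioned above
# (take(1) pulls a single row back to the driver):
sample_row = dataset.take(1)[0]
print(type(sample_row["obs"]), type(sample_row["obs"][0]))       # list, float
print(type(sample_row["actions"]), type(sample_row["rewards"]))  # list, float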
eps_obs = []
eps_actions = []
eps_rewards = []
extra_model_outputs = defaultdict(list)
episodes = []
action_space = Box(low=-1, high=1, shape=(4,), dtype=float)
observation_space = Box(low=0, high=5, shape=(2,), dtype=float)
episode = SingleAgentEpisode(action_space=action_space, observation_space=observation_space)
episode.add_env_reset(observation=[0,0])
for i, row in enumerate(dataset.sort(["some_key"]).iter_rows(prefetch_batches=10)):
    if not row["done"]:
        episode.add_env_step(
            observation=row["obs"],
            action=row["actions"],
            reward=row["rewards"],
            terminated=False,
        )
    else:
        # Close the episode on the terminal step, pack it, and start a fresh one.
        episode.add_env_step(
            observation=row["next_obs"],
            action=row["actions"],
            reward=row["rewards"],
            terminated=True,
        )
        episodes.append(msgpack.packb(episode.get_state(), default=mnp.encode))
        episode = SingleAgentEpisode(action_space=action_space, observation_space=observation_space)
        episode.add_env_reset(observation=[0, 0])
        # Flush to parquet every few episodes to keep memory bounded.
        if len(episodes) > 5:
            episodes_ds = ray.data.from_items(episodes)
            episodes_ds.write_parquet(
                f"{folder_where_to_write}/file-{str(i).zfill(6)}",  # zero-pad the index, not the whole path
                compression="gzip",
            )
            del episodes_ds
            episodes.clear()

# Flush whatever is left after the loop.
if len(episodes) > 0:
    episodes_ds = ray.data.from_items(episodes)
    episodes_ds.write_parquet(
        f"{folder_where_to_write}/file-{str(i).zfill(6)}",
        compression="gzip",
    )
    del episodes_ds
    episodes.clear()
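To double-check the conversion, something like this should read one file back and rebuild an episode (a quick sanity-check sketch; I'm assuming SingleAgentEpisode.from_state is the inverse of get_state, and that from_items() stores the raw bytes under an "item" column):
check_ds = ray.data.read_parquet(folder_where_to_write)
packed = check_ds.take(1)[0]["item"]
state = msgpack.unpackb(packed, object_hook=mnp.decode)
restored = SingleAgentEpisode.from_state(state)
print(len(restored), restored.get_observations(0))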
Then the training:
from gymnasium import Env
from ray import train, tune
from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.utils.metrics import (
    ENV_RUNNER_RESULTS,
    EPISODE_RETURN_MEAN,
    EVALUATION_RESULTS,
)

class DummyEnv(Env):
    def __init__(self, config):
        super().__init__()
        self.action_space = Box(low=-1, high=1, shape=(4,), dtype=float)
        self.observation_space = Box(low=0, high=5, shape=(2,), dtype=float)

    def reset(self, seed=None, options=None):
        observation = self.observation_space.sample()
        return observation, {}  # gymnasium reset returns (obs, info)
        # return torch.from_numpy(observation).float() # No success

    # def step(self, action):
    #     return self.observation_space.sample(), 1, True, {}

def env_creator(env_config):
    return DummyEnv(env_config)

tune.register_env("DummyEnv", env_creator)
config = (
BCConfig()
.environment(
    env=DummyEnv,
    # same spaces as the ones used when building the episodes
    observation_space=Box(low=0, high=5, shape=(2,), dtype=float),
    action_space=Box(low=-1, high=1, shape=(4,), dtype=float),
)
.offline_data(
input_="parquet/file/with/the/episodes",
dataset_num_iters_per_learner=100,
input_read_episodes=True
)
.evaluation(
evaluation_interval=3,
evaluation_num_env_runners=1,
evaluation_duration=5,
evaluation_parallel_to_training=True,
)
)
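# Side note: a sketch of a faster debug loop, skipping Tune entirely
# (config.build() should give an Algorithm whose train() hits the same code path):
#   algo = config.build()
#   algo.train()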
metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"
tuner = tune.Tuner(
"BC",
param_space=config,
run_config=train.RunConfig(
# name="docs_rllib_offline_bc",
stop={metric: 450.0},
checkpoint_config=train.CheckpointConfig(
checkpoint_frequency=0,
checkpoint_at_end=True,
),
verbose=2,
)
)
analysis = tuner.fit()
However, I end up with the following error:
File "/databricks/python/lib/python3.12/site-packages/torch/nn/modules/linear.py", line 117, in forward
return F.linear(input, self.weight, self.bias)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: linear(): argument 'input' (position 1) must be Tensor, not numpy.ndarray
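The only workaround I can think of so far is forcing everything to NumPy float32 before building the episodes, something like this inside the conversion loop above (an untested sketch; numpy imported as np):
import numpy as np

episode.add_env_step(
    observation=np.asarray(row["obs"], dtype=np.float32),
    action=np.asarray(row["actions"], dtype=np.float32),
    reward=float(row["rewards"]),
    terminated=bool(row["done"]),
)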
I've also seen that I could write an OfflinePreLearner to convert the data, but that seems like too much…
Thanks in advance for any help.
Have a good day!