Hey @Xinchengzelin,
Sorry for the delayed response.
Does this implementation work for you?
import numpy as np
import pandas as pd
import ray
from ray.data.block import Block
from ray.data.datasource import FileBasedDatasource
from ray.data.extensions import TensorArray
class H5Datasource(FileBasedDatasource):
    """File-based datasource that loads an HDF5 file into a pandas block.

    Each top-level entry of the HDF5 file becomes one column, named after
    the entry's key, whose values are wrapped in a ``TensorArray``.
    """

    def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args) -> Block:
        """Read every top-level dataset of one HDF5 file into a DataFrame.

        Args:
            f: Open, seekable handle to the HDF5 file (as returned by
                ``_open_input_source``).
            path: Path of the file being read. Unused here.
            **reader_args: Extra reader options. Unused here.

        Returns:
            A ``pandas.DataFrame`` with one ``TensorArray`` column per
            top-level key in the file.
        """
        import h5py

        data = {}
        # Open read-only and use a context manager so the HDF5 handle is
        # always released, even if reading one of the datasets raises.
        with h5py.File(f, "r") as file:
            # NOTE(review): assumes every top-level entry is a dataset (has
            # `shape`/`dtype`/`read_direct`); nested groups are not handled.
            for key, dataset in file.items():
                # `Dataset.read_direct` is efficient because it avoids
                # unnecessary copies.
                array = np.empty(dataset.shape, dtype=dataset.dtype)
                dataset.read_direct(array)
                # `array` is an in-memory NumPy copy, so it stays valid after
                # the HDF5 file is closed.
                data[key] = TensorArray(array)
        return pd.DataFrame(data)

    def _open_input_source(
        self,
        filesystem: "pyarrow.fs.FileSystem",
        path: str,
        **open_args,
    ) -> "pyarrow.NativeFile":
        """Open ``path`` on ``filesystem`` as a seekable file.

        Args:
            filesystem: Filesystem used to open the file.
            path: Path of the file to open.
            **open_args: Forwarded unchanged to
                ``filesystem.open_input_file``.

        Returns:
            The file object returned by ``filesystem.open_input_file``.
        """
        # If we use the default `_open_input_source` implementation,
        # `_read_file` errors. This is because `h5py.File` expects input files
        # to be seekable, but the default `_open_input_source` implementation
        # returns a non-seekable file.
        return filesystem.open_input_file(path, **open_args)
>>> import ray
>>> ds = ray.data.read_datasource(H5Datasource(), paths="training_data_vectornet2022-10-11-10-26-52.vectornet_train_data.h5")
>>> ds
Dataset(num_blocks=1, num_rows=461, schema={future_traj: TensorDtype(shape=(30, 3), dtype=float32), key: TensorDtype(shape=(), dtype=|S17), label_mask: TensorDtype(shape=(30,), dtype=bool), polyline_id: TensorDtype(shape=(450, 2), dtype=float32), polyline_mask: TensorDtype(shape=(450,), dtype=bool), rand_mask: TensorDtype(shape=(450,), dtype=bool), target_obstacle_pos: TensorDtype(shape=(20, 2), dtype=float32), target_obstacle_pos_step: TensorDtype(shape=(20, 2), dtype=float32), vector_data: TensorDtype(shape=(450, 50, 9), dtype=float32), vector_mask: TensorDtype(shape=(450, 50), dtype=bool)})