How to convert a PyTorch torch.utils.data.Dataset to a ray.data.Dataset?

I trained my model with PyTorch, and the training dataset is a custom PyTorch dataset. How can I convert a torch.utils.data.Dataset to a ray.data.Dataset?
I only found SimpleTorchDatasource, but it doesn't support parallel reads. What can I do?

Also, I save my training dataset as .h5 files with the h5py package.

Hey Xinchengzelin,

Thanks for reaching out!

You’re right that SimpleTorchDatasource doesn’t support parallel reads – the datasource is intended for small datasets or prototypes.
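
For reference, here's roughly how SimpleTorchDatasource gets used today (a minimal sketch; `my_torch_dataset` is a placeholder for your own torch.utils.data.Dataset). Note that it always reads in a single task:

import ray
from ray.data.datasource import SimpleTorchDatasource

# Placeholder: construct your existing map-style torch.utils.data.Dataset here.
dataset_factory = lambda: my_torch_dataset

ds = ray.data.read_datasource(
    SimpleTorchDatasource(),
    parallelism=1,  # SimpleTorchDatasource only supports a single read task
    dataset_factory=dataset_factory,
)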

If you want to read the data in parallel, you’ll need to read the underlying data directly. In your case, it seems like the underlying data is a .h5 file.

We don’t currently support reading .h5 files. That said, you might be able to write a custom datasource to do so.

Could you tell me more about your dataset?

  • How big is it? (# samples and file size)
  • How many .h5 files are there?
  • What are the name, shape, and types of the datasets in the .h5 files?

Some other questions:

  • Is your Torch dataset map-style or iterable-style (e.g., can you do dataset[0])?
  • Are you running your program on a single-node or multi-node cluster?

Thank you! Please find the details below:

  • How big is it? (# samples and file size)
    Currently about 70 GB after 'lzf' compression, and at least 50 times bigger if I don't compress it. The smallest files are several MB while the biggest is about 5 GB; the file size depends on the original data. There are about 3,000,000 samples now, but the dataset will grow in the future.

  • How many .h5 files are there?
    Currently 67 files, but the dataset will grow, maybe by 10 times or more.

  • What are the name, shape, and types of the datasets in the .h5 files?
    In each .h5 file I define several datasets, because the model needs several inputs. One model input sample looks like: "A": shape (1, 20, 2), "B": shape (1, 450, 50, 10), … Some datasets are bool dtype used as masks, and the others are float32.

  • Is your Torch dataset map-style or iterable-style (e.g., can you do dataset[0])?
    Map-style. I can do dataset[0] and get a dict {"A": np.ndarray, "B": np.ndarray, …} (roughly like the sketch after this list).

  • Are you running your program on a single-node or multi-node cluster?
    I want to use a multi-node cluster, but I ran into some problems, so for now I'm just running single-node multi-GPU. That's why I'm turning to Ray for multi-node, multi-GPU training.
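
For reference, my map-style dataset looks roughly like this (a simplified sketch, not my real code; the length bookkeeping is just illustrative):

import h5py
import numpy as np
import torch

class H5MapDataset(torch.utils.data.Dataset):
    """Simplified sketch of a map-style dataset backed by a single .h5 file."""

    def __init__(self, path):
        self._file = h5py.File(path, "r")
        # Assume every dataset in the file holds the same number of samples.
        self._length = len(next(iter(self._file.values())))

    def __len__(self):
        return self._length

    def __getitem__(self, index):
        # Each sample is a dict of numpy arrays, e.g. {"A": ..., "B": ..., ...}.
        return {key: np.asarray(dset[index]) for key, dset in self._file.items()}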

If I have to write a custom datasource, could you give me some advice? Or do you already have a test interface for .h5 files?

Hey @Xinchengzelin, thanks for the information!

I think it’d make most sense to extend FileBasedDatasource to read .h5 files. I can walk you through the process.

Just had a few more questions:

  • Is the dataset saved locally on disk or in the cloud? If it’s stored locally on disk, is there a reason why you aren’t storing it in the cloud?
  • Do all of the .h5 files share the same layout?
  • Could you share a sample .h5 file with me? This isn’t necessary, but it’d help me prototype a solution for you. You could fill it with random data if you wanted – just as long as the layout is the same as your actual dataset.

Thanks for your help!

  • Is the dataset saved locally on disk or in the cloud? If it’s stored locally on disk, is there a reason why you aren’t storing it in the cloud?
    It's saved locally on disk, because we don't have cloud storage right now; maybe we will store it in the cloud in the future.
  • Do all of the .h5 files share the same layout?
    Yes, they all share the same layout; only the number of samples differs.
  • Could you share a sample .h5 file with me? This isn’t necessary, but it’d help me prototype a solution for you. You could fill it with random data if you wanted – just as long as the layout is the same as your actual dataset.
    Please find the attached data sample; you can read it with h5py (version 3.6.0):
    open it with h5py.File("filename.h5", "r") as data, then you can access data['key'], data['target_obstacle_pos'], and so on.
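
For example, the sample file can be opened and inspected like this (a minimal h5py sketch; only 'key' and 'target_obstacle_pos' are names I actually mentioned above):

import h5py

with h5py.File("filename.h5", "r") as data:
    print(list(data.keys()))  # e.g. ['key', 'target_obstacle_pos', ...]
    print(data["target_obstacle_pos"].shape, data["target_obstacle_pos"].dtype)
    first_sample = data["target_obstacle_pos"][0]  # reads one sample into memory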

Hey @Xinchengzelin,

Sorry for the delayed response.

Does this implementation work for you?

import numpy as np
import pandas as pd

import ray
from ray.data.block import Block
from ray.data.datasource import FileBasedDatasource
from ray.data.extensions import TensorArray


class H5Datasource(FileBasedDatasource):

    def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args) -> Block:
        import h5py

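        # `h5py.File` accepts a seekable file-like object, so we can pass the
        # pyarrow handle returned by `_open_input_source` directly.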
        file = h5py.File(f)

        data = {}
        for key, dataset in file.items():
            # `Dataset.read_direct` is efficient because it avoids unnecessary copies.
            array = np.empty(dataset.shape, dtype=dataset.dtype)
            dataset.read_direct(array)
            data[key] = TensorArray(array)

        return pd.DataFrame(data)

    def _open_input_source(
        self,
        filesystem: "pyarrow.fs.FileSystem",
        path: str,
        **open_args,
    ) -> "pyarrow.NativeFile":
        # If we use the default `_open_input_source` implementation, `_read_file` 
        # errors. This is because `h5py.File` expects input files to be seekable,
        # but the default `_open_input_source` implementation returns a non-seekable 
        # file.
        return filesystem.open_input_file(path, **open_args)
>>> import ray
>>> ds = ray.data.read_datasource(H5Datasource(), paths="training_data_vectornet2022-10-11-10-26-52.vectornet_train_data.h5")
>>> ds
Dataset(num_blocks=1, num_rows=461, schema={future_traj: TensorDtype(shape=(30, 3), dtype=float32), key: TensorDtype(shape=(), dtype=|S17), label_mask: TensorDtype(shape=(30,), dtype=bool), polyline_id: TensorDtype(shape=(450, 2), dtype=float32), polyline_mask: TensorDtype(shape=(450,), dtype=bool), rand_mask: TensorDtype(shape=(450,), dtype=bool), target_obstacle_pos: TensorDtype(shape=(20, 2), dtype=float32), target_obstacle_pos_step: TensorDtype(shape=(20, 2), dtype=float32), vector_data: TensorDtype(shape=(450, 50, 9), dtype=float32), vector_mask: TensorDtype(shape=(450, 50), dtype=bool)})

Thanks for your solution! Could I ask another two questions?

  1. Because some files are too big to fit in RAM, could the file content be returned like a generator?
  2. I have hundreds of files. Should I use ray.data.Dataset.union to merge all the files into one ray.data.Dataset?

Because some files are too big to fit in RAM, could the file content be returned like a generator?

What happens when you read a large file with H5Datasource? If you get an error, could you share the traceback with me?

H5Datasource shouldn’t read the entire file into memory. I might’ve missed something, though.

I have hundreds of files. Should I use ray.data.Dataset.union to merge all the files into one ray.data.Dataset?

This shouldn’t be necessary.

You can either pass all of the files to read_datasource:

ds = ray.data.read_datasource(H5Datasource(), paths=["data1.h5", "data2.h5", ..., "data100.h5"])

Or you can pass a directory:

ds = ray.data.read_datasource(H5Datasource(), paths="./data/")

I can read several files at the same time following your instructions.
Maybe it's just that reading is very slow. I read 7 files in a folder, and then this happens:

  • 6 minutes later, the read progress bar shows up
  • about 8 minutes later, the RAM is full
  • about 11 minutes later, I killed it; the log is shown below

Actually, I define a "length" attribute in each .h5 file, so when I open a file I know how many samples are in it. Could we use this attribute to accelerate the read process?
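
For reference, the attribute is just an ordinary h5py file attribute, written and read roughly like this (the value shown is only an example):

import h5py

# Written once when the file is created:
with h5py.File("filename.h5", "a") as f:
    f.attrs["length"] = 3000  # number of samples in this file

# Read back without touching the datasets themselves:
with h5py.File("filename.h5", "r") as f:
    num_samples = f.attrs["length"]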

2022-11-03 09:20:12,794 INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
2022-11-03 09:20:14,483 WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 1.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
Read: 22%|███████████████████████████▎ | 2/9 [00:08<00:25, 3.59s/it]^C^C2022-11-03 09:32:21,814 WARNING worker.py:1829 – A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: a538b19dc5e034d296b5a8df9d3662d0301d7bc301000000 Worker ID: 659eb3d7c980f3bc9a839e9eb7948178abc77c68812b36d4b2ba8d68 Node ID: 52d4e42d819d935c07e6b922d58a5ccbc7512ae6220925515dbb623e Worker IP address: 10.19.197.177 Worker port: 32875 Worker PID: 97780 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned.
^C^C^C^C^C^C^C^C^C^C^C^C^C^C^C^CTraceback (most recent call last):
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/data/_internal/compute.py", line 113, in _apply
    results = map_bar.fetch_until_complete(refs)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/data/_internal/progress_bar.py", line 74, in fetch_until_complete
    done, remaining = ray.wait(remaining, fetch_local=True, timeout=0.1)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2461, in wait
    ready_ids, remaining_ids = worker.core_worker.wait(
  File "python/ray/_raylet.pyx", line 1414, in ray._raylet.CoreWorker.wait
  File "python/ray/_raylet.pyx", line 173, in ray._raylet.check_status
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/zetlin/Code/Prediction_ML/test2.py", line 40, in <module>
    ds = ray.data.read_datasource(H5Datasource(), #10-10-17-39-56 09-15-15-02-17
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/data/dataset.py", line 736, in repartition
    return Dataset(plan, self._epoch, self._lazy)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/data/dataset.py", line 201, in __init__
    self._plan.execute(allow_clear_input_blocks=False)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/data/_internal/plan.py", line 308, in execute
    blocks, stage_info = stage(
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/data/_internal/plan.py", line 662, in __call__
    blocks = compute._apply(
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/data/_internal/compute.py", line 122, in _apply
    ray.get(ref)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2269, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/zetlin/miniconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 669, in get_objects
    data_metadata_pairs = self.core_worker.get_objects(
  File "python/ray/_raylet.pyx", line 1211, in ray._raylet.CoreWorker.get_objects
  File "python/ray/_raylet.pyx", line 173, in ray._raylet.check_status
KeyboardInterrupt
Read: 22%|███████████████████████████ | 2/9 [06:05<21:17, 182.

Hey @Xinchengzelin, thanks for the update.

I’m not sure why this is using up so much memory. I wonder if it’s related to _open_input_source.

@chengsu could you take a look? I’m not sure if I have enough expertise with Datasets to efficiently debug this.


Thank you very much, @bveeramani!

Please help me if you have some time. Thanks, @chengsu!

Hi @Xinchengzelin - sorry for the late reply. Do you want to have a live chat to debug this? It looks like an OOM issue to me, but it would be easier to debug together to figure out the root cause. What's your email address so I can send out a Zoom meeting invite? Thanks.

Thanks, my email address: linzexu@126.com

Unfortunately, my spoken English isn't good enough for me to communicate fluently. If you need more information, I can prepare it for you.

The error traceback info:

2022-11-16 11:59:25,130 INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
2022-11-16 11:59:26,976 WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 1.5x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-11-16 12:06:29,505 WARNING worker.py:1829 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: c0903b5603b8d63eae6fb7b7fc6c504f997eb44801000000 Worker ID: d6c2c5352eb946ba6a811c9a4034888e51798e56e8ca75d1f4b16e11 Node ID: 18cda327eda954a85405f5df1aef705962aa4dd9c8118970c8105369 Worker IP address: 10.19.197.177 Worker port: 40423 Worker PID: 72380 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2022-11-16 12:17:00,873 WARNING worker.py:1829 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 73c44ac85ca175fc006fe0082d1f2a3432dfd8e201000000 Worker ID: b5341fd1f1e528a9ec65a150f29f1cd6479de211dad93056f770ea72 Node ID: 18cda327eda954a85405f5df1aef705962aa4dd9c8118970c8105369 Worker IP address: 10.19.197.177 Worker port: 46475 Worker PID: 72381 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2022-11-16 12:32:18,543 WARNING worker.py:1829 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: dc0fef3c9e8c440f3e6d468d8293d71b8322144001000000 Worker ID: 820f43800472a7de337aa5387877ad1680eb2adb6a5952f0821a3fa2 Node ID: 18cda327eda954a85405f5df1aef705962aa4dd9c8118970c8105369 Worker IP address: 10.19.197.177 Worker port: 34677 Worker PID: 72379 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2022-11-16 12:48:06,091 WARNING worker.py:1829 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e9dfce20b21fe13708e767ac1691ae31affb86f401000000 Worker ID: d5a1f950b4b8e744d408d7b3f02fc3d9734c363c16a7f8143303a5b3 Node ID: 18cda327eda954a85405f5df1aef705962aa4dd9c8118970c8105369 Worker IP address: 10.19.197.177 Worker port: 37549 Worker PID: 72373 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.

@chengsu @bveeramani

Hi guys, I tried CSVDatasource. When ray.data.read_datasource is called, the RAM used is almost the size of a whole file. So if a .csv file is larger than RAM, will CSVDatasource also hit an OOM error?
Does that mean that if one of my .h5 files can't fit in RAM, H5Datasource will also hit an OOM error?

Hi @Xinchengzelin - as discussed offline in the meeting, let's try the following code to read the data in a batched fashion, and use streaming ingest with TorchTrainer (link):

from typing import Iterator
import pandas as pd

import ray
from ray.data.block import Block
from ray.data.datasource import FileBasedDatasource
from ray.data.extensions import TensorArray

input_keys = ['target_obstacle_pos','target_obstacle_pos_step', 'vector_data',
                 'vector_mask','polyline_mask', "rand_mask", 'polyline_id','key']

class H5Datasource(FileBasedDatasource):

    def _read_stream(self, f: "pyarrow.NativeFile", path: str, **reader_args) -> Iterator[Block]:
        import h5py

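        # `batch_size` is whatever keyword argument was passed to
        # `ray.data.read_datasource(...)` below (e.g. `batch_size=64`);
        # FileBasedDatasource forwards extra read arguments to `_read_stream`
        # via `reader_args`.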
        batch_size = reader_args.get("batch_size", None)
        
        with h5py.File(f) as h5_file:
            file_size = h5_file.attrs["length"]

            if batch_size is None:
                res = {}
                for col in input_keys:
                    res[col] = TensorArray(h5_file[col][0:file_size])
                yield pd.DataFrame(res)
            else:
                current_size = 0
                res = {}
                while current_size < file_size:
                    next_batch_size = min(batch_size, file_size - current_size)
                    for col in input_keys:
                        res[col] = TensorArray(h5_file[col][current_size:current_size + next_batch_size])
                    yield pd.DataFrame(res)
                    res = {}
                    current_size += next_batch_size

    def _open_input_source(
        self,
        filesystem: "pyarrow.fs.FileSystem",
        path: str,
        **open_args,
    ) -> "pyarrow.NativeFile":
        # If we use the default `_open_input_source` implementation, `_read_stream`
        # errors. This is because `h5py.File` expects input files to be seekable,
        # but the default `_open_input_source` implementation returns a non-seekable
        # file.
        return filesystem.open_input_file(path, **open_args)

ds = ray.data.read_datasource(H5Datasource(), paths="data.h5", batch_size=64)
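
And here is a rough sketch of how the resulting dataset could then be fed to TorchTrainer with streaming ingest (the exact DatasetConfig options and the training-loop contents are illustrative and depend on your Ray version; see the streaming ingest docs linked above):

from ray.air import session
from ray.air.config import DatasetConfig, ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker(config):
    # Each worker receives a shard of the "train" dataset registered below.
    train_shard = session.get_dataset_shard("train")
    for batch in train_shard.iter_batches(batch_size=64):
        ...  # your training step on `batch` goes here

trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    datasets={"train": ds},
    # Stream the dataset through the workers instead of materializing it all at once.
    dataset_config={"train": DatasetConfig(use_stream_api=True)},
)
result = trainer.fit()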