Does Ray Datasets support Snappy-compressed files?

Hi, I have some Snappy-compressed text files on HDFS. I tried to read them with ray.data, but it fails:

ray.data.read_text("hdfs://path/test.snappy", arrow_open_stream_args={"compression":"snappy"})

The error is:

(remote_read pid=1075016) 2022-02-01 07:46:44,510	INFO worker.py:431 -- Task failed with retryable exception: TaskID(69a6825d641b4613ffffffffffffffffffffffff01000000).
(remote_read pid=1075016) Traceback (most recent call last):
(remote_read pid=1075016)   File "python/ray/_raylet.pyx", line 625, in ray._raylet.execute_task
(remote_read pid=1075016)   File "python/ray/_raylet.pyx", line 629, in ray._raylet.execute_task
(remote_read pid=1075016)   File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/read_api.py", line 166, in remote_read
(remote_read pid=1075016)     return task()
(remote_read pid=1075016)   File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 120, in __call__
(remote_read pid=1075016)     result = self._read_fn()
(remote_read pid=1075016)   File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 174, in <lambda>
(remote_read pid=1075016)     read_files(read_paths, filesystem)], meta
(remote_read pid=1075016)   File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 145, in read_files
(remote_read pid=1075016)     with fs.open_input_stream(read_path, **open_stream_args) as f:
(remote_read pid=1075016)   File "pyarrow/_fs.pyx", line 627, in pyarrow._fs.FileSystem.open_input_stream
(remote_read pid=1075016)   File "pyarrow/_fs.pyx", line 557, in pyarrow._fs.FileSystem._wrap_input_stream
(remote_read pid=1075016)   File "pyarrow/io.pxi", line 1283, in pyarrow.lib.CompressedInputStream.__init__
(remote_read pid=1075016)   File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
(remote_read pid=1075016)   File "pyarrow/error.pxi", line 120, in pyarrow.lib.check_status
(remote_read pid=1075016) pyarrow.lib.ArrowNotImplementedError: Streaming decompression unsupported with Snappy
Traceback (most recent call last):
  File "test_raydata.py", line 7, in <module>
    ds = ray.data.read_text(path, arrow_open_stream_args={"compression":"snappy"})
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/read_api.py", line 405, in read_text
    lambda x: x.decode(encoding).split("\n"))
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/dataset.py", line 279, in flat_map
    compute.apply(transform, ray_remote_args, self._blocks),
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/impl/compute.py", line 40, in apply
    blocks = list(blocks.iter_blocks_with_metadata())
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/impl/lazy_block_list.py", line 90, in __next__
    partition = ray.get(next(self._base_iter))
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowNotImplementedError): ray::remote_read() (pid=1075016, ip=10.21.77.23)
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/read_api.py", line 166, in remote_read
    return task()
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 120, in __call__
    result = self._read_fn()
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 174, in <lambda>
    read_files(read_paths, filesystem)], meta
  File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 145, in read_files
    with fs.open_input_stream(read_path, **open_stream_args) as f:
  File "pyarrow/_fs.pyx", line 627, in pyarrow._fs.FileSystem.open_input_stream
  File "pyarrow/_fs.pyx", line 557, in pyarrow._fs.FileSystem._wrap_input_stream
  File "pyarrow/io.pxi", line 1283, in pyarrow.lib.CompressedInputStream.__init__
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 120, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Streaming decompression unsupported with Snappy

It looks like Arrow does not support streaming (de)compression for Snappy, since the C++ Snappy implementation itself doesn’t support it. We could look at falling back to one-shot decompression on non-streaming reads for Snappy files, i.e. reading the whole compressed file and decompressing the buffer in a single call. I’ll open an issue for that.
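
To make the proposed fallback concrete, here is a minimal sketch of the control flow only. It is not Ray's or Arrow's actual code: `open_streaming`, `one_shot_decompress`, and `read_file` are hypothetical names, and zlib stands in for Snappy so the sketch runs with just the standard library.

```python
import io
import zlib
from typing import BinaryIO, Optional


def open_streaming(raw: BinaryIO, compression: str) -> BinaryIO:
    """Hypothetical stand-in for a streaming opener that rejects some codecs."""
    if compression == "snappy":
        # Mirrors the message in the traceback above.
        raise NotImplementedError("Streaming decompression unsupported with Snappy")
    raise ValueError(f"unknown compression: {compression}")


def one_shot_decompress(data: bytes, compression: str) -> bytes:
    """Hypothetical whole-buffer decompressor. Uses zlib, NOT real Snappy."""
    return zlib.decompress(data)


def read_file(raw: BinaryIO, compression: Optional[str] = None) -> bytes:
    """Return the decompressed contents, falling back to one-shot if needed."""
    if compression is None:
        return raw.read()
    try:
        stream = open_streaming(raw, compression)
    except NotImplementedError:
        # The codec cannot stream: read everything, then decompress once.
        return one_shot_decompress(raw.read(), compression)
    return stream.read()


payload = b"line one\nline two\n"
compressed = zlib.compress(payload)  # stand-in for a Snappy-compressed file
print(read_file(io.BytesIO(compressed), compression="snappy").decode().split("\n"))
```

The fallback only triggers when the streaming opener refuses the codec, so codecs that can stream would keep their existing path.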

Does this have any performance impact?

Opened an issue: [Datasets] [Bug] Streaming reads for Snappy-compressed files not supported (ray-project/ray, issue #22023)

This shouldn’t have a big performance impact, no, since we currently read the entire file into memory anyway. If we wanted to eventually stream chunks of the file into the object store and allow for pipelining on windows within a file, then we wouldn’t be able to support that for Snappy-compressed files.
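
For contrast, this is roughly what chunked streaming decompression looks like for a codec that does support it. This is a sketch under assumptions: zlib is used because its streaming API ships with Python, and `iter_decompressed` is a made-up helper, not part of Ray or Arrow. Each yielded chunk could in principle be handed off before the rest of the file is read, which is the pipelining that a one-shot-only codec rules out.

```python
import io
import zlib
from typing import BinaryIO, Iterator


def iter_decompressed(raw: BinaryIO, chunk_size: int = 64 * 1024) -> Iterator[bytes]:
    """Yield decompressed pieces while reading compressed input in chunks."""
    decomp = zlib.decompressobj()
    while True:
        chunk = raw.read(chunk_size)
        if not chunk:
            break
        out = decomp.decompress(chunk)
        if out:
            yield out
    # Emit anything still buffered inside the decompressor.
    tail = decomp.flush()
    if tail:
        yield tail


data = b"some text line\n" * 1000
pieces = list(iter_decompressed(io.BytesIO(zlib.compress(data)), chunk_size=16))
print(len(b"".join(pieces)) == len(data))
```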