Hi, I have some text files compressed in Snappy format on HDFS. I tried to read them with ray.data, but it is not working:
ray.data.read_text("hdfs://path/test.snappy", arrow_open_stream_args={"compression":"snappy"})
The error is:
(remote_read pid=1075016) 2022-02-01 07:46:44,510 INFO worker.py:431 -- Task failed with retryable exception: TaskID(69a6825d641b4613ffffffffffffffffffffffff01000000).
(remote_read pid=1075016) Traceback (most recent call last):
(remote_read pid=1075016) File "python/ray/_raylet.pyx", line 625, in ray._raylet.execute_task
(remote_read pid=1075016) File "python/ray/_raylet.pyx", line 629, in ray._raylet.execute_task
(remote_read pid=1075016) File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/read_api.py", line 166, in remote_read
(remote_read pid=1075016) return task()
(remote_read pid=1075016) File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 120, in __call__
(remote_read pid=1075016) result = self._read_fn()
(remote_read pid=1075016) File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 174, in <lambda>
(remote_read pid=1075016) read_files(read_paths, filesystem)], meta
(remote_read pid=1075016) File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 145, in read_files
(remote_read pid=1075016) with fs.open_input_stream(read_path, **open_stream_args) as f:
(remote_read pid=1075016) File "pyarrow/_fs.pyx", line 627, in pyarrow._fs.FileSystem.open_input_stream
(remote_read pid=1075016) File "pyarrow/_fs.pyx", line 557, in pyarrow._fs.FileSystem._wrap_input_stream
(remote_read pid=1075016) File "pyarrow/io.pxi", line 1283, in pyarrow.lib.CompressedInputStream.__init__
(remote_read pid=1075016) File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
(remote_read pid=1075016) File "pyarrow/error.pxi", line 120, in pyarrow.lib.check_status
(remote_read pid=1075016) pyarrow.lib.ArrowNotImplementedError: Streaming decompression unsupported with Snappy
Traceback (most recent call last):
File "test_raydata.py", line 7, in <module>
ds = ray.data.read_text(path, arrow_open_stream_args={"compression":"snappy"})
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/read_api.py", line 405, in read_text
lambda x: x.decode(encoding).split("\n"))
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/dataset.py", line 279, in flat_map
compute.apply(transform, ray_remote_args, self._blocks),
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/impl/compute.py", line 40, in apply
blocks = list(blocks.iter_blocks_with_metadata())
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/impl/lazy_block_list.py", line 90, in __next__
partition = ray.get(next(self._base_iter))
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, **kwargs)
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/worker.py", line 1713, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowNotImplementedError): ray::remote_read() (pid=1075016, ip=10.21.77.23)
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/read_api.py", line 166, in remote_read
return task()
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 120, in __call__
result = self._read_fn()
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 174, in <lambda>
read_files(read_paths, filesystem)], meta
File "/home/tiger/anaconda3/envs/ray/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 145, in read_files
with fs.open_input_stream(read_path, **open_stream_args) as f:
File "pyarrow/_fs.pyx", line 627, in pyarrow._fs.FileSystem.open_input_stream
File "pyarrow/_fs.pyx", line 557, in pyarrow._fs.FileSystem._wrap_input_stream
File "pyarrow/io.pxi", line 1283, in pyarrow.lib.CompressedInputStream.__init__
File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 120, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Streaming decompression unsupported with Snappy