Running into issues using ray.data.read_parquet() against Azure Blob Storage.
Cluster:
- Ray 2.21.0 (via KubeRay on AKS)
- pyarrow 12.0.1
- adlfs 2024.4.1
- fsspec 2024.3.1
Running the example code from the docs to load a parquet file from Azure Blob Storage doesn't work for me. Executing from the head node:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/read_api.py", line 752, in read_parquet
datasource = ParquetDatasource(
File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/parquet_datasource.py", line 246, in __init__
pq_ds = pq.ParquetDataset(
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1776, in __new__
return _ParquetDatasetV2(
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 2465, in __init__
finfo = filesystem.get_file_info(path_or_paths)
File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/fs.py", line 332, in get_file_info
info = self.fs.info(path)
File "/home/ray/anaconda3/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/ray/anaconda3/lib/python3.10/site-packages/fsspec/asyn.py", line 103, in sync
raise return_result
File "/home/ray/anaconda3/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner
result[0] = await coro
File "/home/ray/anaconda3/lib/python3.10/site-packages/adlfs/spec.py", line 623, in _info
props = await bc.get_blob_properties(version_id=version_id)
File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/core/tracing/decorator_async.py", line 77, in wrapper_use_tracer
return await func(*args, **kwargs)
File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 783, in get_blob_properties
process_storage_error(error)
File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/_shared/response_handlers.py", line 182, in process_storage_error
exec("raise error from None") # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 773, in get_blob_properties
blob_props = await self._client.blob.get_properties(
File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/core/tracing/decorator_async.py", line 77, in wrapper_use_tracer
return await func(*args, **kwargs)
File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/_generated/aio/operations/_blob_operations.py", line 487, in get_properties
map_error(status_code=response.status_code, response=response, error_map=error_map)
File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/core/exceptions.py", line 164, in map_error
raise error
azure.core.exceptions.ClientAuthenticationError: Operation returned an invalid status 'Server failed to authenticate the request. Please refer to the information in the www-authenticate header.'
ErrorCode:NoAuthenticationInformation
Code:
import ray
import adlfs

ds = ray.data.read_parquet(
    "az://ray-example-data/iris.parquet",
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage"),
)
print(ds.schema())
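One thing that may be worth trying for the ErrorCode:NoAuthenticationInformation failure (an assumption on my part, not something verified here): when no credential is supplied, adlfs can fall through to the azure-identity credential chain, and it accepts an `anon` flag to force anonymous access to a public account like azureopendatastorage. A minimal sketch; `public_blob_fs_kwargs` is a hypothetical helper, and the Ray call needs network access so it is shown only as a comment:

```python
# Assumption: adlfs.AzureBlobFileSystem accepts `anon=True` to skip the
# credential chain entirely; whether that resolves this error is untested here.

def public_blob_fs_kwargs(account_name):
    """Constructor kwargs for anonymous access to a public storage account."""
    return {"account_name": account_name, "anon": True}

# Intended usage (needs network access, so not executed here):
#   import ray
#   import adlfs
#   fs = adlfs.AzureBlobFileSystem(**public_blob_fs_kwargs("azureopendatastorage"))
#   ds = ray.data.read_parquet("az://ray-example-data/iris.parquet", filesystem=fs)

print(public_blob_fs_kwargs("azureopendatastorage"))
```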
I also tested against a different dataset that I know works. Reading it with dask is fine:
import adlfs
import dask.dataframe as dd

storage_options = {"account_name": "azureopendatastorage"}
ddf = dd.read_parquet(
    "az://nyctlc/green/puYear=2019/puMonth=*/*.parquet",
    storage_options=storage_options,
)
ddf.head()
Here is the same read using Ray:
Code:
import ray
import adlfs

ds = ray.data.read_parquet(
    "az://nyctlc/green/puYear=2019/puMonth=*/*.parquet",
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage"),
)
Output:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/read_api.py", line 752, in read_parquet
datasource = ParquetDatasource(
File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/parquet_datasource.py", line 253, in __init__
_handle_read_os_error(e, paths)
File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/file_meta_provider.py", line 400, in _handle_read_os_error
raise error
File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/parquet_datasource.py", line 246, in __init__
pq_ds = pq.ParquetDataset(
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1776, in __new__
return _ParquetDatasetV2(
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 2490, in __init__
self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/dataset.py", line 763, in dataset
return _filesystem_dataset(source, **kwargs)
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/dataset.py", line 446, in _filesystem_dataset
fs, paths_or_selector = _ensure_single_source(source, filesystem)
File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/dataset.py", line 422, in _ensure_single_source
raise FileNotFoundError(path)
FileNotFoundError: nyctlc/green/puYear=2019/puMonth=*/*.parquet
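My reading of this second traceback (not verified against Ray internals): dask expands the `*` wildcard through fsspec, but pyarrow's `ParquetDataset` looks the path up literally, which is why the literal string `nyctlc/green/puYear=2019/puMonth=*/*.parquet` comes back as a `FileNotFoundError`. A sketch of a possible workaround, assuming `AzureBlobFileSystem.glob()` behaves like other fsspec filesystems and returns container-relative paths without the `az://` scheme; `expand_az_paths` is a hypothetical helper, and the Azure calls need network access so they are shown only as comments:

```python
# Hypothetical workaround: expand the wildcard via fsspec ourselves, then hand
# ray.data.read_parquet a list of concrete paths instead of a glob pattern.
#
# Intended usage (needs network access, so not executed here):
#   import ray
#   import adlfs
#   fs = adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
#   globbed = fs.glob("nyctlc/green/puYear=2019/puMonth=*/*.parquet")
#   ds = ray.data.read_parquet(expand_az_paths(globbed), filesystem=fs)

def expand_az_paths(globbed):
    """fs.glob() returns paths without a scheme; re-add az:// for Ray."""
    return ["az://" + p for p in globbed]

# Illustrative path (made up, not fetched from Azure):
print(expand_az_paths(["nyctlc/green/puYear=2019/puMonth=1/part-0.parquet"]))
```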