ray.data.read_parquet example on Azure Blob Storage not working

Running into issues using ray.data.read_parquet() against Azure Blob Storage.

Cluster:

  • Ray 2.21.0 (via KubeRay on AKS)
  • pyarrow 12.0.1
  • adlfs 2024.4.1
  • fsspec 2024.3.1

Running the linked example code for loading a Parquet file from Azure Blob Storage doesn't work for me.

Executing from the head node:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/read_api.py", line 752, in read_parquet
    datasource = ParquetDatasource(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/parquet_datasource.py", line 246, in __init__
    pq_ds = pq.ParquetDataset(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1776, in __new__
    return _ParquetDatasetV2(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 2465, in __init__
    finfo = filesystem.get_file_info(path_or_paths)
  File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/_fs.pyx", line 1490, in pyarrow._fs._cb_get_file_info
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/fs.py", line 332, in get_file_info
    info = self.fs.info(path)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/home/ray/anaconda3/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/home/ray/anaconda3/lib/python3.10/site-packages/adlfs/spec.py", line 623, in _info
    props = await bc.get_blob_properties(version_id=version_id)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/core/tracing/decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 783, in get_blob_properties
    process_storage_error(error)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/_shared/response_handlers.py", line 182, in process_storage_error
    exec("raise error from None")   # pylint: disable=exec-used # nosec
  File "<string>", line 1, in <module>
  File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 773, in get_blob_properties
    blob_props = await self._client.blob.get_properties(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/core/tracing/decorator_async.py", line 77, in wrapper_use_tracer
    return await func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/storage/blob/_generated/aio/operations/_blob_operations.py", line 487, in get_properties
    map_error(status_code=response.status_code, response=response, error_map=error_map)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/azure/core/exceptions.py", line 164, in map_error
    raise error
azure.core.exceptions.ClientAuthenticationError: Operation returned an invalid status 'Server failed to authenticate the request. Please refer to the information in the www-authenticate header.'
ErrorCode:NoAuthenticationInformation

Code:

import adlfs
import ray

ds = ray.data.read_parquet(
    "az://ray-example-data/iris.parquet",
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage"),
)

print(ds.schema())

I also tested this against a different dataset that I know works.

This works just fine using dask:

import adlfs
import dask.dataframe as dd

storage_options = {'account_name': 'azureopendatastorage'}
ddf = dd.read_parquet('az://nyctlc/green/puYear=2019/puMonth=*/*.parquet', storage_options=storage_options)
ddf.head()

Here is the same example using Ray:

Code:

import adlfs
import ray

ds = ray.data.read_parquet(
    "az://nyctlc/green/puYear=2019/puMonth=*/*.parquet",
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage"),
)

Output:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/read_api.py", line 752, in read_parquet
    datasource = ParquetDatasource(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/parquet_datasource.py", line 253, in __init__
    _handle_read_os_error(e, paths)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/file_meta_provider.py", line 400, in _handle_read_os_error
    raise error
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/datasource/parquet_datasource.py", line 246, in __init__
    pq_ds = pq.ParquetDataset(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1776, in __new__
    return _ParquetDatasetV2(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 2490, in __init__
    self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/dataset.py", line 763, in dataset
    return _filesystem_dataset(source, **kwargs)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/dataset.py", line 446, in _filesystem_dataset
    fs, paths_or_selector = _ensure_single_source(source, filesystem)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pyarrow/dataset.py", line 422, in _ensure_single_source
    raise FileNotFoundError(path)
FileNotFoundError: nyctlc/green/puYear=2019/puMonth=*/*.parquet
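The FileNotFoundError here appears to be a separate issue: pyarrow's dataset discovery treats the path as a literal file name and does not expand wildcards, whereas dask goes through fsspec, which globs the pattern first. A stdlib-only sketch of that difference, with local files standing in for the blob layout:

```python
import glob
import os
import tempfile

# fsspec/dask expand wildcards before listing files; pyarrow's
# ParquetDataset takes the path literally. Mimic the difference
# with a local directory shaped like the partitioned dataset.
with tempfile.TemporaryDirectory() as root:
    part_dir = os.path.join(root, "puMonth=1")
    os.makedirs(part_dir)
    open(os.path.join(part_dir, "part-0.parquet"), "w").close()

    pattern = os.path.join(root, "puMonth=*", "*.parquet")
    print(glob.glob(pattern))       # glob expansion finds the file
    print(os.path.exists(pattern))  # literal lookup fails: False, which
                                    # is what pyarrow surfaces as
                                    # FileNotFoundError
```

If that is right, a workaround might be to expand the pattern with the adlfs filesystem (e.g. fs.glob(...)) and pass the resulting list of concrete paths to ray.data.read_parquet, though I have not verified this against the nyctlc container.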