Read parquet files with wildcard/glob similar to Dask DataFrame?

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I’m experimenting with moving some workloads from Dask to Ray. Rather than run the dask_on_ray helper, I’d like to learn how to do things natively in Ray. To start, I’m attempting to load a large collection of parquet files from GCS. In Dask we can do this with multiple wildcards in the path name, and it distributes the various parquet files across the cluster:

from dask.distributed import Client
import dask.dataframe as dd

client = Client("localhost:8786")

df = dd.read_parquet("gcs://bucket_name/prefix*/2023-03-01*.parquet")
df.head()

However, when using the same path with Ray, it complains that it cannot locate the file:

import ray

ray.init()
df = ray.data.read_parquet("gcs://bucket_name/prefix*/2023-03-01*.parquet")

FileNotFoundError: gcs://bucket_name/prefix*/2023-03-01*.parquet

If I reduce this to a single folder (without wildcards) within the bucket, it scales and loads in Ray, but that removes the ability to filter files within a folder or to merge multiple datasets in a single call. This seems to be documented here:
https://docs.ray.io/en/latest/data/creating-datasets.html#reading-files-from-storage

import ray

ray.init()
df = ray.data.read_parquet("gcs://bucket_name/prefix-full-folder-name/")

Is there a way to load files from storage using wildcards? Or what would best practice be for loading data for a particular time range from a structure like this:

bucket_name:
  prefix-folder-one:
    2023-03-01.parquet
    2023-03-02.parquet
    2023-03-03.parquet
  prefix-folder-two:
    2023-03-01.parquet
    2023-03-02.parquet
    2023-03-03.parquet

cc @chengsu to look at the question! I will move the question to the data channel.


Creating the file list before calling Ray does the trick, though it adds a step compared to Dask:

import gcsfs
import ray

# Create a GCS filesystem object
fs = gcsfs.GCSFileSystem()

# List all objects matching the wildcard pattern; gcsfs returns
# paths without the scheme, so add the gcs:// prefix back
files = fs.glob("gcs://bucket_name/prefix*/2023-03-01*.parquet")
files = [f"gcs://{f}" for f in files]

ray.init()
df = ray.data.read_parquet(files)
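
The same glob-and-list approach extends to the date-range question above: glob one pattern per day and concatenate the matches. A minimal sketch, assuming the bucket layout from the question (the date range is illustrative):

from datetime import date, timedelta

import gcsfs
import ray

fs = gcsfs.GCSFileSystem()

# Build one glob pattern per day in the desired range
start, end = date(2023, 3, 1), date(2023, 3, 3)
days = [start + timedelta(days=i) for i in range((end - start).days + 1)]

files = []
for day in days:
    files.extend(fs.glob(f"gcs://bucket_name/prefix*/{day.isoformat()}*.parquet"))

# Re-add the scheme that gcsfs strips from the returned paths
files = [f"gcs://{f}" for f in files]

ray.init()
df = ray.data.read_parquet(files)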

cc @bveeramani @chengsu (seems like a data bug).

I don’t think wildcard matching is a supported feature. @Scott_Zelenka could you file a feature request on GitHub?