Ray.data.filter() much slower than without filter

I'm trying to filter partitioned Parquet files (700 files, 70M rows, 35 numeric columns), but it is much slower than running without the filter on a machine with 8 CPUs:

## without filter
%%time
import ray
dt = ray.data.read_parquet('df_data/date_key=2024-02-29')
dt.count() 

This takes about 12 minutes, which is fine.

CPU times: user 38.4 s, sys: 4.53 s, total: 43 s
Wall time: 12min 38s

However, it becomes much slower when I add a filter:

%%time
import pandas as pd
import pendulum
dt.filter(lambda x: False if pd.isnull(x['account_created_date']) else (pendulum.parse(x['date_key']) - pendulum.parse(x['account_created_date'])).days <= 30, concurrency=12, num_cpus=0.2).count()

It runs for hours without any sign of finishing. It is also extremely slow when I try to apply a limit after the filter (dt.filter(*).limit(3).show()). Are there any solutions for this?
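For readers following along, the predicate in the lambda above may be easier to read as a named function. This is only a sketch: it uses the standard library's datetime in place of pendulum and pandas, it assumes both columns hold ISO `YYYY-MM-DD` strings with missing values arriving as None, and the name `created_within_30_days` is made up for illustration.

```python
from datetime import date
from typing import Optional


def created_within_30_days(date_key: str, account_created_date: Optional[str]) -> bool:
    """Same logic as the lambda: False for a missing creation date,
    otherwise True when date_key is at most 30 days after it."""
    if account_created_date is None:
        return False
    # Assumption: both values are ISO-formatted dates such as "2024-02-29".
    delta = date.fromisoformat(date_key) - date.fromisoformat(account_created_date)
    return delta.days <= 30
```

Under those assumptions it could be passed as dt.filter(lambda x: created_within_30_days(x['date_key'], x['account_created_date'])).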

The time difference between the two arises because, for Datasets that consist purely of a Parquet read (i.e. just read_parquet()), count() does not trigger execution of the full dataset and instead gets the row count from the file metadata. In comparison, read_parquet().filter() requires executing the dataset to get the row count, as it is no longer possible to get it purely from file metadata.
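To make the distinction concrete, here is a toy model in plain Python. It is not how Ray Data is implemented; `ToyParquetFile` and both helper functions are hypothetical names, and the "footer" is just an integer standing in for the row count that Parquet stores in its file metadata.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ToyParquetFile:
    """Stand-in for one Parquet file: a metadata footer plus the row data."""
    num_rows_in_footer: int  # known without scanning any rows
    rows: List[dict]         # the actual data, expensive to read at scale


def count_from_metadata(files: List[ToyParquetFile]) -> int:
    # No row is touched: just sum the counts stored in each footer.
    return sum(f.num_rows_in_footer for f in files)


def count_after_filter(files: List[ToyParquetFile],
                       predicate: Callable[[dict], bool]) -> int:
    # Every row has to be read and tested; the footers cannot answer this.
    return sum(1 for f in files for row in f.rows if predicate(row))


files = [
    ToyParquetFile(3, [{"x": 1}, {"x": None}, {"x": 3}]),
    ToyParquetFile(2, [{"x": None}, {"x": 5}]),
]
total = count_from_metadata(files)
kept = count_after_filter(files, lambda r: r["x"] is not None)
print(total, kept)
```

The first count only looks at per-file metadata, while the second has to visit all rows, which is why adding a filter changes the cost of count() so much.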

On the other hand, I would expect that dt.filter(*).limit(3).show() should run relatively fast, since this would only require executing the dataset just enough to get 3 rows. Do you observe the same slowness if you omit the filter (i.e. just dt.limit(3).show())?
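The expectation that a limit after a filter should stop early can be sketched with plain Python generators. This is only an illustration of the idea, not Ray's execution model; `read_rows`, `lazy_filter`, and the `stats` counter are hypothetical names.

```python
from itertools import islice

stats = {"rows_read": 0}  # counts how many rows the "reader" produced


def read_rows(n):
    # Lazily yield rows one at a time, recording each read.
    for i in range(n):
        stats["rows_read"] += 1
        yield {"id": i}


def lazy_filter(rows, predicate):
    # Generator expression: rows are only pulled when a consumer asks.
    return (r for r in rows if predicate(r))


# "limit(3)" as islice: stop pulling as soon as three rows have passed.
first_three = list(
    islice(lazy_filter(read_rows(1_000_000), lambda r: r["id"] % 2 == 0), 3)
)
print(first_three, stats["rows_read"])
```

In this toy pipeline only a handful of the million rows are ever read, because the consumer stops asking once it has three matches.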

I was also expecting dt.filter(*).limit(3).show() to be fast, but it also takes hours without returning any output. It seems ray==2.34.0 is not aware of the limit of 3 while running filter(). Yes, dt.limit(3).show() is fast and only takes a few seconds.

Thanks for reporting. I have opened a new GH issue: [Data] `Limit` operation does not early-stop dataset execution (Issue #47287, ray-project/ray)

Thanks @sjl. I reran dt.filter(*).limit(3).show() in a clean environment and it finishes in a few minutes. Last time it took hours because all CPUs were occupied by another Ray job. So I think Ray still optimizes a limit after a filter, just not as fast as expected.

Ah, got it. So the original issue seems to be resolved?

I think the difference in time that I observed in the reproducible example in my GH issue is due to the metadata fetching time (one vs. all files). I'll go ahead and close the issue for now; please feel free to follow up or re-open if you run into it again. Thanks!

The original issue is still there: dt.filter(*).count() takes forever to run, even though dt.count() takes 12 minutes to return the count.