pyarrow.concat_tables with ray.data.read_parquet gives schema mismatch error when filters are applied

Here is the code snippet I am using to fetch the data from parquet with filters applied:

import ray
import pyarrow as pa
import pyarrow.dataset as ds

table_url = "<source table location>"
# hive-style partitioning on the "date" partition key
part = ds.partitioning(pa.schema([("date", pa.string())]), flavor="hive")
filter_expression = (ds.field("date") == '2021-05-14')
args = {}
args['dataset_kwargs'] = {'partitioning': part}
args['filter'] = filter_expression
ray_dataset = ray.data.read_parquet(table_url, **args)
rt = pa.concat_tables(ray.get(ray_dataset.to_arrow()))

I get the following error:

Traceback (most recent call last):
  File "test_ray_ds_filters.py", line 20, in <module>
    rt  = pa.concat_tables(tables)
  File "pyarrow/table.pxi", line 2271, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Schema at index 2 was different: 
column1: string
column2: string
date: string
vs
column1: string
column2: string
date: null

“date” is the partition key, and it comes back with null type for the tables that are empty because of the filter condition.

We can avoid this by either 1) filtering out the empty tables or 2) passing promote=True to pa.concat_tables.
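For reference, a rough sketch of both workarounds (assuming tables holds the per-shard pyarrow tables returned by ray.get):

# 1) drop the empty shards before concatenating
non_empty = [t for t in tables if t.num_rows > 0]
rt = pa.concat_tables(non_empty)

# 2) or let pyarrow unify the schemas, promoting the null-typed columns
rt = pa.concat_tables(tables, promote=True)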

But I am not able to handle this when the error is thrown by the parquet datasource's internal concat_tables call: ray/parquet_datasource.py at master · ray-project/ray · GitHub

This happens when I try to split the dataset into shards and convert them into pyarrow tables.
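For context, the sharding step looks roughly like this (a minimal sketch; the shard count is illustrative, and to_arrow is the same call used in the snippet above):

shards = ray_dataset.split(4)
shard_tables = [pa.concat_tables(ray.get(s.to_arrow())) for s in shards]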

To handle this internally, we can check whether the piece.to_table call returns a table with num_rows > 0, and only add the partition keys when that condition is met.
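Very roughly, the change inside the datasource read path could look like this (a hypothetical sketch, not the actual Ray code; partition_keys stands in for the hive keys recovered from the file path):

table = piece.to_table(columns=columns, filter=filter_expression)
if table.num_rows > 0:
    # only attach partition-key columns to non-empty tables, so empty
    # shards do not produce null-typed "date" columns
    for name, value in partition_keys.items():
        arr = pa.array([value] * table.num_rows, type=pa.string())
        table = table.append_column(name, arr)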

Hey, this looks like a bug in the library. Do you mind filing a GitHub issue for this with an example parquet file?

Thanks for the follow-up, Alex! Yes, I have opened an issue: #17926