Here is the code snippet I am using to read data from Parquet with filters applied:
import ray
import pyarrow as pa
import pyarrow.dataset as ds

table_url = "<source table location>"
# Hive-style partitioning on the "date" partition column
part = ds.partitioning(pa.schema([("date", pa.string())]), flavor="hive")
filter_expression = (ds.field("date") == '2021-05-14')
args = {}
args['dataset_kwargs'] = {'partitioning': part}
args['filter'] = filter_expression
ray_dataset = ray.data.read_parquet(table_url, **args)
tables = ray.get(ray_dataset.to_arrow())
rt = pa.concat_tables(tables)
I get the following error:
Traceback (most recent call last):
File "test_ray_ds_filters.py", line 20, in <module>
rt = pa.concat_tables(tables)
File "pyarrow/table.pxi", line 2271, in pyarrow.lib.concat_tables
File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Schema at index 2 was different:
column1: string
column2: string
date: string
vs
column1: string
column2: string
date: null
"date" is the partition key, and it comes through with null type for the tables that end up empty because of the applied filter.
We can avoid this on the caller side by either 1) filtering out the empty tables before concatenating, or 2) passing promote=True to pa.concat_tables, as in the sketch below.
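For reference, this is the kind of caller-side workaround I mean. It is only a sketch: concat_non_empty is a hypothetical helper, and it assumes an older pyarrow where concat_tables still accepts promote=True (newer releases spell this promote_options="default").

import pyarrow as pa

def concat_non_empty(tables):
    # Option 1: drop the empty tables so their null-typed "date" column
    # never reaches concat_tables.
    non_empty = [t for t in tables if t.num_rows > 0]
    if non_empty:
        return pa.concat_tables(non_empty)
    # Option 2 (fallback when every table is empty): let Arrow unify the
    # schemas so the null column is promoted to string.
    return pa.concat_tables(tables, promote=True)

rt = concat_non_empty(ray.get(ray_dataset.to_arrow()))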
But I am not able to apply either workaround when the error is thrown by the internal concat_tables call in Ray's Parquet datasource: ray/parquet_datasource.py at master · ray-project/ray · GitHub
This happens when I try to split the dataset into shards and convert them into PyArrow tables, roughly as shown below.
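For context, this is roughly the path that triggers it for me (a sketch; the shard count is arbitrary):

shards = ray_dataset.split(4)
# The ArrowInvalid above is raised while the shards are materialized as
# Arrow tables, from inside the datasource rather than from my own code.
shard_tables = [ray.get(shard.to_arrow()) for shard in shards]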
To handle this internally, we could check whether the piece.to_table() call returns a table with num_rows > 0, and add the partition keys only when that condition is met.
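As a rough sketch of what I mean (the names _read_pieces and partition_keys_for below are illustrative stand-ins, not the actual code in parquet_datasource.py):

import pyarrow as pa

def _read_pieces(pieces, partition_keys_for):
    # `pieces` are the dataset pieces a read task is responsible for;
    # `partition_keys_for` is a hypothetical lookup yielding (name, value)
    # pairs for a piece's partition keys.
    tables = []
    for piece in pieces:
        table = piece.to_table()
        if table.num_rows > 0:
            # Only append partition-key columns (e.g. "date") when the piece
            # actually produced rows, so empty pieces never contribute a
            # null-typed partition column.
            for name, value in partition_keys_for(piece):
                table = table.append_column(name, pa.array([value] * table.num_rows))
        tables.append(table)
    # Empty tables now carry no conflicting partition column; dropping them
    # before concatenation would be an equally valid choice.
    return pa.concat_tables([t for t in tables if t.num_rows > 0])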