Hive Partitioned Datasets

I posted this is the Ray datasets Slack channel last week, but just saw on there that it was recommended to post here for faster response.

I have a question related to how ray datasets will handle hive partitioned parquet folders. In the toy example I am working with right now, my folder looks like (4 years and 4 quarters):

--root/
----Year=1/
------Quarter=1/
--------file.parquet
------Quarter=2/
--------file.parquet
------Quarter=3/
--------file.parquet
------Quarter=4/
--------file.parquet
----Year=2/
----etc...

I can ingest this folder using ray_dataset = ray.data.read_parquet("root", schema=<file_schema>)
The resulting print-out of the dataset looks like this:

Dataset(
   num_blocks=16,
   num_rows=16,
   schema={Year: int64, Quarter: int64, Value: int64}
)

With 16 blocks for each combination of the folder partitions as I would expect. I have a few questions related to this:
If I call ray_dataset.map_batches(<some_func>, batch_size=None) , within each actor called for each batch, will ray only load the data needed for that particular batch? Like it would only load Year=1/Quarter=1/file.parquet for the batch that corresponds to that? I want to make sure that all the data doesn’t ever need to be coalesced into a single node for when this is put into practice.

Assuming that the partitioning of the underlying files has to be by year and quarter, I would like to figure out how to send a full year to each batch. Is there a way to do this without reading all of the data into a single node? I could do ray_dataset.groupby("Year").map_groups(<some_func>) but I see that the resulting DAG contains these two operations TaskPoolMapOperator[ReadParquet] -> AllToAllOperator[Sort] , does that imply all the data would need to be read by a single node for the sort operation?

If I do ray_dataset.repartition(4, shuffle=False).map_batches(<some_func>, batch_size=None) , the resulting blocks mix up the years between blocks, which I verified by returning the average year from each batch.
If I do ray_data.map_batches(<some_func>, batch_size=4) then this does appear to keep the years intact, but would that be a robust way to do this?

Thanks for the help!