Interleaving file reads with a custom datasource

I have a dataset of ~100k files in a custom format, where each file contains a variable number of rows. I've implemented a custom datasource in which each read task receives a list of N files, opens them sequentially, and yields a DataFrame with one row per example in the file. I load this datasource with ray.data.read_datasource and then iterate over it with iter_batches.

The problem is that the data within each file is highly correlated, so it's important that every element of a batch comes from a different file, and I haven't found a way to achieve this within Ray. My current workaround is to post-process the batches that Ray yields: I cache rows until I've accumulated examples from enough distinct files to emit a batch in which every row comes from a unique file (see the sketch below). I'm wondering if there's a more memory-efficient way to do this; I'm essentially looking for the equivalent of tf.data.Dataset.interleave.
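For reference, here is roughly what my post-processing looks like. This is a minimal sketch, not my exact code: the file_id column is a hypothetical field I attach in the datasource to record which file each row came from, and batch_size is my training batch size.

```python
from collections import defaultdict, deque

import pandas as pd
import ray


def iter_interleaved_batches(ds: ray.data.Dataset, batch_size: int):
    # Rows buffered per source file, keyed by the (hypothetical) file_id
    # column that my datasource attaches to every row.
    buffers = defaultdict(deque)
    for batch in ds.iter_batches(batch_format="pandas"):
        for _, row in batch.iterrows():
            buffers[row["file_id"]].append(row)
        # Flush whenever we hold rows from at least batch_size distinct
        # files, taking one row per file so every batch element comes
        # from a unique file.
        while len(buffers) >= batch_size:
            files = list(buffers)[:batch_size]
            rows = [buffers[f].popleft() for f in files]
            for f in files:
                if not buffers[f]:
                    del buffers[f]
            yield pd.DataFrame(rows)
    # Rows still buffered here come from fewer than batch_size distinct
    # files and can't form a fully interleaved batch.
```

The buffers dict is where the memory cost shows up: rows from files read early can sit cached for a long time before enough other files contribute rows to complete a batch.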