My use case is the following:
- Load dataset where each file is a block
- Based on the contents of each block, create a new dataset in which a new block may contain one or many old blocks (the grouping strategy for new blocks is determined by data contained in the old blocks); as a result, new blocks may have overlapping data, which is fine for my purposes
- Produce the final dataset by applying `custom_function` to each new block in the new dataset
Re 1: All clear; use a custom datasource.
Re 2: Here the plan is to use `map_batches` to gather the data needed for repartitioning from the input dataset, use this data to group the existing block object refs, merge blocks according to the grouping strategy derived above, and create a new dataset from these blocks.
Re 3: Similar to 1, use `map_batches`.
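To make step 2 concrete, here is a minimal pandas-level sketch of the regroup-and-merge logic I have in mind, with the Ray orchestration left out. The `group` column and the membership-building rule are just my illustration of "grouping strategy determined by data in old blocks", not anything from the Ray API:

```python
import pandas as pd

# Old blocks: each block is a pandas DataFrame (one per input file).
# Hypothetical grouping rule: a block belongs to every group that
# appears in its "group" column, so one old block can end up in
# several new blocks (hence the overlap I mentioned above).
old_blocks = [
    pd.DataFrame({"group": ["a", "b"], "value": [1, 2]}),
    pd.DataFrame({"group": ["b"], "value": [3]}),
]

# Step 2a: gather the data needed for repartitioning
# (which groups appear in which old block).
membership: dict[str, list[int]] = {}
for i, block in enumerate(old_blocks):
    for g in block["group"].unique():
        membership.setdefault(g, []).append(i)

# Step 2b: merge old blocks into new, possibly overlapping blocks.
new_blocks = {
    g: pd.concat([old_blocks[i] for i in idxs], ignore_index=True)
    for g, idxs in membership.items()
}
```

In this toy run the first old block lands in both the `"a"` and `"b"` new blocks, which is exactly the overlapping behavior I want; the open question is how to do the `pd.concat` step on `ObjectRef[Block]` refs without pulling everything through the driver.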
The questions mostly concern the behavior of `map_batches` and the specific Dataset API to use in step 2:
- If I use `batch_size`, is it guaranteed that batch == block? If not, what do I do to make sure that blocks are kept intact and no repartitioning takes place? Do I have to write a custom task for this and apply it to each block?
- How do I merge two blocks given two `ObjectRef[Block]` refs? Is there some efficient way to do this without explicitly merging the `Block` objects and creating a new ref for the merged object, so I can avoid copying data? Or is there a way to tell Ray to treat two `ObjectRef[Block]` refs as a single ref?
- How do I create a dataset solely from a list of existing `ObjectRef` refs?

For clarity, my blocks are pandas DataFrames.