My use case is the following:
- Load a dataset where each file is a block
- Based on the contents of each block, create a new dataset where a new block may contain one or many old blocks (the grouping strategy for new blocks is determined by the data contained in the old blocks); hence new blocks may contain overlapping data, which is OK for me
- Produce the final dataset by applying `custom_function` to each new block in the new dataset
Re 1: All clear, use a custom datasource.
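To illustrate the block layout I'm after (one file == one block), here is a minimal sketch using `ray.data.from_pandas` on the driver purely for illustration; the real version would be a custom datasource, and the paths/glob are made up:

```python
import glob

import pandas as pd
import ray

# One DataFrame per input file; as far as I understand, from_pandas
# keeps each DataFrame as its own block, i.e. the 1:1 file -> block
# layout I want (the real version would do this inside a datasource).
dfs = [pd.read_parquet(path) for path in glob.glob("data/*.parquet")]
ds = ray.data.from_pandas(dfs)
```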
Re 2: Here the plan is to use `map_batches` to gather the data needed for repartitioning from the input dataset, use that data to group the existing block object refs, merge blocks based on the grouping strategy derived above, and create a new dataset from those blocks.
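To make question 2 below concrete, this is the naive version of the merge step that I would like to avoid, since it copies data; `groups` is a hypothetical mapping (group key -> list of old block refs) computed in the first `map_batches` pass:

```python
import pandas as pd
import ray

@ray.remote
def merge_blocks(*blocks: pd.DataFrame) -> pd.DataFrame:
    # Ray resolves the ObjectRef arguments to DataFrames before the
    # task runs; pd.concat then materializes a full copy of the data.
    return pd.concat(blocks, ignore_index=True)

# groups: Dict[Any, List[ObjectRef]] -- hypothetical, derived from the
# first pass over the input dataset.
merged_refs = [merge_blocks.remote(*refs) for refs in groups.values()]
```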
Re 3: Similar to 2, use `map_batches`.
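What I have in mind for step 3, assuming batch == block holds when `batch_size` is left unset (which is exactly what question 1 below asks):

```python
import pandas as pd

def custom_function(batch: pd.DataFrame) -> pd.DataFrame:
    ...  # per-block transformation

# new_ds is the dataset produced in step 2.
final_ds = new_ds.map_batches(custom_function, batch_format="pandas")
```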
The questions are mostly about the behavior of `map_batches` and the specific Dataset API to use in step 2:
- If I use `map_batches` without specifying `batch_size`, is it guaranteed that batch == block? If not, what do I do to make sure that blocks are kept intact and no repartitioning takes place? Do I have to write a custom task for this and apply it to each `ObjectRef[Block]` in the dataset?
- How do I merge two blocks given two `ObjectRef[Block]` refs? Is there an efficient way to do this without explicitly merging the `Block` objects and creating a new ref to the result, so that I can avoid copying data? Or is there a way to tell Ray to treat two `ObjectRef[Block]` refs as a single ref?
- How do I create a dataset solely from a list of existing `ObjectRef` refs? For clarity, my blocks are pandas DataFrames. (A sketch of what I'm hoping for follows this list.)
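For that last question, this is roughly what I'm hoping for; `ray.data.from_pandas_refs` looks like a candidate, but I'm not sure it's the intended API for this, or whether it avoids copying:

```python
import ray

# merged_refs: List[ObjectRef] pointing at pandas DataFrames, produced
# by the merge step above. Assuming from_pandas_refs builds a dataset
# directly around the existing refs without moving the data.
new_ds = ray.data.from_pandas_refs(merged_refs)
```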
Thanks!