Dataset: support concurrency within a block when using map_batches

Hi, I've been using Ray Datasets for large-scale inference.
I've found that the number of blocks is the maximum parallelism of map_batches, assuming resources are unlimited.

The batch_size parameter of map_batches only determines the size of the batches processed within a single block worker (the transform within one block runs serially), which means the code cannot be accelerated by tuning batch_size.

I wish the maximum parallelism of map_batches were determined by batch_size, so that each batch could be processed concurrently. Is this reasonable?
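
To illustrate the current behavior, here is a minimal sketch (the toy dataset and numbers are hypothetical, not from this thread):

    import ray

    # 4 blocks -> at most 4 concurrent map_batches tasks, no matter how
    # many CPUs are available
    ds = ray.data.range(100_000, parallelism=4)

    # batch_size only sets how many rows each UDF call sees; the batches
    # inside one block are still processed serially by a single worker
    ds = ds.map_batches(lambda batch: batch, batch_size=1024)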

reference from:

Hey @Basasuya, can you share more about your use case and whether you're running into performance issues because of this?

If you have a few large blocks and as a result your parallelization is limited by the number of blocks as opposed to resources, one option is to repartition your dataset first so that you have more blocks that are each smaller.
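
Something along these lines (the target block count and the UDF name are placeholders):

    import ray

    # input_path and my_udf are placeholders
    ds = ray.data.read_binary_files(paths=input_path)

    # More, smaller blocks -> more concurrent map_batches tasks.
    # By default repartition() does not shuffle, so this mostly splits blocks.
    ds = ds.repartition(100)
    ds = ds.map_batches(my_udf, batch_size=None)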

I have hundreds of files, and each file is large (several GB); my code looks like the following:

    import ray
    from ray.data import ActorPoolStrategy

    pipe = ray.data.read_binary_files(paths=input_path).window(blocks_per_window=5) \
        .map_batches(parse_db_line, batch_size=None, compute=ActorPoolStrategy(5, 5, 1)) \
        .map_batches(predictor, batch_size=None, compute=ActorPoolStrategy(5, 5, 1), num_gpus=1, num_cpus=1)
    pipe.write_json(output_path)

I think one solution is using repartition_each_window, which is an all-to-all stage?
Maybe a one-to-one repartition stage could be supported in the future, which would run faster.
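
For reference, here is how that would slot into the pipeline above (the per-window block count is a made-up example):

    import ray

    # input_path is a placeholder
    pipe = ray.data.read_binary_files(paths=input_path).window(blocks_per_window=5)

    # Split each window into more, smaller blocks before the map_batches stages
    pipe = pipe.repartition_each_window(50)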

@Basasuya Your understanding is correct: the parallelism is determined by the number of blocks of a Dataset. There is no plan to support parallelization of batches within a block. However, there are a few things to call out:

  • When processing a batch, the execution is vectorized (by leveraging e.g. the compute kernels of Arrow), so there is already parallelization at the hardware level (see the sketch after this list)
  • As mentioned above by @matthewdeng, you can use repartition() to increase the number of blocks. You're right that repartition() is an all-to-all operation, but if you don't request a shuffle (it's disabled by default), it's actually quite efficient, likely only involving block splitting (large blocks split into smaller ones)
  • If possible, you may shard your input files into smaller ones (e.g. if you are using Spark upstream, you should be able to repartition there before writing out the files)
  • If that's not possible, the good news is that we have a plan to build a feature called Dynamic Block Splitting, which means multiple (small) blocks can be produced from a single input file as it is read into the Dataset. It's estimated to arrive in the Ray 2.1 release, stay tuned!
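
On the first point, a minimal sketch of a vectorized batch UDF (the column name and the scaling are made-up examples, not from this thread):

    import pandas as pd
    import ray

    # With batch_format="pandas", each batch arrives as a DataFrame, so
    # column-wise operations run through pandas/NumPy kernels rather than
    # a Python-level loop over rows.
    def scale(batch: pd.DataFrame) -> pd.DataFrame:
        batch["value"] = batch["value"] / batch["value"].max()
        return batch

    ds = ray.data.range_table(10_000, parallelism=8)
    ds = ds.map_batches(scale, batch_format="pandas")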

@jianxiao @matthewdeng
Thank you for the replies!
I think repartition_each_window will be useful for me.
Looking forward to using Dynamic Block Splitting in a future release :slight_smile: