How to run map_batches function in the same order as the blocks in the block_list

Hello. I have a dataset with, for instance, 100 blocks. I need to process it with the map_batches method, and I set cpu_number to 10,
so the blocks are processed 10 at a time in parallel.
How can I run the map_batches function strictly in the same order as the blocks in the block_list?
The following diagram shows the logic I need.

Thanks.

Hi @veryhannibal , for the use case you describe above, is it a viable alternative to use Dataset.groupby and Dataset.map_groups?

Thanks for your feedback. So it will run in the same order as the groups in the grouped dataset?

@veryhannibal Can you provide some context so we can understand the use case? In general, users should not make assumptions about blocks, which are internal to the Dataset. The map_batches() API exposes batch_size for configuring the inputs to the UDF.

OK, thanks for your feedback. :grin: Let me demonstrate it.
My project is about vertical federated learning.
For instance, two parties run a federation job. I have a dataset with 120 samples, but features 1 - 5 are owned by party-1, while features 5 - 10 and the label are owned by party-2.
These 120 samples are indexed from 1 - 120.

The following image shows how my distributed program runs. The problem is that party-1 and party-2 each split these 120 samples into 12 blocks, but they can only run 3 blocks in parallel at a time. How can I make sure they both run strictly in the same order as the blocks shown in the image?
I set batch_size to None in my program, so it runs in parallel by blocks.


@veryhannibal THANK YOU for providing context and putting them in such an intuitive way to understand! :slight_smile:

Based on the use case, I think you can:

  • use zip() to combine these two datasets into one, like: combined_ds = party1_ds.zip(party2_ds).
  • launch federated task via combined_ds.map_batches(federated_udf) which will receive all the features as well as the label in federated_udf.

This way you will be able to process each row with all features, and you can launch the federated_udf with parallelism = 12, instead of 3.

Thanks for your advice.
I left something unclear: party-1 and party-2 are two independent, cross-domain Ray clusters. So the federated task between two blocks from different parties depends on communication. Because of data privacy, they are not allowed to send original data to each other; they can only send encrypted intermediate values (for instance mean, sum, count …) to the peer side, so I cannot directly zip the two datasets.
And in a real scenario the dataset will be huge and there may be thousands of blocks, but the parallelism depends on the number of CPUs. So I cannot run these thousands of blocks at the same time. That is why I have this problem.

I left something unclear: party-1 and party-2 are two independent, cross-domain Ray clusters. So the federated task between two blocks from different parties depends on communication.

In this case, you cannot zip() them. One thing you can do is use ds.get_internal_block_refs() to get all the blocks and then coordinate the pair-wise computation yourself. Note that this is not a usual pattern for Ray Data (the data cannot move across clusters through the object store).
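To illustrate what "coordinate the pair-wise computation" could mean, here is a minimal plain-Python sketch. It does not use Ray and is not the Ray Data API: the thread pool stands in for each cluster's workers, and the names block_stats and party_stats are hypothetical. It assumes both parties split their samples into the same number of blocks in the same order, and that only aggregates (sum, count) keyed by block index leave each party. Encryption and the cross-cluster transport are left out.

```python
from concurrent.futures import ThreadPoolExecutor


def block_stats(indexed_block):
    """Hypothetical per-block UDF: reduce one block to shareable aggregates."""
    index, values = indexed_block
    return {"block": index, "sum": sum(values), "count": len(values)}


def party_stats(blocks, parallelism):
    """Compute aggregates for all blocks of one party.

    Blocks execute in parallel, but Executor.map yields the results in
    input order, so position i always corresponds to block i.
    """
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        return list(pool.map(block_stats, enumerate(blocks)))


# Toy data: each party holds its own feature values for the same samples.
party1_blocks = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
party2_blocks = [[10.0, 20.0], [30.0, 40.0], [50.0, 60.0]]

stats1 = party_stats(party1_blocks, parallelism=2)
stats2 = party_stats(party2_blocks, parallelism=2)

# Pair the aggregates by block index; raw samples are never exchanged.
paired = []
for s1, s2 in zip(stats1, stats2):
    if s1["block"] != s2["block"]:
        raise ValueError("block order differs between parties")
    paired.append((s1["block"], s1["sum"], s2["sum"]))
```

Keying every message by block index means the two sides do not have to finish blocks at the same moment; they only have to agree on which block each aggregate belongs to.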

And in a real scenario the dataset will be huge and there may be thousands of blocks, but the parallelism depends on the number of CPUs. So I cannot run these thousands of blocks at the same time. That is why I have this problem.

It’s totally your choice. What I was saying is that you can use a higher parallelism when you don’t have to process the blocks sequentially.

Ok. Thank you very much for your help. So far I have found a solution that, although not optimal, temporarily solves my problem. I split the original dataset into several smaller datasets, each with fewer blocks, and then process these datasets one after another, with the blocks inside each dataset running in parallel.
If I find a better solution, I will share it here.
Very happy to be here to discuss technology with you. :grin: :grin: :grin:

Yeah, that’ll work. Internally, ds.split(N) groups blocks (and if equal splitting is required, it will break a single block into multiple blocks), so you can operate on those N sub-datasets the way you want.
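The scheduling idea behind this workaround can be sketched in plain Python. This is only an illustration under stated assumptions, not Ray code: the thread pool stands in for the cluster, process_block is a hypothetical UDF, and each "wave" plays the role of one sub-dataset produced by the split. Waves run strictly one after another, while the blocks inside a wave run in parallel.

```python
from concurrent.futures import ThreadPoolExecutor


def process_block(block):
    """Hypothetical per-block UDF; here it just sums the sample indices."""
    return sum(block)


def run_in_waves(blocks, parallelism):
    """Process `parallelism` blocks at a time, wave after wave.

    Returns the per-block results in block order, plus the schedule
    (which block indices ran together in each wave).
    """
    results = []
    schedule = []
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        for start in range(0, len(blocks), parallelism):
            wave = blocks[start:start + parallelism]
            schedule.append(list(range(start, start + len(wave))))
            # list(...) waits for the whole wave to finish before the
            # next wave is submitted; results keep the input order.
            results.extend(list(pool.map(process_block, wave)))
    return results, schedule


# 120 samples indexed 1..120, split into 12 blocks of 10 samples each.
samples = list(range(1, 121))
blocks = [samples[i:i + 10] for i in range(0, len(samples), 10)]

results, schedule = run_in_waves(blocks, parallelism=3)
```

If both parties build their blocks the same way and use the same wave size, they step through the same block indices in the same wave order. The cost is that every wave waits for its slowest block, which matches the "not optimal" caveat above.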

You’re welcome! Feel free to close this thread if your question is addressed :slight_smile: