Writing one file for each block

For checkpointing purposes (e.g., if the Ray map_batches job is interrupted while running), we want to write each block to a single file with a deterministic name. This allows us to have logic in our mapping operation to pull the data from a checkpoint instead of re-computing it.

Previously, we could use {block_index}.{file_format} in a custom block path provider. However, as of Ray 2.7.0, block_index appears to stay the same while task_index varies.

How can we write each block to a single corresponding file, and how do block_index and task_index now map to the blocks within the dataset?

It seems like this PR is related: https://github.com/ray-project/ray/pull/37986 @Stephanie_Wang

Hey @spolcyn,

> This allows us to have logic in our mapping operation to pull the data from a checkpoint instead of re-computing it.

Could you show me what this looks like? I’m not sure if writing blocks to deterministic files is the best way to achieve this.

Sure, here it is in pseudo code:

import ray

# expensive_fn_1, expensive_fn_2, and provider are placeholders for our own code
def run_job(large_dataset: str, output_path: str):
    dataset = ray.data.read_parquet(large_dataset)
    dataset = dataset.map_batches(expensive_fn_1)
    dataset = dataset.map_batches(expensive_fn_2)
    dataset.write_parquet(output_path, block_path_provider=provider())

The goal is, if I run this function twice or more (e.g., because the driver for this process failed or a worker failed too many times, possibly due to something like spot interruptions on the compute nodes), it should skip any batches that have already been processed during the previous run.

The checkpointing logic then either needs to be done at a Dataset API level, or in expensive_fn_1 or expensive_fn_2 so it can return cached results instead of recomputing them.
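To make the second option concrete, here is a minimal sketch of a check inside the mapping function. It is an illustration, not Ray API: `cached_call`, `cache_dir`, and keying the cache on a hash of the batch contents are all assumptions made for the example, and the stand-in `expensive_fn` takes a plain list so the snippet runs without Ray.

```python
import hashlib
import json
import os
import tempfile


def cached_call(fn, batch, cache_dir):
    """Return fn(batch), reusing a result saved by an earlier run if present.

    Hypothetical helper: the cache key is a hash of the batch contents, so the
    same input batch maps to the same file name on every run.
    """
    key = hashlib.sha256(json.dumps(batch, sort_keys=True).encode()).hexdigest()
    path = os.path.join(cache_dir, f"{key}.json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    result = fn(batch)
    # Write to a temporary name first, then rename, so an interrupted run
    # never leaves a half-written checkpoint behind.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(result, f)
    os.replace(tmp_path, path)
    return result


calls = []  # records how many times the expensive function really ran


def expensive_fn(batch):
    calls.append(1)
    return [x * 2 for x in batch]


cache_dir = tempfile.mkdtemp()
first = cached_call(expensive_fn, [1, 2, 3], cache_dir)
second = cached_call(expensive_fn, [1, 2, 3], cache_dir)  # served from the cache
```

Keying on content rather than on a block or task index avoids depending on how Ray numbers blocks, at the cost of hashing every batch.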

This sounds simple, but would {task_index}.{block_index}.{file_format} work? I believe the new scheme is that we have N write tasks, and each task can write K blocks. This allows write tasks to bound their memory usage.
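As a small sketch of why that naming scheme avoids collisions: every (task_index, block_index) pair yields a distinct file name. The helper `write_path` and the sizes N = 3 and K = 2 are made up for illustration and are not part of Ray.

```python
import os


def write_path(base_path, task_index, block_index, file_format):
    """Build a file name from the (task_index, block_index) pair."""
    return os.path.join(base_path, f"{task_index}.{block_index}.{file_format}")


# Assumed sizes for illustration: N = 3 write tasks, K = 2 blocks per task.
paths = [
    write_path("out", task, block, "parquet")
    for task in range(3)
    for block in range(2)
]
```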

@ericl Yep, that would work and is the format I’m currently using.

What I can’t figure out from the source though is exactly how task index & block index correspond to parameters that I set, which means I can’t be confident that the output is reproducible/reliable.

When I tweak the number of blocks in the dataset or the number of CPUs available to Ray (at least in local tests using ray.init()), I always get the same output: block_index is 0, and task_index ranges from 0 to num_blocks - 1.

For this particular script setup, I do see 2 different PIDs for the Write task in the log output.

Here’s an example script:

import os
from typing import Optional

import ray
from pyarrow.fs import FileSystem
from ray.data.datasource import BlockWritePathProvider


class TestBlockWritePathProvider(BlockWritePathProvider):
    """Ray class that allows us to control the file names of data blocks
    written out by Ray Data"""

    def _get_write_path_for_block(
        self,
        base_path: str,
        filesystem: Optional[FileSystem] = None,
        dataset_uuid: Optional[str] = None,
        task_index: Optional[int] = None,
        block_index: Optional[int] = None,
        file_format: Optional[str] = None,
    ) -> str:
        print("block task", block_index, task_index)
        suffix = f"{block_index}_{task_index}.{file_format}"
        return os.path.join(base_path, suffix)

dataset = ray.data.range(10000)
dataset = dataset.repartition(1000)
dataset.write_parquet("./test_out", block_path_provider=TestBlockWritePathProvider())

After a repartition(), the number of tasks is guaranteed to be the number of blocks, and the blocks per write task should always be 1.

The only situation where you’d see more than one block per write task is when the write operation is fused with prior operations such as read/map, which can emit multiple blocks per task.
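Given one block per write task, a resume check could look roughly like the sketch below. It assumes file names of the form 0_{task_index}.{file_format}, as produced by the provider in the example script, and it assumes that a given task_index refers to the same data on every run, which this thread does not confirm. `missing_task_indices` is a hypothetical helper, not a Ray function.

```python
import os
import tempfile


def missing_task_indices(out_dir, num_blocks, file_format="parquet"):
    """Return the task indices whose output file is not yet in out_dir.

    Assumes one block per write task (block_index == 0) and file names of the
    form "0_{task_index}.{file_format}".
    """
    existing = set(os.listdir(out_dir)) if os.path.isdir(out_dir) else set()
    return [
        i for i in range(num_blocks)
        if f"0_{i}.{file_format}" not in existing
    ]


# Simulate a run that was interrupted after writing blocks 0 and 2 of 4.
out_dir = tempfile.mkdtemp()
for i in (0, 2):
    open(os.path.join(out_dir, f"0_{i}.parquet"), "w").close()

todo = missing_task_indices(out_dir, num_blocks=4)
```

Here `todo` lists the blocks that still need to be computed and written on the next run.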

Great, I think that gives a clear path forward then. Thanks for clarifying!
