Best practices for handling giant datasets with Ray Data (large number of read tasks)

We’ve implemented a custom datasource that reads from an index in Postgres and downloads image files from S3.

We are working with a dataset of 300+ million records. The datasource currently reports the correct in-memory size for the entire dataset, so Ray decides to split it into a very large number of read tasks (1 million+).

This causes a huge delay (30+ minutes) during startup, most likely due to pickling the read tasks themselves. We’ve done everything we can to keep the pickled read_fn as small as possible (_open_block is a top-level function in the module):

    def _create_read_fn(self, offset: int, limit: int):
        # Capture only the plain string the task needs so that `self` (and
        # everything it references) is not pulled into the pickled closure.
        table_name = self.table_name

        def read_fn():
            yield _open_block(table_name, offset, limit)

        return read_fn
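
For context, here is a rough sketch of how this read_fn plugs into the surrounding datasource. The class name, the row-count/row-size bookkeeping, and the exact BlockMetadata fields are assumptions for illustration (they differ slightly across Ray versions); the point is that estimate_inmemory_data_size and the parallelism passed to get_read_tasks are what drive the number of read tasks:

    from typing import Iterator, List, Optional

    from ray.data.block import Block, BlockMetadata
    from ray.data.datasource import Datasource, ReadTask


    def _open_block(table_name: str, offset: int, limit: int) -> Block:
        # Placeholder: query the Postgres index for `limit` rows starting at
        # `offset`, download the referenced images from S3, and return them
        # as a single block (e.g. a pyarrow.Table).
        ...


    class PostgresS3Datasource(Datasource):  # hypothetical name
        def __init__(self, table_name: str, num_rows: int, avg_row_bytes: int):
            self.table_name = table_name
            self.num_rows = num_rows
            self.avg_row_bytes = avg_row_bytes

        def estimate_inmemory_data_size(self) -> Optional[int]:
            # Reporting the full in-memory size of a 300M+ row dataset is what
            # makes Ray choose a very large default parallelism.
            return self.num_rows * self.avg_row_bytes

        def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
            # `parallelism` is the number of read tasks Ray asks for; it is
            # derived from the estimated size unless overridden by the caller.
            rows_per_task = max(1, self.num_rows // parallelism)
            tasks = []
            for offset in range(0, self.num_rows, rows_per_task):
                limit = min(rows_per_task, self.num_rows - offset)
                # BlockMetadata fields vary a bit across Ray versions; these
                # are the common ones.
                metadata = BlockMetadata(
                    num_rows=limit,
                    size_bytes=limit * self.avg_row_bytes,
                    schema=None,
                    input_files=None,
                    exec_stats=None,
                )
                tasks.append(ReadTask(self._create_read_fn(offset, limit), metadata))
            return tasks

        def _create_read_fn(self, offset: int, limit: int):
            table_name = self.table_name

            def read_fn() -> Iterator[Block]:
                yield _open_block(table_name, offset, limit)

            return read_fn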

Is there a more efficient way to handle this?

Hey @eric, what happens if you configure override_num_blocks and set it to a (relatively) low value?
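
For reference, something along these lines (the datasource class and constructor arguments here are the hypothetical ones from the sketch above, and 4096 is just an illustrative value):

    import ray

    # override_num_blocks caps the number of read tasks (blocks) Ray Data
    # creates, regardless of the size the datasource reports.
    ds = ray.data.read_datasource(
        PostgresS3Datasource(
            table_name="images",       # illustrative values only
            num_rows=300_000_000,
            avg_row_bytes=500_000,
        ),
        override_num_blocks=4096,
    )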

It spills to disk and eventually runs out of disk space

@bveeramani is this to be expected? From my understanding, Ray read tasks are always executed fully?

No, this isn’t expected.

What version of Ray are you using?

From my understanding, Ray read tasks are always executed fully?

Not quite. The Ray Data scheduler should only pull data from a read task if there’s space available in the object store. So, if a read task yields 128 MiB blocks and the object store is almost full, it shouldn’t continue executing the task until space is freed.
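
One related knob, in case you want to bound this explicitly (a sketch only; the attribute names assume a recent Ray 2.x release), is the object store budget of the streaming executor:

    import ray.data

    ctx = ray.data.DataContext.get_current()

    # Limit how much object store memory the streaming executor will fill
    # before applying backpressure to upstream operators (here ~20 GiB).
    ctx.execution_options.resource_limits.object_store_memory = 20 * 1024**3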

Ahh, good to know. Seems like the fundamental issue was that block sizes were being misreported: [core] Correctly report size of variable shaped TensorArray by BitPhinix · Pull Request #47743 · ray-project/ray · GitHub