How to increase parallelism for dataset.count()?

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I suspect I just don’t understand how to use Ray correctly here.
I have a dataset:

items = ray.data.read_datasource(
    TarDatasource(extra_tar_flags="--strip-components 2", profile=True),
    paths=S3_SOURCE_URL,
    filesystem=fs,
    include_paths=True,
)

total_items = items.count()

Right now, items.count() takes a long time because the dataset is large, and I’m downloading / processing all the tar files on a single computer.

I’d like to know how to spawn, for example, 50 worker nodes so the dataset can be processed more quickly. I’m reasonably sure I can use actors to do this (I assume that if I create 50 actors and each node has 1 CPU, Ray will spawn 50 nodes), but I was wondering whether the idiomatic solution is really to create a class.

I tried setting parallelism to 50, and that didn’t change anything.

So dataset.count() appears slow because the dataset is lazily executed, which means the call also includes the time to load the data. To isolate how long the read itself takes, you can time

dataset = ray.data.read_datasource(datasource, paths=paths).fully_executed()

and then expect .count() to be nearly instant.
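
For example, a minimal timing sketch along those lines (assuming an already-running Ray cluster; the read_parquet path stands in for the custom TarDatasource read and is purely illustrative):

import time
import ray

ray.init()

# Time the read/materialization step separately from the count.
start = time.perf_counter()
dataset = ray.data.read_parquet("s3://my-bucket/data/").fully_executed()
print(f"read + execute: {time.perf_counter() - start:.1f}s")

# With the dataset fully executed, count() only needs block metadata.
start = time.perf_counter()
total = dataset.count()
print(f"count: {time.perf_counter() - start:.3f}s, rows: {total}")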

For increasing I/O throughput, raising parallelism adds more reader tasks, but it is also capped by the number of files you have: we do not spawn multiple read tasks for the same file.

To verify, you can print(items) to see how many blocks your dataset has, or watch the tqdm progress bar while the data is being read to see the number.
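
For example, something like this (using the items dataset from the question):

print(items)               # the repr includes the number of blocks, row count, and schema
print(items.num_blocks())  # blocks correspond to the effective read parallelism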

One trick I use to improve dataset reading speed for benchmarking workloads, assuming you’re reading from the same datasource repeatedly:

  1. Read the datasource into ds with default options (this can be slow).
  2. Call ds.repartition(n) to change the number of blocks, which map 1:1 to output files.
  3. Call ds.write_parquet to write the result to local disk or remote storage (see the sketch after this list).
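
A rough sketch of that caching trick, using a generic binary-file read and illustrative bucket paths in place of the custom TarDatasource from the question:

import ray

# 1. Read the datasource with default options (this first pass can be slow).
ds = ray.data.read_binary_files("s3://my-bucket/raw/", include_paths=True)

# 2. Repartition to the desired number of blocks; blocks map 1:1 to output files.
ds = ds.repartition(50)

# 3. Write the repartitioned data out as Parquet for faster reads later.
ds.write_parquet("s3://my-bucket/cached/")

# Subsequent benchmark runs read the Parquet copy instead of the original source.
ds_cached = ray.data.read_parquet("s3://my-bucket/cached/")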

Some rules of thumb:

  • Reading reasonably large files (10 MB ~ 1 GB each) is preferred over a large number of small files (a few KB each)
  • Increasing the parallelism parameter helps most when your number of reader tasks is less than the total number of files (see the sketch after this list)
  • Parquet is generally encouraged for its better read performance and compression ratio
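
For the second point, a small illustrative example of passing the parallelism hint at read time (the path is made up; in the question this would be the read_datasource call):

import ray

# Ask for up to 50 read tasks; the effective number is still capped by the file count.
ds = ray.data.read_parquet("s3://my-bucket/cached/", parallelism=50)
print(ds.num_blocks())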

Thanks for the detailed response! The primary thing I’m wondering is how I can get Ray to automatically spawn new worker nodes.

I.e., since the files are in S3, ideally I would just launch one machine for every group of 10 files.
I’m using a Ray cluster, and I can set min_workers to 50 to spawn 50 worker nodes. But I can’t seem to spawn new worker nodes on the fly, which is what I want to do.

Ah, that’s more autoscaler territory. With min_workers=50, what we do is read the number of CPUs you currently have and decide the parallelism parameter based on that status. I don’t think we have a mechanism to spawn new worker nodes on the fly, though; that only happens when you try to schedule more compute resources than your Ray cluster currently has, and by that point the parallelism parameter has already been determined.
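
To illustrate that trigger, here is a hedged sketch: the autoscaler reacts to pending resource demand, e.g. from tasks that request more CPUs than the cluster currently has (all names and numbers below are illustrative, not from the thread):

import ray

@ray.remote(num_cpus=1)
def process_group(paths):
    # Placeholder for real per-group processing.
    return len(paths)

# Illustrative grouping of S3 paths into batches of 10 files.
groups_of_10_files = [[f"s3://my-bucket/file_{i}_{j}.tar" for j in range(10)] for i in range(200)]

# Submitting more CPU-demanding tasks than the cluster can currently fit creates
# pending resource demand, which the autoscaler reacts to by scaling up toward
# max_workers. The parallelism of an already-created dataset read is not re-derived.
refs = [process_group.remote(group) for group in groups_of_10_files]
results = ray.get(refs)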

From reading this blog post, it looks like I can use actors to spawn new machines. Specifically, I’d set something like max_workers=100 and then spawn a lot of actors, each of which would process a subset of the dataset.

Does that seem right?

That’s a different thing, though. Spawning actors doesn’t mean spawning new machines and doesn’t require the autoscaler; actors are just processes.
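
For completeness, a small sketch of what spawning actors actually does (names are illustrative): each actor is a worker process that reserves resources on some existing node.

import ray

@ray.remote(num_cpus=1)
class Processor:
    def process(self, path):
        # Placeholder for real per-file work.
        return path

# This starts 50 actor processes, each reserving 1 CPU somewhere on the existing
# nodes. New machines are only added if the autoscaler sees that the pending
# actors cannot fit within the cluster's current resources.
actors = [Processor.remote() for _ in range(50)]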

Also note that reading data uses Ray tasks, not actors, so actors aren’t relevant to the read path anyway.