Write custom data streamer

Hi,
I’m trying to write my own datasource for read_datasource, but I’m having an issue with Ray not leveraging all my CPUs. I work with a custom data format that I cannot share, so for the sake of this thread, let’s say the format consists of a set of row groups. The problem I’m having is that these row groups can sometimes be large, and I have no control over their size. I would like to be able to use several CPUs to read one row group when needed (I can define my own multithreaded read function). However, I have not managed to do so; I see that Ray uses at most 1 CPU at a time. Is there a way to do this?
Thanks

Hi @nicomng, welcome to the Ray community!

There are two ways to achieve parallelism in Ray Data loading:

  • Launching multiple read tasks, with each task handling a portion (i.e., a shard) of the entire data collection. If your data lives in a filesystem, this usually means mapping one or more files to each task. You may create more files upstream so that Ray Data can parallelize the loading and utilize more CPUs.
  • Within the same read task, you can implement a multithreaded read function and pass it to a ReadTask (see the sketch after this list).
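For the second option, here is a minimal sketch of what that could look like, assuming the Ray 2.x Datasource/Reader API. The helpers list_row_groups, split_into_chunks, and read_chunk are hypothetical placeholders for your own format’s API; they are not part of Ray:

import concurrent.futures

import pandas as pd
from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource, Reader, ReadTask


class RowGroupReader(Reader):
    def __init__(self, dataset, columns):
        self._dataset = dataset
        self._columns = columns

    def estimate_inmemory_data_size(self):
        # Size is unknown up front for this format.
        return None

    def get_read_tasks(self, parallelism):
        # parallelism is a hint; here we simply create one task per row group.
        read_tasks = []
        for row_group in list_row_groups(self._dataset):  # hypothetical helper
            metadata = BlockMetadata(
                num_rows=None, size_bytes=None, schema=None,
                input_files=None, exec_stats=None,
            )

            def read_fn(rg=row_group, cols=self._columns):
                # Within this single task, fan the row group out over threads.
                with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
                    chunks = list(pool.map(
                        lambda chunk: read_chunk(rg, chunk, cols),  # hypothetical
                        split_into_chunks(rg),  # hypothetical
                    ))
                yield pd.concat(chunks)

            read_tasks.append(ReadTask(read_fn, metadata))
        return read_tasks


class RowGroupDatasource(Datasource):
    def create_reader(self, dataset, columns):
        return RowGroupReader(dataset, columns)

If you go this route, matching num_cpus in ray_remote_args (see below) to the thread pool size tells the scheduler how many cores each task will actually use.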

By default, each read task uses 1 CPU, so launching multiple tasks in parallel should utilize your CPUs. Also note that you can change how many CPUs each task uses, if that’s really what you want, by setting ray_remote_args like this:

ds = ray.data.read_parquet(
    "example://parquet_images_mini",
    parallelism=2, # two files to read, so launching 2 tasks here
    ray_remote_args={"num_cpus": 3}, # each task needs 3 CPUs
)

If you want to learn more about building a custom datasource, here is an example (for a database rather than a filesystem, but the idea is the same): Custom Datasources — Ray 3.0.0.dev0

Hi @jianxiao
Thanks for your quick reply. When trying your second solution, I get the following error message:

Error: No available node types can fulfill resource request {'num_cpus': 3.0, 'CPU': 1.0}. Add suitable node types to this cluster to resolve this issue.

Not sure where the “CPU: 1.0” is coming from.

The error message means your cluster has no node that can satisfy the resource request. If that’s the case, it should report something like this (my machine has 8 CPUs):

import ray

ds = ray.data.read_parquet(
    "example://parquet_images_mini",
    parallelism=2, # two files to read, so launching 2 tasks here
    ray_remote_args={"num_cpus": 9}, # each task needs 9 CPUs
)

2022-11-02 17:44:06,225	INFO worker.py:1518 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
Parquet Files Sample:   0%|          | 0/2 [00:00<?, ?it/s]
Dataset(num_blocks=2, num_rows=3, schema={image: ArrowTensorType(shape=(128, 128, 3), dtype=uint8), label: string})
Parquet Files Sample: 100%|██████████| 2/2 [00:00<00:00,  7.53it/s]
(scheduler +7s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(scheduler +7s) Error: No available node types can fulfill resource request {'CPU': 9.0}. Add suitable node types to this cluster to resolve this issue.
(scheduler +42s) Error: No available node types can fulfill resource request {'CPU': 9.0}. Add suitable node types to this cluster to resolve this issue.

I’m asking because my machine does have enough CPUs. I am a bit surprised that the error message shows both “CPU” and “num_cpus”, considering my ray_remote_args is {"num_cpus": 3}. Isn’t that the source of the error?

My point is that when it errors out, the message should pinpoint exactly which resource request wasn’t fulfilled; in the case of the snippet above, it should report just “CPU”.
Can you share the script you are running? I suppose it’s not the same one I posted above. Also, please provide the Ray version you are running; we can look into it with a repro.

Here is the snippet where I use a custom datasource reader:

ds = (
    ray.data.read_datasource(
        RowGroupSampleReader(),
        dataset=dataset,
        columns=column_names,
        ray_remote_args={"resources": {"num_cpus": 4}},
    )
    .window(blocks_per_window=BLOCKS_PER_MEGABATCH)
    .map_batches(dummy_process, batch_size=BATCH_SIZE, batch_format="pandas")
    .to_torch(
        feature_columns=FEATURE_COLUMNS,
        label_column=PRED_COLUMN,
        batch_size=BATCH_SIZE,
        prefetch_blocks=4,
    )
)

I am running Ray 2.0.0.
Thanks!

Can you try changing ray_remote_args={"resources": {"num_cpus": 4}} to ray_remote_args={"num_cpus": 4}? In ray_remote_args, the resources field is for custom resources, so {"resources": {"num_cpus": 4}} asks the scheduler for a custom resource literally named "num_cpus" on top of the default 1 CPU per task; that is why your first error message showed both {'num_cpus': 3.0, 'CPU': 1.0}. num_cpus belongs at the top level.
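For reference, with that change the read_datasource call from your snippet becomes:

ds = ray.data.read_datasource(
    RowGroupSampleReader(),
    dataset=dataset,
    columns=column_names,
    ray_remote_args={"num_cpus": 4},  # each read task reserves 4 CPUs
)

The rest of the pipeline (window, map_batches, to_torch) stays the same.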

Thanks, that was the issue!