Benchmarks for Ray Data?

Similar to my other question regarding benchmarks for Ray Serve, are there benchmarks that have been published, or are in the works, for Ray Data in comparison to TF Transform, Dataflow, or other preprocessing solutions?

Hi @jinnovation, great question and good timing. Benchmarking is one of the key focus areas for Ray AIR with Ray Dataset in Ray 2.1 and 2.2, and it is something we are actively testing and improving on a weekly basis.

You should hear from us soon 🙂


Hi @Jiao_Dong, are there any benchmark results for Ray Dataset yet?

Hi @jinnovation @loneystar1983, thanks for your interest in Ray Data!

We are actively working on this. For now, we have some results in the two most recent Ray Enhancement Proposals.


Thanks @zhz! These proposals are really helpful; I can preprocess my data much faster now. Also, is there a comparison between Ray Data and Spark DataFrames?

Some benchmarks are available here: Developer Preview: Ray Data Streaming Execution - Google Docs

We are working on a benchmark study that involves Spark DataFrames and will share the results in 3 to 5 weeks.

@jinnovation Stay tuned. We will be publishing these benchmarks as blogs in the coming weeks. Can you close this issue, since we are in the process of sharing our findings with the community?

Hi @jinnovation, please check out this blog: Fast, flexible, and scalable data loading for ML training with Ray Data

Here is my case.
I have 200 image files, and the image in each file is very large.
I implemented a datasource to read those files with a parallelism number, following the ray.io docs.
When I try to ingest this dataset into a preprocessor and then into the training worker in a streaming way, my object store runs out of memory from time to time. The training worker consumes the output of the preprocessor very slowly, but I cannot control the loading speed of the datasource. Since the object store acts as the buffer, it overflows.

Is there any way to cascade the flow control back to the data source? That is, suppose the buffer size is fixed: when the data sink is slow, just slow down the incoming data.

Any help would be great.

Hi @Li_Bin, what version of Ray are you using?
We recommend directly using ray.data.read_images to read the image files. With Ray 2.7, you get both streaming execution and automatic parallelism detection, which should help address both of the concerns you mention above.
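For reference, a minimal sketch of that approach (the bucket path below is a placeholder):

import ray

# Read images lazily; with Ray 2.7 the read parallelism is chosen
# automatically and execution is streamed rather than fully materialized.
ds = ray.data.read_images("s3://my-bucket/images/")
for batch in ds.iter_batches(batch_size=32):
    ...  # hand each batch to preprocessing / training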

Let me know if this is helpful, or if you have any further questions.

I am using 2.7.
My image files have a specific format that is not supported by read_images, so I implemented my own datasource that yields blocks.

Let me ask the question another way: suppose my dataset sink, say the trainer, consumes blocks very, very slowly. What will happen? Will the image reading from disk be blocked, so that no more new data is accepted into the pipeline, or will new data keep flowing into the heap/object store memory as a buffer?

Got it, thanks for rephrasing the question, I think I understand what you are getting at.

In the above scenario, as you observed, Ray Data will launch multiple read tasks to load data into the pipeline, even if the trainer only consumes one image at a time, very slowly. This is likely the reason you are seeing the object store OOM. Ray Data has some backpressure logic that kicks in to limit tasks that are not needed, but the large file size you mention is likely causing an issue here. We are working to address this bug in a future release, likely by the end of the year.
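If you want to observe this yourself, a rough repro sketch (the path is a placeholder) is to consume a streaming dataset very slowly and watch object store usage via `ray memory` or the dashboard:

import time
import ray

# Consume one image at a time, very slowly, while upstream read tasks
# keep producing blocks into the object store.
ds = ray.data.read_images("s3://my-bucket/images/")
for batch in ds.iter_batches(batch_size=1):
    time.sleep(10)  # simulate a very slow trainer step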

In the meantime, a potential workaround would be to increase the number of CPUs requested by each read task, so that fewer read tasks are scheduled concurrently. For example:

ray.data.read_datasource(..., ray_remote_args={"num_cpus": 5})
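Each read task then reserves 5 logical CPUs, so a cluster with, say, 10 CPUs runs at most 2 read tasks at a time, which bounds how much data the reads can push into the object store at once.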

Let me know if this is helpful @Li_Bin

Thanks for the workaround. I will try it.

Hi,
To reproduce the issues I met, I worked out an example here:
https://colab.research.google.com/drive/1fTbeuCIN1Ojhok46Qu7SMhpNtiGW7taM?usp=sharing

From the example, I have several questions:

1) When I set PARALLELISM = -1, suppose I have 41 files and the parallelism determined by the system is 200. The system says: "To satisfy the requested parallelism of 200, each read task output is split into 5 smaller blocks." My question is: how is a single block, which is actually a single file, split into 5 smaller blocks?
2) No matter what number I set PARALLELISM to, the dataset.train_test_split function loads all the images into memory rather than executing lazily. Why is that?
3) For a scenario where resources are very limited, for example Colab with 2 CPUs, 1 GPU, and small RAM, training OOMs anyway. The workaround given does not seem to work when I set ray_remote_args={"num_cpus": 2}.
4) One further question: suppose I will eventually feed, say, 10 GB of data into one GPU. How can I estimate the memory used in the heap and in the object store at each stage of the streaming pipeline, working backwards (suppose I have several preprocessor maps)?

Thanks a lot.