Ray Data creating multiple datasets and repeating map operations on the Ray Dashboard

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am using Ray Data to create a dataset for model training with Ray Train and a TensorFlow trainer.

The transformations I am doing with Ray Data are:

  • ray.data.read_binary_files (loading JSON files compressed with gzip)
  • a map function to parse the binary files and create NumPy matrices
  • a filter function to remove NumPy matrices that contain errors (e.g. None, NaN, or Inf values)
  • saving the transformed dataset with the write_parquet method

Before I save to Parquet, I call the count method to check how many rows I end up with, because the filter operation removes some rows.
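For reference, here is a simplified sketch of the pipeline (the bucket paths, the JSON schema, and the parse_record/is_valid helpers are just illustrative placeholders, not my actual code):

import gzip
import json

import numpy as np
import ray

def parse_record(row):
    # Decompress one gzipped JSON file and turn it into a NumPy matrix.
    payload = json.loads(gzip.decompress(row["bytes"]))
    return {"matrix": np.asarray(payload["values"], dtype=np.float32)}

def is_valid(row):
    # Keep only rows whose matrix contains finite values (no NaN/Inf).
    return bool(np.isfinite(row["matrix"]).all())

ds = (
    ray.data.read_binary_files("s3://my-bucket/raw-json/")  # placeholder path
    .map(parse_record)
    .filter(is_valid)
)

print(ds.count())                        # check how many rows survive the filter
ds.write_parquet("s3://my-bucket/out/")  # placeholder path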

When I submit the job to the Ray cluster and evaluate its performance in the Ray Dashboard, I notice that up until the count call I see the execution of ReadBinary->Map->Filter on the Ray Core tab and one dataset in the Ray Data tab, and the number of records is shown. When it reaches the Parquet writing stage, I see a new dataset execution in the Ray Data tab and new tasks on Ray Core: ReadBinary->Map->Filter. Is this behavior correct? I was expecting a single execution of ReadBinary->Map->Filter up to the count call, and then the write to Parquet without these operations being run again. I also opened the produced Parquet file to count how many rows it contained, and the number differs from what the count method returned. So why does the count call not process the whole dataset?

I hope someone can clarify this for me.

Thanks,
Joao

@jfecunha

Hi Joao,

Take a look at the Note in this API reference: ray.data.Dataset.count — Ray 2.37.0

Your data pipeline has a map and a filter operation after the read, which means that the total number of rows could be any number, and there’s no way to figure that out without executing the operations.

If you call count() before write_parquet, the dataset will execute twice from the beginning. The call to count executes the pipeline to calculate the number of rows, but throws away all of the intermediate work. Then the write_parquet executes independently afterwards.
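Schematically (using the same abbreviated calls as the snippet further down), each consuming call kicks off its own run of the lazy pipeline:

ds = ray.data.read_binary_files(...).map_batches(...).filter(...)

row_count = ds.count()  # execution #1: ReadBinary->Map->Filter runs, only the row count is kept
ds.write_parquet(...)   # execution #2: ReadBinary->Map->Filter runs again, then writes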

If you only want to execute the dataset once, you can cache the dataset in memory, then perform whatever downstream tasks on the cached dataset (like getting the count or writing to files):

ds = ray.data.read_binary_files(...).map_batches(...).filter(...)

ds = ds.materialize()  # executes the pipeline once and caches the resulting blocks in memory
row_count = ds.count()  # does not re-trigger execution
ds.write_parquet(...)  # does not re-trigger execution

See ray.data.Dataset.materialize — Ray 2.37.0
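One caveat: materialize() pins all of the resulting blocks in Ray's object store memory, so it works best when the transformed dataset fits comfortably in cluster memory (very large datasets may spill to disk).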