Recommended steps for processing big data?

How severely does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

I have a large 900M-row data set that is taking a long time to process and is causing a lot of disk spilling. I am wondering what the recommended steps / things I should do are for optimal processing?

Here is what I am doing (a rough code sketch follows the list). I am:

  1. Getting my data and repartitioning it based on my # of CPU cores
  2. Getting a count of my target variable for processing
  3. Mapping / recoding my target variable based on the counts, using map_batches and an ActorPoolStrategy that can feasibly use all my cores
  4. Splitting the data into train and test sets
  5. One-hot encoding
  6. Shuffling
  7. Creating a RayDMatrix for XGBoost
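
For context, here is a minimal sketch of that pipeline using standard Ray Data and xgboost_ray APIs; the path, column names, recode mapping, pool size, and split fraction below are placeholders, not my actual values:

    import ray
    from ray.data import ActorPoolStrategy
    from ray.data.preprocessors import OneHotEncoder
    from xgboost_ray import RayDMatrix

    num_partitions = 32  # placeholder: roughly my number of CPU cores

    # 1. Read the data and repartition it
    ds = ray.data.read_parquet('s3://bucket/data/')  # placeholder path
    ds = ds.repartition(num_partitions, shuffle=False)

    # 2. Count the target variable
    counts = ds.select_columns(['y']).groupby('y').count().to_pandas()

    # 3. Recode the target based on the counts, using an actor pool
    class RecodeTarget:
        def __init__(self):
            self.mapping = {}  # placeholder: mapping derived from `counts`

        def __call__(self, batch):
            batch['y'] = batch['y'].map(lambda v: self.mapping.get(v, v))
            return batch

    ds = ds.map_batches(RecodeTarget, batch_format='pandas',
                        compute=ActorPoolStrategy(size=num_partitions))

    # 4. Split into train and test sets
    train_ds, test_ds = ds.train_test_split(test_size=0.2)

    # 5. One-hot encode categorical features
    encoder = OneHotEncoder(columns=['cat_feature'])  # placeholder column
    train_ds = encoder.fit_transform(train_ds)
    test_ds = encoder.transform(test_ds)

    # 6. Shuffle
    train_ds = train_ds.random_shuffle()

    # 7. Create the RayDMatrix for XGBoost
    dtrain = RayDMatrix(train_ds, label='y')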

The thing that jumps out to me in the logs is that, after running:

ds = ray.data.read_parquet(path,
                           columns=['RandomizedTest'],
                           ).repartition(num_partitions, shuffle=False)

The log prints this DAG:

Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet->SplitBlocks(13)] -> AllToAllOperator[Repartition]

OK, for the next operation,

out = ds.select_columns(['y']).groupby('y').count().to_pandas()

I get this DAG:

Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet->SplitBlocks(13)] -> AllToAllOperator[Repartition] -> AllToAllOperator[Aggregate]

Is Ray doing the same steps all over again? Do I need to call .materialize() periodically, or what is the recommended approach for handling very large data sets?

Hi @localh,

The log prints out the DAG of operators (what we call the physical plan). Most Ray Data APIs are lazy: they only get executed when you call a consuming operation such as Dataset.iter_batches() or Dataset.materialize(). Because nothing is cached in between, each consuming call re-runs the upstream operators, which is why ReadParquet and Repartition appear again in your second DAG.
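
As a small illustration of this laziness (a toy pipeline, not your code):

    import ray

    # Nothing executes here; this only builds the plan.
    ds = ray.data.range(1000).map_batches(lambda batch: batch)

    # Execution is triggered here, and this is when the
    # "Executing DAG ..." log line gets printed.
    ds = ds.materialize()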

You normally do not need to call materialize(), unless you want to cache that dataset in memory to avoid executing it multiple times. More info in Scheduling, Execution, and Memory Management — Ray 2.5.1.
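
For example, if the same upstream steps feed several downstream consumers, you could materialize the shared prefix once (a sketch; the path and column names are placeholders):

    import ray

    ds = ray.data.read_parquet(path).repartition(num_partitions, shuffle=False)
    ds = ds.materialize()  # executes read + repartition once and caches the blocks

    # Both of these now reuse the cached blocks instead of re-reading the Parquet files.
    counts = ds.select_columns(['y']).groupby('y').count().to_pandas()
    train_ds, test_ds = ds.train_test_split(test_size=0.2)

One caveat: materializing a dataset larger than the object store will itself spill to disk, so caching is only a win when the cached data fits in memory or when re-computing the upstream steps is more expensive than the spill.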