Recommended steps for processing big data?

How severely does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

I have a large 900M-row data set that is taking a long time to process and is causing a lot of disk spilling. I am wondering what the recommended steps / things I should do are for optimal processing?

Here is what I am doing (a rough code sketch follows the list). I am:

  1. Getting my data and repartitioning it based on my # of CPU cores
  2. Getting a count of my target variable for processing
  3. Mapping / recoding my target variable based on the counts, using map_batches and an ActorPoolStrategy that can feasibly use all my cores
  4. Splitting the data into train and test sets
  5. One-hot encoding
  6. Shuffling
  7. Creating a RayDMatrix for XGBoost
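
For context, here is a minimal sketch of that pipeline using standard Ray Data and xgboost_ray APIs; the path, column names, recode mapping, pool size, and split fraction below are placeholders, not my actual values:

    import ray
    from ray.data import ActorPoolStrategy
    from ray.data.preprocessors import OneHotEncoder
    from xgboost_ray import RayDMatrix

    num_partitions = 32  # placeholder: roughly my number of CPU cores

    # 1. Read the data and repartition it
    ds = ray.data.read_parquet('s3://bucket/data/')  # placeholder path
    ds = ds.repartition(num_partitions, shuffle=False)

    # 2. Count the target variable
    counts = ds.select_columns(['y']).groupby('y').count().to_pandas()

    # 3. Recode the target based on the counts, using an actor pool
    class RecodeTarget:
        def __init__(self):
            self.mapping = {}  # placeholder: mapping derived from `counts`

        def __call__(self, batch):
            batch['y'] = batch['y'].map(lambda v: self.mapping.get(v, v))
            return batch

    ds = ds.map_batches(RecodeTarget, batch_format='pandas',
                        compute=ActorPoolStrategy(size=num_partitions))

    # 4. Split into train and test sets
    train_ds, test_ds = ds.train_test_split(test_size=0.2)

    # 5. One-hot encode categorical features
    encoder = OneHotEncoder(columns=['cat_feature'])  # placeholder column
    train_ds = encoder.fit_transform(train_ds)
    test_ds = encoder.transform(test_ds)

    # 6. Shuffle
    train_ds = train_ds.random_shuffle()

    # 7. Create the RayDMatrix for XGBoost
    dtrain = RayDMatrix(train_ds, label='y')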

The thing that jumps out to me in the logs is that, after running:

ds = ray.data.read_parquet(path,
                           columns=['RandomizedTest'],
                           ).repartition(num_partitions, shuffle=False)

The log prints this DAG:

Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet->SplitBlocks(13)] -> AllToAllOperator[Repartition]

OK, for the next operation,

out = ds.select_columns(['y']).groupby('y').count().to_pandas()

I get this DAG:

Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet->SplitBlocks(13)] -> AllToAllOperator[Repartition] -> AllToAllOperator[Aggregate]

Is Ray doing the same steps all over again? Do I need to call .materialize() periodically, or what is the recommended approach for handling very large data sets?

Hi @localh,

The log prints out the DAG of operators (what we call the physical plan). Most Ray Data APIs are lazy: they only get executed when you call a consuming operation such as Dataset.iter_batches() or Dataset.materialize(). Because nothing is cached in between, each consuming call re-runs the upstream operators, which is why ReadParquet and Repartition appear again in your second DAG.
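
As a small illustration of this laziness (a toy pipeline, not your code):

    import ray

    # Nothing executes here; this only builds the plan.
    ds = ray.data.range(1000).map_batches(lambda batch: batch)

    # Execution is triggered here, and this is when the
    # "Executing DAG ..." log line gets printed.
    ds = ds.materialize()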

You normally do not need to call materialize(), unless you want to cache that dataset in memory to avoid executing it multiple times. More info in Scheduling, Execution, and Memory Management — Ray 2.5.1.
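
For example, if the same upstream steps feed several downstream consumers, you could materialize the shared prefix once (a sketch; the path and column names are placeholders):

    import ray

    ds = ray.data.read_parquet(path).repartition(num_partitions, shuffle=False)
    ds = ds.materialize()  # executes read + repartition once and caches the blocks

    # Both of these now reuse the cached blocks instead of re-reading the Parquet files.
    counts = ds.select_columns(['y']).groupby('y').count().to_pandas()
    train_ds, test_ds = ds.train_test_split(test_size=0.2)

One caveat: materializing a dataset larger than the object store will itself spill to disk, so caching is only a win when the cached data fits in memory or when re-computing the upstream steps is more expensive than the spill.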