- Medium: It contributes to significant difficulty to complete my task, but I can work around it.
I have a list of Parquet files that occupy about 8 GB in memory when loaded with pandas on a single machine. I am trying to load them onto a Ray cluster and create a Modin DataFrame from the resulting Ray dataset.
Ray cluster settings : workers=6, cpu=3, ram=9, head_cpu=0, head_ram=2, conda_env='conda_env_temp', head_type='local', head_env={}, worker_env={}
I am able to create a ray dataset using the following function.
ds = ray.data.read_parquet( )
But the cluster throws an OOM error when I try to create a Modin DataFrame from the above Ray dataset using modin_frame = ds.to_modin().
The reason for creating a modin dataframe is to leverage the pandas API for some feature transformation task.
I do understand that a Modin DataFrame is created by combining distributed pandas DataFrames. In that case, what are the optimal settings for node memory, CPU, and number of nodes so that I can avoid the OOM error?
Hi @krishnavyas, thank you for posting this question!
It should be noted that ray.data.read_parquet() is semi-lazy; it will only read the first Parquet file eagerly, while the reading of the rest of the Parquet files is deferred until they are needed. ds.to_modin() triggers the reading of the rest of the files as Arrow Tables, converts each Arrow Table block to a Pandas DataFrame, and then hands those over to Modin.
Another thing to note is that, for a Ray cluster with 9 GB of total memory, 30% of that memory is going to be allocated to Ray’s shared memory object store, so you will have ~3 GB available for the shared memory object store, and ~6 GB available for the worker heap. What I’m guessing is happening is that the data fits in Ray’s shared memory object store right after reading from disk since it’s in a more compact Arrow Table format (or the object store data is being transparently spilled to and restored from disk), but when ds.to_modin() converts those object store Arrow Tables to worker heap Pandas DataFrames, all ~6 GB of the available worker heap is exhausted and the cluster OOMs.
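To make the split above concrete, here is a minimal sketch of the arithmetic. The helper name split_memory is made up for illustration and is not part of Ray; the 30% fraction is the default mentioned above, and your cluster may be configured differently.

```python
from typing import Tuple


def split_memory(total_gb: float, object_store_fraction: float = 0.30) -> Tuple[float, float]:
    """Return (object_store_gb, heap_gb) for a given memory budget.

    Hypothetical helper, not a Ray API. The 0.30 default mirrors the
    30% object store allocation described in the post.
    """
    if not 0.0 < object_store_fraction < 1.0:
        raise ValueError("object_store_fraction must be between 0 and 1")
    object_store_gb = total_gb * object_store_fraction
    heap_gb = total_gb - object_store_gb
    return object_store_gb, heap_gb


# 9 GB is the memory figure from the question.
object_store_gb, heap_gb = split_memory(9.0)
print(f"object store: {object_store_gb:.1f} GB, worker heap: {heap_gb:.1f} GB")
```

For 9 GB this gives roughly 2.7 GB of object store and 6.3 GB of heap, which is where the ~3 GB and ~6 GB figures come from.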
Some steps to determine how large your data is in memory:
- Check to see how large a single file is in memory right after reading (in Arrow Table format):
ray.data.read_parquet(files[0]).fully_executed().size_bytes()
- Check to see how large all files are in memory right after reading:
ray.data.read_parquet(files).fully_executed().size_bytes()
- Check to see how large a single file is in memory after converting it to a Pandas DataFrame:
ray.data.read_parquet(files[0]).map_batches(lambda df: df, batch_size=None, batch_format="pandas").size_bytes()
or pd.read_parquet(files[0]).memory_usage(deep=True).sum()
- Check to see how large all files are in memory after converting to a Pandas DataFrame, by repeating the previous step for one file at a time and summing the results.
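The last step (measuring one file at a time and adding up the results) could be sketched like this. Both function names are hypothetical, and the pandas variant assumes pandas plus a Parquet engine such as pyarrow is installed wherever you run it.

```python
from typing import Callable, Iterable


def pandas_size_bytes(path: str) -> int:
    """In-memory size of one Parquet file once loaded as a pandas DataFrame."""
    # Imported lazily so that the summing helper below works without pandas.
    import pandas as pd

    # memory_usage(deep=True) returns per-column bytes; sum them for a total.
    return int(pd.read_parquet(path).memory_usage(deep=True).sum())


def total_size_bytes(
    paths: Iterable[str],
    measure: Callable[[str], int] = pandas_size_bytes,
) -> int:
    """Sum per-file sizes, loading one file at a time to keep peak memory low."""
    total = 0
    for path in paths:
        total += measure(path)
    return total
```

Calling total_size_bytes(files) on your list of Parquet paths should give the DataFrame-representation size to plug into the sizing estimate, without ever holding more than one file in memory.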
With this data in hand, you’ll need to ensure that your object store and worker heap are both large enough to hold the Pandas DataFrame representation of your dataset. Assuming that at least one of your Modin transformations doesn’t reduce the size of your data (e.g. at least one transformation is not an aggregation), you’ll need room in the object store for both input DataFrames and output DataFrames, so you’ll actually need an object store that’s at least 2-3 times the size of your dataset in the DataFrame representation.
In your case, if your dataset is 8 GB in the DataFrame representation, then you’ll need at least 16 GB of object store to be safe, which at a 30% allocation would mean you’d need ~50 GB of aggregate RAM in your Ray cluster.
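As a rough sketch of that estimate (the function name and the headroom_factor parameter are made up for illustration; the 2x factor and 30% fraction are the numbers used above):

```python
def required_cluster_ram_gb(
    dataset_gb: float,
    headroom_factor: float = 2.0,
    object_store_fraction: float = 0.30,
) -> float:
    """Estimate the aggregate cluster RAM needed for a dataset.

    Hypothetical helper: the object store must hold headroom_factor times
    the DataFrame-representation size, and the object store is assumed to
    be object_store_fraction of total RAM.
    """
    if dataset_gb < 0 or headroom_factor <= 0:
        raise ValueError("dataset_gb must be >= 0 and headroom_factor > 0")
    if not 0.0 < object_store_fraction <= 1.0:
        raise ValueError("object_store_fraction must be in (0, 1]")
    required_object_store_gb = dataset_gb * headroom_factor
    return required_object_store_gb / object_store_fraction


# 8 GB dataset with 2x headroom at a 30% object store allocation.
print(round(required_cluster_ram_gb(8.0), 1))  # 53.3
```

With a 3x factor instead of 2x the same dataset would call for about 80 GB, so treat ~50 GB as the lower end of the range.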