Hi!
I am training a model using Ray’s distributed XGBoost implementation.
For the last couple of days I’ve been testing it on datasets of different sizes. What I observed is that following the official tutorial, which suggests loading the data as a Dataset
and then accessing and materializing its shards in the workers [link1] [link2], is much slower than directly loading chunks of data in the workers, and it consumes much more memory (because of the object store), leading to a need for more workers.
For instance, loading a Parquet dataset of 15M rows and ~1.5k columns from GCS takes around 1 minute when the reading is done in the workers, but crashes after 11 minutes when using a Dataset (due to out-of-disk-space errors, most likely caused by object spilling).
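For context, this is roughly what the docs-style setup looks like in my case; the bucket path, label column, worker count, and trainer parameters below are illustrative placeholders, not my real configuration:

```python
import ray
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# Docs-style approach: read the whole Parquet dataset as a Ray Dataset and let
# the trainer split and materialize shards on the workers.
train_ds = ray.data.read_parquet("gs://my-bucket/training-data/")  # placeholder path

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=8),   # placeholder worker count
    label_column="label",                          # placeholder label column
    params={"objective": "binary:logistic"},       # placeholder params
    datasets={"train": train_ds},
)
result = trainer.fit()
```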
I understand that Ray Data does much more than just fetching and loading the data into memory, but I don’t understand why the difference is so huge.
Here is the worker function I used for debugging; it loads only the file paths from its Dataset shard, but does the actual reading locally:
```python
import pandas as pd
import ray
from ray import train


def worker_from_paths():
    def get_dataset_shard(dataset_key: str) -> pd.DataFrame:
        # The shard holds only file paths, one per row in the "item" column.
        ds_files_iter = ray.train.get_dataset_shard(dataset_key)
        ds_files = list(ds_files_iter.materialize().to_pandas()["item"])
        # Read the assigned Parquet files directly on this worker.
        df = pd.concat((pd.read_parquet(f) for f in ds_files))
        return df

    df = get_dataset_shard("train")
    train.report({"size": len(df)})
```
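For completeness, the file-path Dataset this function consumes can be built from a plain Python list of path strings; from_items turns each string into a row with a single "item" column, which is why the shard is read back via to_pandas()["item"]. The listing below (using gcsfs) is just an illustrative sketch:

```python
import gcsfs
import ray

# Build a tiny Dataset that holds only the Parquet file paths, not the data itself.
fs = gcsfs.GCSFileSystem()
paths = ["gs://" + p for p in fs.glob("gs://my-bucket/training-data/*.parquet")]  # placeholder path
paths_ds = ray.data.from_items(paths)  # rows look like {"item": "gs://..."}
```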
And here is the direct one, adapted from Ray's XGBoost implementation:
```python
def worker_from_direct():
    def get_dataset_shard(dataset_key: str) -> pd.DataFrame:
        # Materialize the full data shard from the object store into a local DataFrame.
        ds_files_iter = ray.train.get_dataset_shard(dataset_key)
        df = ds_files_iter.materialize().to_pandas()
        return df

    df = get_dataset_shard("train")
    train.report({"size": len(df)})
```
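In case it's relevant: to compare the two, I ran each debug function through a plain trainer loop. The sketch below uses DataParallelTrainer purely as an illustrative harness (the real training job uses the XGBoost trainer), and paths_ds / train_ds refer to the sketches above:

```python
from ray.train import ScalingConfig
from ray.train.data_parallel_trainer import DataParallelTrainer

# Illustrative harness: run one of the debug functions on N workers, passing either
# the file-path Dataset (paths_ds) or the full Parquet Dataset (train_ds) as "train".
trainer = DataParallelTrainer(
    train_loop_per_worker=worker_from_paths,       # or worker_from_direct
    scaling_config=ScalingConfig(num_workers=8),   # placeholder worker count
    datasets={"train": paths_ds},                  # or {"train": train_ds}
)
trainer.fit()
```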
I’d like to ask whether there is any practical difference between these two approaches, and why the approach I find more efficient is not suggested in the docs. In what use cases might StreamSplitDataIterator be better?
Thanks!