Data Retrieval Best Practices

Hi, we are using Ray to build a generic ML training platform, deployed on Google Kubernetes Engine (GKE).
Our training data is stored in Google Bigtable and BigQuery, and the data required for training one model can be large. What is the recommended way of doing this? That is, do you recommend:

  1. Directly connecting to BT/BQ to retrieve the data, or
  2. First querying the required data and storing the result on an object store like GCS (Google Cloud Storage), and then letting the training job read the data from the files?

I can see the benefit of option 1: the workflow is simpler.
Option 2 gives you reproducibility of the model training, so you know the exact data that was used, whereas with option 1 the data in the database can change between trainings.

Performance-wise, are there things to consider?
Thanks

Option 2 is what most people do. In that scenario, ray.data.* can be used to shard the data for distributed training, and you don't have to worry about the data changing in BT, which would defeat the purpose of reproducibility you alluded to.

cc: @amogkam @jjyao Does option 2 make sense here? Also, do we have plans for a ray.data connector for Bigtable or BigQuery?

Thanks @Jules_Damji. There is still a bit of a learning curve on Ray Data. I'm a bit confused by the Ray object store and Ray Data; are they related?
Is it required to use Ray Data to do distributed training in Ray (of a single model, not training many models in parallel)?
Also, do you have suggestions for training a scikit-learn model with a large amount of data that can't fit in memory?

@Y_C

There is still a bit of a learning curve on Ray Data. I'm a bit confused by the Ray object store and Ray Data; are they related?

The Ray object store is a distributed store for sharing data among the tasks and actors running on worker processes in a Ray cluster. Ray Data, without you having to explicitly worry about it, uses the Ray object store to distribute sharded data to your training code running on the worker processes on each Ray cluster node. In short, Ray Datasets are the standard mechanism for loading and exchanging data in Ray AIR libraries.
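For a rough intuition, a Dataset is just a collection of blocks, and each block is an object in the Ray object store. A tiny sketch (the exact APIs shown here may vary with your Ray version):

```python
import ray

ray.init()

# A Dataset's blocks live in the Ray object store as ObjectRefs.
ds = ray.data.range(1000)
print(ds.num_blocks())                   # the dataset is split into blocks
print(ds.get_internal_block_refs()[:2])  # each block is an ObjectRef into the object store
```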

Is it required to use Ray Data to do distributed training in Ray (of a single model, not training many models in parallel)?

If you want to do distributed training of a single model with a large dataset, then Ray Data will distribute a shard of the data to the copy of the model running on each worker process.

Say you have a single model M and a large dataset D, and you want to distribute training for M. Ray Train, working with Ray Data, will replicate copies of your model M and distribute sharded data (a slice d of the training data) to each copy. Underneath, it uses the Ray object store to store the model and the shards of data.

You need not worry about it.
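To make that concrete, here is a minimal sketch (assuming PyTorch and a Parquet export already on GCS; the bucket path, column handling, and exact imports are illustrative and may vary with your Ray version):

```python
import ray
from ray.air import session, ScalingConfig
from ray.train.torch import TorchTrainer

# Ray Data reads the dataset; Ray Train hands each worker its own shard of it.
dataset = ray.data.read_parquet("gs://my-bucket/training-data/")  # hypothetical path

def train_loop_per_worker(config):
    # Each worker only sees its shard of the full dataset D.
    shard = session.get_dataset_shard("train")
    for epoch in range(config["num_epochs"]):
        for batch in shard.iter_batches(batch_size=1024):
            ...  # forward/backward pass on this worker's slice of the data

trainer = TorchTrainer(
    train_loop_per_worker,
    train_loop_config={"num_epochs": 2},
    datasets={"train": dataset},
    scaling_config=ScalingConfig(num_workers=4),  # 4 model copies, 4 data shards
)
result = trainer.fit()
```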

Also, do you have suggestions for training a scikit-learn model with a large amount of data that can't fit in memory?

Some scikit-learn examples
And you can use the SklearnTrainer

cc: @amogkam @Yard1 Do we have any scikit-learn examples that use Ray Data for distributed training?

@Y_C does this answer your questions?

Thanks @Jules_Damji, sort of. Reading the docs of SklearnTrainer: "This Trainer runs the fit method of the given estimator in a non-distributed manner on a single Ray Actor."
I don't think it can run distributed training; maybe it's just a limitation of the framework.

I still have difficulty understanding the Ray object store. Since every node has its own object store, how does it help reduce data transfer? Does it only help with different processes within the same node? I read about serialisation between different nodes here, but it's still not very clear to me. Maybe you could recommend some study resources?

And about option 2, is there a best format to use when exporting from BigQuery for both Ray tasks and Ray Train?

As I read here about preprocessing, it suggests only using Ray Data for last-mile preprocessing. Right now we have our ETL pipeline set up (basic transforms, cleansing, simple calculations) but not the preprocessing or the feature engineering.
Is it recommended to make preprocessing and training two separate steps in Ray? Maybe the preprocessing is its own remote task and the training a different one, since they can require different amounts of resources? Is there an end-to-end example covering (last-mile) preprocessing and training?

Thanks for answering my long questions!

This Trainer runs the fit method of the given estimator in a non-distributed manner on a single Ray Actor.
I don't think it can run distributed training; maybe it's just a limitation of the framework.

You can wrap your training function to make it distributed using the DataParallelTrainer.
Is there a reason why you must use scikit-learn? XGBoost-Ray allows you to do distributed training. Why not use that option?
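As a rough skeleton (not a drop-in solution), you could have each worker fit an SGDClassifier incrementally on its own shard via partial_fit. Note that the workers do not synchronize their models, which is a scikit-learn limitation rather than a Ray one; the dataset path, column names, and label classes below are hypothetical, and exact imports may vary with your Ray version:

```python
import numpy as np
import ray
from ray.air import session, ScalingConfig
from ray.train.data_parallel_trainer import DataParallelTrainer
from sklearn.linear_model import SGDClassifier

def train_loop_per_worker(config):
    model = SGDClassifier()
    shard = session.get_dataset_shard("train")
    # Stream this worker's shard in batches that fit in memory.
    for batch in shard.iter_batches(batch_size=4096, batch_format="pandas"):
        X = batch.drop(columns=["label"]).to_numpy()
        y = batch["label"].to_numpy()
        model.partial_fit(X, y, classes=np.array([0, 1]))  # hypothetical binary labels
    session.report({"n_features": int(model.n_features_in_)})

trainer = DataParallelTrainer(
    train_loop_per_worker,
    datasets={"train": ray.data.read_parquet("gs://my-bucket/train/")},  # hypothetical path
    scaling_config=ScalingConfig(num_workers=4),
)
trainer.fit()
```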

I still have difficulty understanding the Ray object store. Since every node has its own object store, how does it help reduce data transfer? Does it only help with different processes within the same node? I read about serialisation between different nodes here, but it's still not very clear to me. Maybe you could recommend some study resources?

Ray can take advantage of data locality. Worker processes running your tasks or training on the same Ray cluster node can access objects in shared memory with zero-copy reads. That is, there is no transfer of data between processes on the same node; it is all accessed via shared memory.

If a task or worker on node N needs data that is stored on node NN, then the data will be copied from NN -> N. Subsequently, any workers running on N have zero-copy access to it. This eliminates data transfer between processes on the same node, at the initial cost of transferring the object once between nodes.

That’s how distributed shared-memory works.
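A small illustration of the zero-copy behavior (array sizes are arbitrary): a numpy array placed in the object store is read without copying by tasks on the node that holds it; if a task lands on another node, Ray copies the object there once and later tasks on that node read it zero-copy as well.

```python
import numpy as np
import ray

ray.init()

big_array = np.zeros((10_000, 1_000))  # ~80 MB
ref = ray.put(big_array)               # stored once in the local object store

@ray.remote
def column_means(arr):
    # `arr` is a read-only view backed by shared memory when this task
    # runs on a node that already holds the object (zero-copy read).
    return arr.mean(axis=0)

# Both tasks share the same stored copy; the array is not re-serialized per task.
means = ray.get([column_means.remote(ref), column_means.remote(ref)])
```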

And about option 2, is there a best format to use when exporting from BigQuery for both Ray tasks and Ray Train?

ray.data supports many formats. Parquet seems to be the most common, due to its compression, performance, and support for predicate push-down.
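For example, the read side of option 2 could look like this (the export path and column names are made up):

```python
import ray

# Read a BigQuery export that was written to GCS as Parquet.
ds = ray.data.read_parquet(
    "gs://my-bucket/exports/training_run_2023_01_01/",  # hypothetical export path
    columns=["feature_a", "feature_b", "label"],         # prune columns at read time
)
print(ds.schema())
```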

As I read here about preprocessing, it suggests only using Ray Data for last-mile preprocessing. Right now we have our ETL pipeline set up (basic transforms, cleansing, simple calculations) but not the preprocessing or the feature engineering.

Ray Data is not optimal for ETL or building data pipelines. You can do efficient last-mile transformations or common preprocessing using its support for preprocessors.
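For instance, a last-mile scaling step with one of the built-in preprocessors might look like this (column names and path are hypothetical; module paths can differ by Ray version):

```python
import ray
from ray.data.preprocessors import StandardScaler

ds = ray.data.read_parquet("gs://my-bucket/exports/")        # hypothetical path
scaler = StandardScaler(columns=["feature_a", "feature_b"])  # columns to standardize
ds_scaled = scaler.fit_transform(ds)  # compute statistics, then transform the dataset
```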

Is it recommended to make preprocessing and training two separate steps in Ray? Maybe the preprocessing is its own remote task and the training a different one, since they can require different amounts of resources? Is there an end-to-end example covering (last-mile) preprocessing and training?

Here are some examples of Ray Data preprocessing.

I hope these resources give you ample fodder.

hth
Jules

Hi @Jules_Damji thanks for the reply, really helpful.

The reason I'm using scikit-learn is that we are building an ML platform for our product where external data scientists can train and serve their models and use them in our product, so we want to support all mainstream frameworks.

I will dive into the ray data examples, thanks!