I have a section of code that my peers think does not execute in parallel across multiple Ray nodes. The code is pasted below. It fetches data from a database; the data is of type MLDataset, a class from ray.util.data. After fetching it, I iterate over it with the async method from ray.util.iter (gather_async), split it into training and testing data, convert both into tensor slices with the TensorFlow API, and feed them into Ray's TFTrainer class. TFTrainer accepts tensor datasets only. So the requirement here is to improve the code after the first line (the one that fetches the dataset) and to verify that it runs in parallel across multiple nodes. I can share the whole code; any help is appreciated.
import tensorflow as tf
import ray
from ray.util.sgd.tf.tf_trainer import TFTrainer, TFTrainable
from sklearn.model_selection import train_test_split


# TFTrainer calls data_creator with its config dict, so accept it here.
def fetch_values_from_database(config=None):
    custom_dataset = <Custom Method of return type MLDataset.from_parallel_it >
    # gather_async() pulls every shard's DataFrames into this process,
    # so the loop below runs in one place, not on the shards.
    resultList = []
    for df in custom_dataset.gather_async():
        for value in df.values:
            resultList.append(list(value))
    # Last column is the label; columns 3 up to (excluding) the last are features.
    resultColumn = [value[-1] for value in resultList]
    trainColumns = [value[3:-1] for value in resultList]
    X_train, X_test, y_train, y_test = train_test_split(
        trainColumns, resultColumn, test_size=0.20, shuffle=True)
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).batch(32)
    test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)
    return train_dataset, test_dataset


trainer = TFTrainer(
    model_creator=<invoke a method which returns TF model>,
    data_creator=fetch_values_from_database,
    verbose=True
)
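To make the row handling in the function above easier to follow, here is a minimal pure-Python sketch of the same slicing and 80/20 split. The rows are made up for illustration (three leading columns, two feature columns, one label column), and the shuffle-and-cut step is a plain stand-in for train_test_split, not what scikit-learn does internally.

```python
import random

# Hypothetical rows standing in for df.values: three leading columns,
# two feature columns, and the label in the last position.
rows = [[i, "a", "b", float(i), float(i) * 2.0, i % 2] for i in range(10)]

labels = [row[-1] for row in rows]       # last column
features = [row[3:-1] for row in rows]   # columns 3 up to, excluding, the last

# Plain-Python stand-in for train_test_split(test_size=0.20, shuffle=True).
indices = list(range(len(rows)))
random.Random(0).shuffle(indices)        # fixed seed, only for repeatability
n_test = int(len(rows) * 0.20)
test_idx, train_idx = indices[:n_test], indices[n_test:]

X_train = [features[i] for i in train_idx]
y_train = [labels[i] for i in train_idx]
X_test = [features[i] for i in test_idx]
y_test = [labels[i] for i in test_idx]

print(len(X_train), len(X_test))
```

Note that all of this works on one in-memory list, which is also true of the original function once gather_async has collected the rows.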
I developed a full working example based on the Ray documentation page "Distributed TensorFlow — Ray v2.0.0.dev0".
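One way to check whether work is actually spread over several nodes is to launch many small Ray tasks and count the hostnames they report. The sketch below assumes Ray is installed and that a running cluster is reachable with address="auto"; probe_cluster and count_by_host are hypothetical helper names, not part of Ray. It only shows where generic tasks get scheduled and does not by itself prove that the TFTrainer workers or the data loading run on different nodes.

```python
import socket
import time
from collections import Counter


def count_by_host(hostnames):
    """Return {hostname: number_of_tasks} for a list of hostnames."""
    return dict(Counter(hostnames))


def probe_cluster(num_tasks=100):
    """Launch small Ray tasks and report which hosts executed them.

    Assumes Ray is installed and a cluster is reachable via address="auto".
    """
    import ray  # imported lazily so count_by_host can be used without Ray

    ray.init(address="auto", ignore_reinit_error=True)

    @ray.remote
    def report_host():
        time.sleep(0.1)  # keep the task busy briefly so work can spread out
        return socket.gethostname()

    hostnames = ray.get([report_host.remote() for _ in range(num_tasks)])
    return count_by_host(hostnames)
```

Calling print(probe_cluster()) on the head node should list more than one hostname if tasks reach other nodes. The same socket.gethostname() call can be printed inside fetch_values_from_database to see which hosts invoke the data creator.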