[Expectation]
I would like to train multiple machine learning models, one on each AWS EC2 instance in the Ray cluster. More precisely, I want each Worker node to read the input data and train a machine learning model.
[Current behavior]
Only the Head node is reading the data and training the machine learning model, even though there are 6 Worker nodes in addition to the Head node.
How can I train the machine learning model on each of the Worker nodes?
import ray
import lightgbm as lgb
from lightgbm import LGBMClassifier
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
if ray.is_initialized():  # is_initialized is a function; without the parentheses the check is always truthy
    ray.shutdown()
ray.init(address='auto', _redis_password='5241590000000000', dashboard_host='0.0.0.0', dashboard_port=8265, namespace="job1")
params = {
    'task': 'train',
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 64,
    'min_data_in_leaf': 20,
    'max_depth': 7,
    'verbose': 0,
}
@ray.remote
def f():
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.25, random_state=42)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    model_lgb = lgb.train(params=params, train_set=lgb_train, valid_sets=lgb_eval)
    return model_lgb
models = []
for i in range(1000):
    obj_id = f.options(placement_group=pg).remote()
    tmp = ray.get(obj_id)
    models.append(tmp)
ray.shutdown()
Hey @AtsushiSugai, any time you call ray.get it blocks until it receives the result from a remote function. By structuring your code like this (calling ray.get in a loop), you are running everything sequentially and not parallelizing anything. The solution would be to create a list of remote references and then call ray.get on that list. You can read more here - Antipattern: Calling ray.get in a loop — Ray v1.9.2
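For reference, with the f task from the question above, the suggested restructuring looks roughly like this (a sketch, assuming ray.init has already been called):
# Anti-pattern: blocking on each result inside the loop serializes the work.
# models = [ray.get(f.remote()) for _ in range(1000)]

# Better: submit all tasks first, then fetch everything with a single ray.get.
obj_ids = [f.remote() for _ in range(1000)]
models = ray.get(obj_ids)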
Hi @Yard1, thank you very much for the answer. I changed my code as below, and ray.get() is now outside the loop.
But when I compared the wall time on the Ray cluster with the wall time on a single machine, the single machine was faster.
I’m wondering whether I’m actually leveraging the Ray cluster with this code. It seems the model training is being processed on a single EC2 instance in the Ray cluster, since the same PID is always printed in the log (see the small check sketched after the code below).
@ray.remote
def f():
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.25, random_state=42)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    model_lgb = lgb.train(params=params, train_set=lgb_train, valid_sets=lgb_eval)
    return model_lgb
models = []
obj_ids = []
for i in range(1000):
    tmp = f.options(placement_group=pg).remote()
    obj_ids.append(tmp)
models.append(ray.get(obj_ids))
ray.shutdown()
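For reference, a quick way to check where the tasks actually run is a small diagnostic like the one below (where_am_i is an illustrative helper, not part of the code above):
import os
import socket

@ray.remote
def where_am_i():
    # Report the worker process PID and the node the task ran on.
    return os.getpid(), socket.gethostname()

print(set(ray.get([where_am_i.remote() for _ in range(20)])))
# More than one hostname in the output means the tasks are spreading across nodes.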
I see that you are using a placement group - could you show the code you are using to create it? It’s possible it’s configured in a way that will limit parallelism. I’d run this code:
ray.init()
print(ray.cluster_resources()) # see if the printed resources match what's expected
NUM_CPUS_PER_JOB = 1
@ray.remote(num_cpus=NUM_CPUS_PER_JOB)
def f():
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.25, random_state=42)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    params["num_threads"] = NUM_CPUS_PER_JOB  # Ray doesn't enforce any resource limits on the tasks, you have to ensure they match yourself
    model_lgb = lgb.train(params=params, train_set=lgb_train, valid_sets=lgb_eval)
    return model_lgb
models = []
obj_ids = []
# Check performance without the placement group first
# f_pg = f.options(placement_group=pg)  # if you keep the placement group, create this once and reuse it
for i in range(1000):
    obj_ids.append(f.remote())  # switch to f_pg.remote() if you re-enable the placement group
models = ray.get(obj_ids)
ray.shutdown()
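If the goal is to spread the training tasks across all the worker nodes, one option is a placement group with the SPREAD strategy. A sketch for Ray 1.x, reusing f and NUM_CPUS_PER_JOB from the snippet above and run before the ray.shutdown() call (the worker count is an assumption; match it to your cluster):
from ray.util.placement_group import placement_group

NUM_WORKERS = 6  # assumed number of worker nodes

# One bundle per worker node; SPREAD asks Ray to place the bundles on different nodes.
pg = placement_group([{"CPU": NUM_CPUS_PER_JOB}] * NUM_WORKERS, strategy="SPREAD")
ray.get(pg.ready())  # wait until all bundles are reserved

f_pg = f.options(placement_group=pg)  # created once, reused for every task
obj_ids = [f_pg.remote() for i in range(1000)]
models = ray.get(obj_ids)
Note that SPREAD is best-effort; STRICT_SPREAD requires each bundle to land on a distinct node and fails if that is not possible.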