Only the head node is processing tasks

[Expectation]
I would like to train multiple machine learning models, one on each AWS EC2 instance in the Ray cluster. More precisely, I want each worker node to read the input data and train a machine learning model.

[Current behavior]
Only the head node is reading the data and training the machine learning model, even though there are 6 worker nodes in addition to the head node.

How can I train a machine learning model on each of the worker nodes?


import ray
import lightgbm as lgb
from lightgbm import LGBMClassifier
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

# Restart Ray if a previous session is still running, then connect to the cluster.
if ray.is_initialized():
    ray.shutdown()
ray.init(address='auto', _redis_password='5241590000000000', dashboard_host='0.0.0.0', dashboard_port=8265, namespace="job1")


params = {
    'task': 'train',
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 64,
    'min_data_in_leaf': 20,
    'max_depth': 7,
    'verbose': 0,
}

@ray.remote
def f():
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.25, random_state=42)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    model_lgb = lgb.train(params=params, train_set=lgb_train, valid_sets=lgb_eval)
    return model_lgb

models = []
for i in range(1000):
    # pg is a placement group created elsewhere (its definition is not shown in this post)
    obj_id = f.options(placement_group=pg).remote()
    tmp = ray.get(obj_id)  # blocks here until this single task finishes
    models.append(tmp)

ray.shutdown()

Hey @AtsushiSugai, any time you call ray.get it blocks until it receives the result from the remote function. By structuring your code like this (calling ray.get in a loop), you are running everything sequentially and not parallelizing anything. The solution would be to create a list of remote references and then call ray.get on that list. You can read more here - Antipattern: Calling ray.get in a loop — Ray v1.9.2
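As a rough illustration (with a dummy train_one task standing in for the actual LightGBM training, and assuming Ray has already been initialized as above):

@ray.remote
def train_one():
    return 42  # stand-in for the real training step

# Sequential: ray.get inside the loop waits for each task's result before the next one is submitted.
results = [ray.get(train_one.remote()) for _ in range(4)]

# Parallel: submit all tasks first, then call ray.get once on the list of references.
refs = [train_one.remote() for _ in range(4)]
results = ray.get(refs)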

Hi @Yard1, thank you very much for the answer. I changed my code as below, and ray.get() is now outside the loop.

But when I compare the wall time on the Ray cluster with the wall time on a single machine, the single machine is faster.

I’m wondering whether this code is actually leveraging the Ray cluster. It seems the model training is processed on a single EC2 instance in the cluster, since the same PID is always printed in the log.
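For example, to double-check which worker processes and nodes the tasks actually land on, a small probe task like the following could be used (os and socket are standard-library modules; this is only a diagnostic sketch, not part of the training code, and it assumes Ray is already initialized):

import os
import socket

@ray.remote
def where_am_i():
    # Return the PID of the worker process and the IP of the node it runs on.
    return os.getpid(), socket.gethostbyname(socket.gethostname())

print(set(ray.get([where_am_i.remote() for _ in range(20)])))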

@ray.remote
def f():
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.25, random_state=42)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    model_lgb = lgb.train(params=params, train_set=lgb_train, valid_sets=lgb_eval)
    return model_lgb

obj_ids = []
for i in range(1000):
    tmp = f.options(placement_group=pg).remote()
    obj_ids.append(tmp)

models = ray.get(obj_ids)  # fetch all results at once, outside the loop

ray.shutdown()

Hey @AtsushiSugai, sorry to keep you waiting.

I see that you are using a placement group - could you show the code you are using to create it? It’s possible it’s configured in a way that will limit parallelism. I’d run this code:

ray.init()
print(ray.cluster_resources())  # see if the printed resources match what's expected

NUM_CPUS_PER_JOB = 1
@ray.remote(num_cpus=NUM_CPUS_PER_JOB)
def f():
    X, y = load_breast_cancer(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.25, random_state=42)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    params["num_threads"] = NUM_CPUS_PER_JOB  # Ray doesn't enforce any resource limits on the tasks, you have to ensure they match yourself
    model_lgb = lgb.train(params=params, train_set=lgb_train, valid_sets=lgb_eval)
    return model_lgb

obj_ids = []
# Check performance without the placement group first
# f_pg = f.options(placement_group=pg)  # if you do use one, create it once and reuse it
for i in range(1000):
    tmp = f.remote()  # or f_pg.remote() when using the placement group
    obj_ids.append(tmp)

models = ray.get(obj_ids)

ray.shutdown()

Thank you very much for your reply, and sorry for my late response.

I finally managed to run the code in parallel across the machines, though I still haven’t figured out what limited the parallelism in the previous code.

import socket
import time
from collections import Counter

import numpy as np
from ray.util.placement_group import placement_group

bundle1 = {"CPU": 2}
bundle2 = {"CPU": 2}
bundle3 = {"CPU": 2}
bundle4 = {"CPU": 2}
bundle5 = {"CPU": 2}
bundle6 = {"CPU": 2}
pg_test = placement_group([bundle1, bundle2, bundle3, bundle4, bundle5, bundle6], strategy="STRICT_SPREAD", name="pg_test")
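# (Added note: waiting until the placement group is actually scheduled before
# submitting tasks can avoid misleading timing results; pg_test.ready() returns
# an ObjectRef that resolves once all bundles have been reserved.)
ray.get(pg_test.ready())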

@ray.remote
def f(volume):
    X, y = load_breast_cancer(return_X_y=True)
    X2 = np.concatenate([X for _ in range(volume)])
    y2 = np.concatenate([y for _ in range(volume)])
    X_train, X_test, y_train, y_test = train_test_split(
        X2, y2, train_size=0.25, random_state=42)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)
    model_lgb = lgb.train(params=params, train_set=lgb_train, valid_sets=lgb_eval)

    return socket.gethostbyname(socket.gethostname())

VOLUMES= [2, 10, 100, 500]
NUM_OF_ACTORS=10

execution_time = ["Task&PG"]

for volume in VOLUMES:
    print("*** Start running *** \n Data volume: ", volume)
    start_time = time.time()
    obj_ids=[]
    ip_addresses=[]
    for _ in range(NUM_OF_ACTORS):
        tmp = f.options(placement_group=pg_test).remote(volume)
        obj_ids.append(tmp)
    ip_addresses = ray.get(obj_ids)
    print('Tasks executed')

    for ip_address, num_tasks in Counter(ip_addresses).items():
        print('    {} tasks on {}'.format(num_tasks, ip_address))

    end_time = time.time()
    print('Execution time: {:.2f} \n'.format(end_time - start_time))
    execution_time.append(end_time - start_time)
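
For completeness, a small cleanup sketch that could follow the benchmark above (assuming remove_placement_group is imported from ray.util.placement_group); it releases the reserved CPU bundles and disconnects from the cluster:

from ray.util.placement_group import remove_placement_group

remove_placement_group(pg_test)  # release the CPU bundles reserved by pg_test
ray.shutdown()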