Ray Train task stuck at .fit() with node's output in "PENDING" status when using a remote Kubernetes cluster

I encountered an issue while using Ray Train. When I initialize Ray against a remote Kubernetes cluster using ray.init() and the cluster address, the training task gets stuck at .fit(): the node’s output stays in a “PENDING” status indefinitely. I have a screenshot of the pending status that I can provide for reference.

In my Kubernetes Ray cluster setup, I assigned 1 CPU to the head node and 1 CPU to each of the 4 worker nodes. When running the trainer, I requested num_workers=3. Regular tasks decorated with @ray.remote run fine on the same cluster, so this seems to be a Ray Train-specific issue.

I have tried to resolve the issue by ensuring the number of workers specified in ScalingConfig() is smaller than the number of worker pods in the Kubernetes cluster, checking worker nodes’ logs, and examining the head node’s logs, but the problem persists.
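
For what it’s worth, here is a quick way to confirm from the driver that all 4 worker pods have actually registered with Ray (just a sketch, run after ray.init() with the cluster address):

import ray

# List every node Ray has registered, whether it is alive, and its CPU count.
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Alive"], node["Resources"].get("CPU"))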

Has anyone encountered a similar issue with Ray Train? If so, how did you resolve it? Are there any known issues with Ray Train or its dependencies that could be causing this behavior? Any help or suggestions would be greatly appreciated.

Can you post your ScalingConfig() or the entire Trainer setup here?
Also, in the pending state, if you wait a minute, Ray will print a message along the lines of “you are asking for xyz resources, and there are only uvw resources in the cluster; ignore this if there’s auto-scaling”.
Do you see this message? If so, that will help us understand how many resources you are actually asking for from Ray Train’s perspective.
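
In the meantime, a rough way to compare those two numbers yourself from the driver (just a sketch; run it in the same session that called ray.init()):

import ray

# Aggregate resources the cluster advertises across all registered nodes.
print("total:", ray.cluster_resources())

# Resources currently free; if this stays below what the trainer's placement
# group needs, the Ray Train workers sit in PENDING.
print("free: ", ray.available_resources())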

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import ray
from ray import train
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

def train_epoch(dataloader, model, loss_fn, optimizer, device):
    model.train()
    epoch_loss = 0
    for batch, (X_batch, y_batch) in enumerate(dataloader):
        y_pred = model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        epoch_loss += loss.item()
        model.zero_grad()
        loss.backward()
        optimizer.step()
    return epoch_loss

def train_loop_per_worker(config):
    worker_batch_size = config["batch_size"] // session.get_world_size()
    train_loader = DataLoader(
        train_dataset, batch_size=worker_batch_size, shuffle=True
    )
    train_loader = train.torch.prepare_data_loader(train_loader)

    input_size = X_train.shape[1]
    regressor = NNRegressor(
        input_size=input_size, hidden_layers=5
    )
    # Wrap the model for distributed training (moves it to the right device).
    regressor = train.torch.prepare_model(regressor)

    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(
        regressor.parameters(), lr=0.001
    )

    for epoch in range(config["num_epochs"]):
        epoch_loss = train_epoch(
            train_loader, regressor, loss_fn, optimizer, device
        )
        print(f"epoch: {epoch}, loss: {epoch_loss}")

        session.report(
            {"loss": epoch_loss},
            checkpoint=Checkpoint.from_dict(dict(epoch=epoch, model=regressor)),
        )


device = "cpu"
runtime_env = {"working_dir": "./", "pip": "./ray_requirements.txt"}

ray.init(address="ray://localhost:10001", runtime_env=runtime_env)

scaling_config = ScalingConfig(
    num_workers=3,
    use_gpu=False,
    _max_cpu_fraction_per_node=0.8,
)

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=dict(batch_size=100000, num_epochs=2),
    scaling_config=scaling_config,
)
result = trainer.fit()
regressor = result.checkpoint.to_dict()["model"]
losses = [checkpoint[1]["loss"] for checkpoint in result.best_checkpoints]

This is my entire trainer setup. But I don’t see the message that you mentioned.

How is your train_dataset defined?

Any chance you can reproduce the problem with a fake range dataset like this?
https://docs.ray.io/en/latest/data/api/doc/ray.data.range_table.html#ray.data.range_table

It looks like this should run, since you have 4 CPUs available in the cluster and you are only asking for 3.
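
Something along these lines should be enough to take your own data loading out of the picture (untested sketch; the no-op training loop is just there to see whether workers ever get scheduled):

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

# Tiny synthetic dataset, independent of the pandas / torch Dataset code.
ds = ray.data.range_table(1000)

def noop_loop():
    # If the workers get scheduled at all, this report shows up right away.
    session.report({"started": 1})

trainer = TorchTrainer(
    train_loop_per_worker=noop_loop,
    scaling_config=ScalingConfig(num_workers=3, use_gpu=False),
    datasets={"train": ds},
)
trainer.fit()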


I’m not too sure how to use this range dataset yet. But I simplified my training data with a dummy dataset and the issue still persists.

import numpy as np
import pandas as pd

n_rows = 1000
X1 = np.random.rand(n_rows)
X2 = np.random.rand(n_rows)
y = 3 * X1 + 2 * X2 + np.random.normal(0, 0.1, n_rows)
dummy_data = pd.DataFrame({"Feature1": X1, "Feature2": X2, "Target": y})
X_train = dummy_data[["Feature1", "Feature2"]]
y_train = dummy_data["Target"]
train_dataset = Data(X_train, y_train)

Data is a PyTorch Dataset:

from torch.utils.data import Dataset

class Data(Dataset):
    def __init__(self, X_input, y_input):
        self.X = X_input.astype("float32").to_numpy()
        self.y = (
            y_input.astype("float32").to_numpy().reshape(-1, 1)
            if y_input is not None
            else None
        )

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, index):
        features = self.X[index]
        target = self.y[index] if self.y is not None else float("nan")
        return features, target

I have switched the model to XGBoost and still see the same issue, so it probably isn’t specific to TorchTrainer.

import ray
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

train_dataset = ray.data.from_pandas(dummy_data)
param = {
    "max_depth": 3,
    "eta": 0.01,
    "objective": "reg:squarederror",
    "eval_metric": "rmse",
    "base_score": 0.5,
    "n_estimators": 10,
    "early_stopping_rounds": 50,
    "learning_rate": 0.01,
}
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        num_workers=3,
        use_gpu=False,
        _max_cpu_fraction_per_node=0.8,
    ),
    label_column="Target",
    num_boost_round=20,
    params=param,
    datasets={"train": train_dataset},
)
result = trainer.fit()