Ray Tune metrics not consistent with offline evaluation

I use Ray Tune for LightGBM model training. The val and test results I get from the ResultGrid are different from an offline run with the same model iteration and dataset. I wonder how Ray Tune does the evaluation. Does it use the full val set or a shard? Thanks!

Can you paste your script? What do you mean by offline evaluation in this context?

What I meant by offline evaluation is loading the model and applying it to the val and test sets.

For example:
from ray.train.batch_predictor import BatchPredictor
from ray.train.lightgbm import LightGBMCheckpoint, LightGBMPredictor

ckpt = LightGBMCheckpoint.from_uri(ckpt_dir)
batch_predictor = BatchPredictor.from_checkpoint(ckpt, LightGBMPredictor)
prob = batch_predictor.predict(
    dataset, feature_columns=feat_cols, keep_columns=['event_label'], num_iteration=501
)
avg_prec = average_precision(y_true, prob)

This is different from what I get from the progress.csv in the same checkpoint directory:

progress = pd.read_csv(f"{trial_dir}/progress.csv")

where I check the corresponding performance at the same num_iteration, 501.

The progress performance is aligned with the result grid from the tuner.
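For reference, this is roughly how I pull the number out of progress.csv to compare it with the offline result (a sketch; the metric column name "val-average_precision" is just a placeholder for whatever the eval metric is actually reported as):

import pandas as pd

# Sketch of the comparison; "val-average_precision" is a placeholder column name.
progress = pd.read_csv(f"{trial_dir}/progress.csv")
row = progress[progress["training_iteration"] == 501]
print(row["val-average_precision"].iloc[0])  # reported by Ray Tune
print(avg_prec)                              # computed offline with BatchPredictor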

How is evaluation done in the training loop? How are evaluation metrics reported in the training loop?

I am using:

from ray.train.lightgbm import LightGBMTrainer

trainer = LightGBMTrainer(
        datasets={"train": train_dataset, "val": val_dataset, "test": test_dataset},
        label_column=cfg.data.label_col,
        params={},
        dmatrix_params={
                'train': {'weight': cfg.data.weight_col},
                'val': {'weight': cfg.data.weight_col},
                'test': {'weight': cfg.data.weight_col}
                },
        eval_metric=eval_func,
        scaling_config=ScalingConfig(
            num_workers=cfg.task.num_workers,
            resources_per_worker={"CPU": cfg.task.num_cpus_per_workers},
            trainer_resources={"CPU": 0},
            use_gpu=False,
            placement_strategy="SPREAD",
            _max_cpu_fraction_per_node=0.8,
        ),
    )
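eval_func is my custom metric (not shown here). As a rough, hypothetical sketch, assuming the sklearn-style custom eval_metric convention of (y_true, y_pred) -> (name, value, is_higher_better), it looks something like:

from sklearn.metrics import average_precision_score

# Hypothetical sketch of eval_func -- assumes the sklearn-style custom
# eval_metric convention: (y_true, y_pred) -> (name, value, is_higher_better).
def eval_func(y_true, y_pred):
    return "average_precision", average_precision_score(y_true, y_pred), True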

The training loop is defined in /lib/python3.7/site-packages/ray/train/gbdt_trainer.py:

def training_loop(self) -> None:
        config = self.train_kwargs.copy()

        dmatrices = self._get_dmatrices(
            dmatrix_params=self.dmatrix_params,
        )
        train_dmatrix = dmatrices[TRAIN_DATASET_KEY]
        evals_result = {}

        init_model = None
        if self.resume_from_checkpoint:
            init_model, _ = self._load_checkpoint(self.resume_from_checkpoint)

        config.setdefault("verbose_eval", False)
        config.setdefault("callbacks", [])

        if not any(
            isinstance(
                cb, (self._tune_callback_report_cls, self._tune_callback_checkpoint_cls)
            )
            for cb in config["callbacks"]
        ):
            # Only add our own callback if it hasn't been added before
            checkpoint_frequency = (
                self.run_config.checkpoint_config.checkpoint_frequency
            )
            if checkpoint_frequency > 0:
                callback = self._tune_callback_checkpoint_cls(
                    filename=MODEL_KEY, frequency=checkpoint_frequency
                )
            else:
                callback = self._tune_callback_report_cls()

            config["callbacks"] += [callback]

        config[self._init_model_arg_name] = init_model

        model = self._train(
            params=self.params,
            dtrain=train_dmatrix,
            evals_result=evals_result,
            evals=[(dmatrix, k) for k, dmatrix in dmatrices.items()],
            ray_params=self._ray_params,
            **config,
        )

        checkpoint_at_end = self.run_config.checkpoint_config.checkpoint_at_end
        if checkpoint_at_end is None:
            checkpoint_at_end = True

        if checkpoint_at_end:
            self._checkpoint_at_end(model, evals_result)

Thanks for the context.
lightgbm-ray uses Ray actors for distributed training; each actor runs a LightGBM model.fit() (for that part you can refer to the LightGBM documentation). Roughly speaking, each actor works on its own shard of the evaluation dataset (local evaluation), so every actor will have different evaluation results. When the results are combined, the logic treats all workers' evaluation results the same instead of computing some mathematical average. I think this is why the result is not the same as what you would get by running .predict() with a checkpoint from a given booster round.
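To see why per-shard (local) evaluation cannot simply be recombined into the full-set number, here is a standalone illustration (plain Python, not Ray code): a ranking metric such as average precision computed per shard and then averaged is generally not the same as the metric computed over the whole validation set.

import numpy as np
from sklearn.metrics import average_precision_score

# Standalone illustration: average precision is not decomposable across shards,
# so per-shard results recombined naively differ from the full-set metric.
rng = np.random.default_rng(0)
y_true = rng.integers(0, 2, size=10_000)
y_score = 0.5 * rng.random(10_000) + 0.5 * y_true * rng.random(10_000)

full = average_precision_score(y_true, y_score)
per_shard = [
    average_precision_score(y_true[s], y_score[s])
    for s in np.array_split(np.arange(10_000), 4)
]
print(full)                 # metric on the full validation set
print(np.mean(per_shard))   # mean of per-shard metrics -- generally different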

cc @Yard1 to confirm my understanding.

Thanks so much!
Is each actor's evaluation result different? If they are different, which one is used? Is there a way I can evaluate the model on the whole validation set in Ray Tune (not a shard of it)?

We only report the results from the rank 0 worker, but each actor should have the same evaluation set - we should only be sharding the training data, not the evaluation data. If we are not doing that, this is a bug - I will need to double-check the logic in LightGBMTrainer.
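In the meantime, one way to cross-check is to load the booster from the trial checkpoint, evaluate it on the complete validation set yourself, and compare with the metric in the ResultGrid. A rough sketch (the metric key "val-average_precision" and the result_grid variable are placeholders for your actual names):

from sklearn.metrics import average_precision_score
from ray.train.lightgbm import LightGBMCheckpoint

# Rough cross-check sketch; "val-average_precision" is a placeholder metric key.
ckpt = LightGBMCheckpoint.from_uri(ckpt_dir)
booster = ckpt.get_model()  # lightgbm.Booster
val_df = val_dataset.to_pandas()
prob = booster.predict(val_df[feat_cols], num_iteration=501)
full_ap = average_precision_score(val_df[cfg.data.label_col], prob)

best_result = result_grid.get_best_result(metric="val-average_precision", mode="max")
print(full_ap, best_result.metrics["val-average_precision"])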

I am not sure whether the evaluation is done before or after the gradients are synced. If it is the latter and the data is the same on all workers, then all workers should return the same metrics (I think that is the case).