Understanding distributed data loading and training with xgboost_ray

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

I’m using the xgboost_ray package to train an XGBoost model.
The data is loaded from a Ray Dataset into a RayDMatrix. I’m giving 2 actors and 1 CPU per actor via RayParams.

import ray
import xgboost_ray
from xgboost_ray import RayDMatrix, RayParams

def load_data(path: str) -> RayDMatrix:
    ds = ray.data.read_parquet(path)
    label_col = ds.schema().names[-1]   # the user needs to ensure that the last column is the label column
    return RayDMatrix(ds, label=label_col)

The path is a directory in Google Cloud Storage. The training process:

@ray.remote(num_cpus=1, max_task_retries=3, max_restarts=3)
class MyTrainer:
    def training_pipeline(self):
        train_dtrain = load_data("gs://my_bucket/my_dir_with_parquet_files")
        xgb_reg_model = xgboost_ray.train(
            param, train_dtrain,
            ray_params=RayParams(num_actors=2, cpus_per_actor=1),
            early_stopping_rounds=int(param['early_stopping_rounds']),
            num_boost_round=int(param['n_estimators']),
            evals=[(train_dtrain, 'train')],
            evals_result=result, verbose_eval=True, xgb_model=model,
        )

my_trainer_instance = MyTrainer.remote()
ray_ref = my_trainer_instance.training_pipeline.remote()
ray.get(ray_ref)

Got the error:

INFO:__main__:ray job finished with state: FAILED
ERROR:__main__:
 error trace back: Job failed due to an application error, last available logs (truncated to 20,000 chars):
  File "/home/ray/anaconda3/lib/python3.10/site-packages/xgboost_ray/main.py", line 1598, in train
    bst, train_evals_result, train_additional_results = _train(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/xgboost_ray/main.py", line 1134, in _train
    dtrain.assert_enough_shards_for_actors(num_actors=ray_params.num_actors)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/xgboost_ray/matrix.py", line 884, in assert_enough_shards_for_actors
    self.loader.assert_enough_shards_for_actors(num_actors=num_actors)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/xgboost_ray/matrix.py", line 569, in assert_enough_shards_for_actors
    raise RuntimeError(
RuntimeError: Trying to shard data for 2 actors, but the maximum number of shards is 1. If you want to shard the dataset by rows, consider centralized loading by passing `distributed=False` to the `RayDMatrix`. Otherwise consider using fewer actors or re-partitioning your data.

Questions:

  1. I understand why I got this error: there is only one Parquet file in my directory. The xgboost_ray README (https://github.com/ray-project/xgboost_ray) says:
    Centralized loading is used when you pass centralized in-memory dataframes, such as Pandas dataframes ... such as a single CSV or Parquet file.
    What if I first read the file into a Ray Dataset and then load it into RayDMatrix? Does it still use centralized loading?

  2. It seems that a list of file names is required to enable distributed loading; even if a directory containing multiple Parquet files is given, centralized data loading is still used (see the sketch at the end of this post).

  3. Should I read the Parquet files directly into the RayDMatrix instead of using a Ray Dataset as an intermediate step? The reason I use a Ray Dataset is that I don’t know the label column beforehand, so I use it to get the column names.

  4. Does centralized loading mean that it requires more memory, or that it is slower? And with centralized loading, how many shards will the head node create? Is it the number of actors given in the train function?

  5. What is the relationship between cpus_per_actor and the resources requested by the Ray actor (MyTrainer) that contains the training logic, where I have @ray.remote(num_cpus=1)?
    If I use RayParams(num_actors=2, cpus_per_actor=1) in the train function and @ray.remote(num_cpus=1) on MyTrainer, it seems that 3 actors are created in total. What are the logical resources required to schedule this training job? 3 CPUs?

  6. Another thing I noticed is that scale-up was triggered almost at the end of the training job: the cluster scaled from 1 to 5 worker nodes, although there were enough CPUs on my worker node. All the new workers were killed very quickly since the training job was already done. It seemed to be retrying something? Any suggestions to avoid the unneeded scale-up?

(_RemoteRayXGBoostActor pid=1050, ip=10.52.5.2) 2023-07-07 08:31:31,139 INFO main.py:1254 -- Training in progress (31 seconds since last restart).
And I noticed there is one task that failed in each of the training actors.
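
For reference, this is roughly the variant I have in mind for questions 2 and 3: passing a list of Parquet files directly to RayDMatrix. The file names and the label column name below are made up, since in practice I don’t know them in advance.

from xgboost_ray import RayDMatrix

# Hypothetical file list and label column, just to illustrate the question.
file_list = [
    "gs://my_bucket/my_dir_with_parquet_files/part-0.parquet",
    "gs://my_bucket/my_dir_with_parquet_files/part-1.parquet",
]
# With a list of files, each file can become one shard for one training actor.
dtrain = RayDMatrix(file_list, label="my_label_col")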

Hi @Y_C, regarding the error:

RuntimeError: Trying to shard data for 2 actors, but the maximum number of shards is 1. If you want to shard the dataset by rows, consider centralized loading by passing `distributed=False` to the `RayDMatrix`. Otherwise consider using fewer actors or re-partitioning your data.

As you said, yes, this is because you only have 1 Parquet file in the directory. For this example where you have 2 actors / training workers, you can call Dataset.repartition(2) before creating the RayDMatrix, so the Ray Dataset you create will have two blocks / shards, and each actor can train on one of them.

def load_data(path: str) -> RayDMatrix:
    ds = ray.data.read_parquet(path)
    ds = ds.repartition(2)
    label_col = ds.schema().names[-1]   # User needs to ensure that the last column is the label column
    return RayDMatrix(ds, label=label_col)

Note: you don’t need to do the Dataset.repartition() if you have multiple Parquet files in your directory.
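
As a side note (my suggestion, not something you have to do), I believe you can check the number of blocks of the Dataset before training to see whether there are enough shards for your actors, roughly like this:

import ray

num_actors = 2
ds = ray.data.read_parquet("gs://my_bucket/my_dir_with_parquet_files")
# The number of blocks is the maximum number of shards XGBoost-Ray can use.
if ds.num_blocks() < num_actors:
    ds = ds.repartition(num_actors)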

Yes, it would be 3 actors, and each one requires 1 CPU of logical resources: one actor for the MyTrainer class, and two actors for the training workers, each of which is a _RemoteRayXGBoostActor. Is there a reason why you need to create the MyTrainer class? It seems to me that you can call xgboost_ray.train() directly, without wrapping it inside MyTrainer.training_pipeline().
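
For example, something along these lines (a sketch based on your snippet; param, result and model are assumed to be defined as in your code):

import xgboost_ray
from xgboost_ray import RayParams

# Runs in the driver; xgboost_ray.train() creates the training actors itself.
train_dtrain = load_data("gs://my_bucket/my_dir_with_parquet_files")
xgb_reg_model = xgboost_ray.train(
    param, train_dtrain,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1),
    num_boost_round=int(param['n_estimators']),
    evals=[(train_dtrain, 'train')],
    evals_result=result, verbose_eval=True, xgb_model=model,
)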

Note the _StatsActor, _QueueActor and _EventActor are internal actors created during Ray execution, not related to your application code.

Thanks so much for replying, @chengsu. The reason is that we are building a training platform that works with different frameworks (sklearn, xgboost, etc.) and requires the user to write the training script in a certain structure. We might look into removing the unneeded extra actor. Should I just remove the @ray.remote(num_cpus=1) on the MyTrainer actor? If I understand correctly, this is only used by the autoscaler to determine whether the Ray cluster needs to add more workers, right?

Can you please also have a look at my question 6? I suspect it is related to the retried tasks, but I’m not sure how to verify that. Do you have suggestions? It seems the failed tasks don’t really matter; is it just Ray terminating them? The job still succeeded.

Yes, just removing the @ray.remote(num_cpus=1) on the MyTrainer actor should be good.
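
In other words, keep your MyTrainer structure but as a plain Python class, roughly like this (a sketch, reusing load_data and the parameters from your snippet):

class MyTrainer:  # no @ray.remote: this runs in the driver process
    def training_pipeline(self):
        train_dtrain = load_data("gs://my_bucket/my_dir_with_parquet_files")
        return xgboost_ray.train(
            param, train_dtrain,
            ray_params=RayParams(num_actors=2, cpus_per_actor=1),
        )

MyTrainer().training_pipeline()  # regular method call instead of .remote() + ray.get()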

I feel this is coming from task retries. Here is our doc on understanding the dashboard: Ray Dashboard — Ray 2.8.0, section “Task and Actor errors”.

For the original issue, this was a bug in XGBoost-Ray: with a Ray Dataset, we can repartition the dataset automatically if needed. This was fixed in xgboost_ray==0.17.0, and the issue shouldn’t come up anymore.

Thanks for the reply. Do you mean that now, even when reading from a single file, the data is loaded in a distributed way? Or only if I first load it into a Ray Dataset?
Can you also please elaborate on the difference between centralized loading and distributed loading? It seems that if the centralized loader is chosen, the machine still needs to be able to hold the entire dataset in memory, because underneath it loads the data into a pandas DataFrame. Is that correct?

Yes that’s right. Centralized loading is useful if the data is small enough and you have it available on one machine. It will then distribute the data from a central location. The machine needs to be able to hold the full dataset in memory.

In most cases distributed data loading should be preferred if possible.
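
For illustration, centralized loading looks roughly like this (a sketch; the path and label column name are placeholders):

import pandas as pd
from xgboost_ray import RayDMatrix

# Centralized loading: the full dataset is materialized on one machine,
# e.g. as a pandas DataFrame, and shards are then sent to the actors.
# distributed=False explicitly requests centralized loading, as the
# original error message suggests.
df = pd.read_parquet("path/to/single_file.parquet")  # placeholder path
dtrain = RayDMatrix(df, label="my_label_col", distributed=False)

Distributed loading, in contrast, corresponds to the list-of-files / Ray Dataset examples above, where each actor loads its own shard.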

Regarding your last question, yes, the fix affects Ray Datasets, so if you use them to load the data, it will be distributed automatically. Note that if it’s only one parquet file this will be similar to the centralized data loading in XGBoost-Ray.

@kai thanks! So the fix is for the case where a directory contains more than one file, right? So the partitioning depends on whether the data is already split into multiple files. Can you please share a link to the GitHub issue for this fix? Thanks!

Sure, the pull request is here: Repartition Ray dataset if number of shards is too small by krfricke · Pull Request #283 · ray-project/xgboost_ray · GitHub

Yes, if you have more than one file this will be the number of partitions in XGBoost-Ray or the number of blocks in a Ray Dataset.
