RayTaskError (TypeError)

Hello - I am new to Ray. I created train_set = RayDMatrix(ray_dataset, "rz_flag") and passed it to train():

from xgboost_ray import RayDMatrix, RayParams, train

bst = train(
    params=xgboost_params | hyper_params,
    dtrain=train_set,
    evals=[(train_set, "train")],
    evals_result=evals_result,
    ray_params=RayParams(
        cpus_per_actor=15,
        num_actors=10,
    ),
    verbose_eval=False,
    num_boost_round=10,
    # callbacks=[TqdmCallback(10)]
)

The training process completes fine, but I get the following error during prediction. What is the issue here - creating a RayDMatrix from ray_dataset? ray_dataset is created using ray.data.read_parquet().

import xgboost as xgb
from xgboost_ray import RayDMatrix, RayParams, predict

pred_set = RayDMatrix(
    data=ray_dataset,
    label="rz_flag",
)

bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, pred_set, ray_params=RayParams(num_actors=10, cpus_per_actor=15))
---------------------------------------------------------------------------
RayTaskError(TypeError)                   Traceback (most recent call last)
File <command-4498259778197045>:9
      4 pred_set = RayDMatrix(data = ray_dataset, 
      5                       label = "rz_flag", 
      6                     )
      8 bst = xgb.Booster(model_file="model.xgb")
----> 9 pred_ray = predict(bst, pred_set, ray_params=RayParams(num_actors=10, cpus_per_actor=15))
     11 print(pred_ray)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/main.py:1850, in predict(model, data, ray_params, _remote, **kwargs)
   1848 while tries <= max_actor_restarts:
   1849     try:
-> 1850         return _predict(model, data, ray_params=ray_params, **kwargs)
   1851     except RayActorError:
   1852         if tries + 1 <= max_actor_restarts:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/main.py:1752, in _predict(model, data, ray_params, **kwargs)
   1749     wait_load.extend(_trigger_data_load(actor, data, []))
   1751 try:
-> 1752     ray.get(wait_load)
   1753 except Exception as exc:
   1754     logger.warning(f"Caught an error during prediction: {str(exc)}")

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/ray/_private/auto_init_hook.py:18, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     15 @wraps(fn)
     16 def auto_init_wrapper(*args, **kwargs):
     17     auto_init_ray()
---> 18     return fn(*args, **kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:103, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    102         return getattr(ray, func.__name__)(*args, **kwargs)
--> 103 return func(*args, **kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/ray/_private/worker.py:2540, in get(object_refs, timeout)
   2538     worker.core_worker.dump_object_store_memory_usage()
   2539 if isinstance(value, RayTaskError):
-> 2540     raise value.as_instanceof_cause()
   2541 else:
   2542     raise value

RayTaskError(TypeError): ray::_RemoteRayXGBoostActor.load_data() (pid=22141, ip=10.24.105.11, actor_id=5a1eaafcaa0216f3622eb4e302000000, repr=<xgboost_ray.main._RemoteRayXGBoostActor object at 0x7fafe8fe6d30>)
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/main.py", line 639, in load_data
    param = data.get_data(self.rank, self.num_actors)
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/matrix.py", line 928, in get_data
    self.load_data(num_actors=num_actors, rank=rank)
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/matrix.py", line 913, in load_data
    refs, self.n = self.loader.load_data(
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/matrix.py", line 651, in load_data
    local_df = data_source.load_data(
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/data_sources/ray_dataset.py", line 61, in load_data
    data = [data[i] for i in indices]
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-9a4cb759-9e6b-43cf-ab41-7ce3b0f8e0cd/lib/python3.9/site-packages/xgboost_ray/data_sources/ray_dataset.py", line 61, in <listcomp>
    data = [data[i] for i in indices]
TypeError: 'Dataset' object is not subscriptable

I could reproduce this with a simple example. At first glance it looks like a bug in xgboost-ray’s prediction code. I’ll take a look.

As a workaround, and if feasible (i.e. if the data is small enough), can you pass distributed=False to your RayDMatrix creation?

pred_set = RayDMatrix(
    data=ray_dataset,
    label="rz_flag",
    distributed=False,
)

I’ve looked into this, and it seems this was due to an older API change on the Ray Dataset side that we haven’t been testing for in XGBoost-Ray.

Going forward, the recommendation for scalable inference with XGBoost-Ray will be to use Ray Data’s map_batches functionality directly, e.g. like this:

    # Distributed prediction
    from xgboost import DMatrix

    scored = ray_dataset.drop_columns(["rz_flag"]).map_batches(
        lambda batch: {"pred": bst.predict(DMatrix(batch))}, batch_format="pandas"
    )
    print(scored.to_pandas())

Thanks Kai.

My dataset is relatively big (11 million rows at the moment), and it will get bigger. I know that with Spark, toPandas() brings everything down to the driver node and can crash if the data is big. I just wanted to avoid that possibility. Is there a different way?

The data size is 7 GB. I set distributed=False, but predict() still expects num_actors via ray_params=RayParams(num_actors=10). If the data is not distributed, shouldn’t we create just a single actor?

Is map_batches in the documentation? I’d appreciate it if you could refer me to the docs.

Unexpected exception formatting exception. Falling back to standard exception
Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3378, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<command-671209778712541>", line 7, in <module>
    print(scored.to_pandas())
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-ce14acb1-7ef3-4d71-8b33-4f71424cc4a7/lib/python3.9/site-packages/ray/data/dataset.py", line 3707, in to_pandas
    raise ValueError(
ValueError: the dataset has more than the given limit of 100000 records: 10951888. If you are sure that a DataFrame with 10951888 rows will fit in local memory, use ds.to_pandas(limit=10951888).

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 1997, in showtraceback
    stb = self.InteractiveTB.structured_traceback(
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/ultratb.py", line 1112, in structured_traceback
    return FormattedTB.structured_traceback(
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/ultratb.py", line 1006, in structured_traceback
    return VerboseTB.structured_traceback(
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/ultratb.py", line 859, in structured_traceback
    formatted_exception = self.format_exception_as_a_whole(etype, evalue, etb, number_of_lines_of_context,
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/ultratb.py", line 812, in format_exception_as_a_whole
    frames.append(self.format_record(r))
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/ultratb.py", line 730, in format_record
    result += ''.join(_format_traceback_lines(frame_info.lines, Colors, self.has_colors, lvals))
  File "/databricks/python/lib/python3.9/site-packages/stack_data/utils.py", line 145, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/databricks/python/lib/python3.9/site-packages/stack_data/core.py", line 698, in lines
    pieces = self.included_pieces
  File "/databricks/python/lib/python3.9/site-packages/stack_data/utils.py", line 145, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/databricks/python/lib/python3.9/site-packages/stack_data/core.py", line 649, in included_pieces
    pos = scope_pieces.index(self.executing_piece)
  File "/databricks/python/lib/python3.9/site-packages/stack_data/utils.py", line 145, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/databricks/python/lib/python3.9/site-packages/stack_data/core.py", line 628, in executing_piece
    return only(
  File "/databricks/python/lib/python3.9/site-packages/executing/executing.py", line 164, in only
    raise NotOneValueFound('Expected one value, found 0')
executing.executing.NotOneValueFound: Expected one value, found 0
ValueError: the dataset has more than the given limit of 100000 records: 10951888. If you are sure that a DataFrame with 10951888 rows will fit in local memory, use ds.to_pandas(limit=10951888).

Also, in terms of maturity, which makes more sense to use: Ray Train’s XGBoostTrainer or xgboost_ray? I am really not sure why there are two APIs for XGBoost.

You probably just don’t want to call to_pandas() then. Just use write_csv() or any other downstream output API, depending on your needs.
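For example, something along these lines should work (the output path below is just a placeholder, and scored is the dataset produced by the map_batches call above):

# Write the predictions out in parallel from the workers instead of
# collecting everything on the driver with to_pandas().
# The output path is a placeholder - point it at your own storage.
scored.write_parquet("/some/shared/output/path")

# Or, as CSV files:
# scored.write_csv("/some/shared/output/path_csv")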

The Ray Data docs can be found here: Ray Data: Scalable Datasets for ML — Ray 2.6.0

Map batches: ray.data.Dataset.map_batches — Ray 2.6.0

or here as a user guide: Transforming Data — Ray 2.6.0

and here for saving the resulting data: Saving Data — Ray 2.6.0

XGBoostTrainer uses XGBoost-Ray internally, so functionally they are very similar. They co-exist mostly for historical reasons (XGBoost-Ray is older than Ray Train). Other than that, XGBoost-Ray supports elastic training (Ray Train currently doesn’t) and acts as a drop-in replacement for the regular XGBoost API - i.e. for existing users, switching from XGBoost to XGBoost-Ray is a three-line code change.
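To make that concrete, roughly the change looks like this (a sketch with placeholder data; the only differences from single-node XGBoost are the import, RayDMatrix instead of DMatrix, and the extra ray_params argument):

from xgboost_ray import RayDMatrix, RayParams, train

# Single-node XGBoost would be:
#   dtrain = xgb.DMatrix(X, y); bst = xgb.train(params, dtrain)
# The distributed version has the same call shape:
dtrain = RayDMatrix(X, y)  # X, y are placeholders for your features/labels
bst = train(
    params,
    dtrain,
    ray_params=RayParams(num_actors=4, cpus_per_actor=2),
)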

In the future we will likely move the implementation over to Ray Train, but this is a low priority issue right now.

Thanks Kai. I’d prefer converting to a Spark DataFrame instead of CSV, if possible, which we can then write as a Delta table. However, to_spark() doesn’t seem to work. Any ideas?

With XGBoostTrainer, could you show how to generate predictions? Is it the same as above? Could you show a small example?

Also, what caught my attention is that we use RayDMatrix for training but DMatrix for prediction. Is there a compelling reason?

Apologies for the delay.

What happens when you try to_spark() - which error comes up?

The example above is used for predictions. We are deprecating the XGBoostPredictor class and instead recommend the Ray Data API for predictions.

Technically, XGBoost prediction is just “data parallel” - i.e. the data is split up and fed through the model. This is what map_batches does for us. Since we’re interacting directly with the native XGBoost library, we use the DMatrix.

map_batches(..., batch_format="pandas") will automatically convert batches into pandas DataFrames, which can be read by DMatrix directly.
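To make the “same as above” answer concrete for XGBoostTrainer: once training finishes, you can recover the plain Booster from the result checkpoint and reuse the exact same map_batches pattern. A sketch, assuming XGBoostTrainer.get_model and a result object from trainer.fit() (double-check the helper name against the Ray Train docs for your version):

import xgboost as xgb
from ray.train.xgboost import XGBoostTrainer

# result = trainer.fit()  # Ray Train result from your XGBoostTrainer run
bst = XGBoostTrainer.get_model(result.checkpoint)  # plain xgboost.Booster

scored = ray_dataset.drop_columns(["rz_flag"]).map_batches(
    lambda batch: {"pred": bst.predict(xgb.DMatrix(batch))},
    batch_format="pandas",
)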

Here is the log of the error, where scored is the Ray Dataset created at inference time. Also, how can we know what is being deprecated versus not? Is there an up-to-date API doc?

ValueError                                Traceback (most recent call last)
File <command-326563723496342>:1
----> 1 scored.to_spark(spark)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-bf79a415-8a74-4f29-84be-b85ada0e5d70/lib/python3.9/site-packages/ray/data/dataset.py:3673, in Dataset.to_spark(self, spark)
   3671 if isinstance(schema, Schema):
   3672     schema = schema.base_schema  # Backwards compat with non strict mode.
-> 3673 return raydp.spark.ray_dataset_to_spark_dataframe(
   3674     spark, schema, self.get_internal_block_refs()
   3675 )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-bf79a415-8a74-4f29-84be-b85ada0e5d70/lib/python3.9/site-packages/raydp/spark/dataset.py:277, in ray_dataset_to_spark_dataframe(spark, arrow_schema, blocks, locations)
    275     return _convert_by_rdd(spark, blocks, locations, schema)
    276 elif isinstance(sample, pa.Table):
--> 277     return _convert_by_udf(spark, blocks, locations, schema)
    278 else:
    279     raise RuntimeError("ray.to_spark only supports arrow type blocks")

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-bf79a415-8a74-4f29-84be-b85ada0e5d70/lib/python3.9/site-packages/raydp/spark/dataset.py:212, in _convert_by_udf(spark, blocks, locations, schema)
    207 def _convert_by_udf(spark: sql.SparkSession,
    208                     blocks: List[ObjectRef],
    209                     locations: List[bytes],
    210                     schema: StructType) -> DataFrame:
    211     holder_name  = spark.sparkContext.appName + RAYDP_SPARK_MASTER_SUFFIX
--> 212     holder = ray.get_actor(holder_name)
    213     df_id = uuid.uuid4()
    214     ray.get(holder.add_objects.remote(df_id, blocks))

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-bf79a415-8a74-4f29-84be-b85ada0e5d70/lib/python3.9/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-bf79a415-8a74-4f29-84be-b85ada0e5d70/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:103, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    102         return getattr(ray, func.__name__)(*args, **kwargs)
--> 103 return func(*args, **kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-bf79a415-8a74-4f29-84be-b85ada0e5d70/lib/python3.9/site-packages/ray/_private/worker.py:2770, in get_actor(name, namespace)
   2768 worker = global_worker
   2769 worker.check_connected()
-> 2770 return worker.core_worker.get_named_actor_handle(name, namespace or "")

File python/ray/_raylet.pyx:3514, in ray._raylet.CoreWorker.get_named_actor_handle()

File python/ray/_raylet.pyx:404, in ray._raylet.check_status()

ValueError: Failed to look up actor with name 'Databricks Shell_SPARK_MASTER'. This could because 1. You are trying to look up a named actor you didn't create. 2. The named actor died. 3. You did not use a namespace matching the namespace of the actor.

Hi,

RayDP is “Spark on Ray”, which is incompatible with the Databricks runtime. Please use Ray on Spark instead: https://docs.databricks.com/en/machine-learning/ray-integration.html
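As a rough sketch of what that looks like (parameter names can differ slightly between Ray versions, so treat this as an outline and follow the linked doc):

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

# Start Ray worker processes on the existing Spark/Databricks cluster.
setup_ray_cluster(num_worker_nodes=2)
ray.init()  # connects to the Ray-on-Spark cluster started above

# ... run the XGBoost-Ray / Ray Data workload here ...

shutdown_ray_cluster()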