Pushed Error: At least one of the input arguments for this task could not be computed

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hello, I am new to Ray and have been working with RayDP, but my code raises a Pushed Error and a RaySystemError. How can I solve this problem?
Thanks in advance for any answers.

  • My environment:

Ubuntu 20, x64
python==3.9
raydp==1.6
java==8
pyspark==3.4.0
ray==2.1.0
torch==2.1.1+cu118
torchmetrics
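Because this kind of failure can depend on exact version combinations (note that `torchmetrics` has no version listed above), it may help to paste the versions as Python actually sees them. A minimal stdlib sketch, assuming Python 3.8+ for `importlib.metadata`; the helper name `collect_versions` and the package list are illustrative and not part of Ray or RayDP:

```python
# Hypothetical helper for a bug report: look up the installed versions of
# the packages involved. Requires Python 3.8+ (importlib.metadata).
import platform
from importlib.metadata import PackageNotFoundError, version


def collect_versions(packages):
    """Return {package_name: installed_version_or_None}."""
    report = {}
    for name in packages:
        try:
            report[name] = version(name)
        except PackageNotFoundError:
            # Not installed in the current environment.
            report[name] = None
    return report


if __name__ == "__main__":
    print("python", platform.python_version())
    for name, ver in collect_versions(
        ["raydp", "ray", "pyspark", "torch", "torchmetrics"]
    ).items():
        print(name, ver or "not installed")
```

The Java version still has to be taken from `java -version`, since it is not a Python package.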

  • My Code:

import ray
ray.init(local_mode=True)

import raydp

spark = raydp.init_spark(app_name='RayDP Example 2',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='2GB')

from pyspark.sql.functions import col
df = spark.range(1, 10, 1)
# df.show()
# calculate z = x + 2y + 1000
df = df.withColumn("x", col("id")*2)\
  .withColumn("y", col("id") + 200)\
  .withColumn("z", col("x") + 2*col("y") + 1000)
df.show()
from raydp.utils import random_split
train_df, test_df = random_split(df, [0.7, 0.3])

# PyTorch Code
import torch
class LinearModel(torch.nn.Module):
    def __init__(self):
        super(LinearModel, self).__init__()
        self.linear = torch.nn.Linear(2, 1)

    def forward(self, x, y):
        x = torch.cat([x, y], dim=1)
        return self.linear(x)

model = LinearModel()
optimizer = torch.optim.Adam(model.parameters())
loss_fn = torch.nn.MSELoss()

def lr_scheduler_creator(optimizer, config):
    return torch.optim.lr_scheduler.MultiStepLR(
        optimizer, milestones=[150, 250, 350], gamma=0.1)

# You can use the RayDP Estimator API or libraries like Ray Train for distributed training.
# ray.timeline(trace_only=True)

from raydp.torch import TorchEstimator
estimator = TorchEstimator(
  num_workers = 2,
  model = model,
  optimizer = optimizer,
  loss = loss_fn,
  lr_scheduler_creator=lr_scheduler_creator,
  feature_columns = ["x","y"],
  label_column = "z",
  batch_size = 2,
  num_epochs = 400,
)
estimator.fit_on_spark(train_df, test_df)
pytorch_model = estimator.get_model()

estimator.shutdown()
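The labels can be sanity-checked without Spark. Since `x = 2*id` and `y = id + 200`, the commented formula reduces to `z = x + 2y + 1000 = 4*id + 1400`, which matches every row of the `df.show()` table in the log (id 1 gives 1404, id 9 gives 1436). Below is a stdlib-only sketch that rebuilds the same nine rows and checks this. It also shows that, because `x` and `y` are collinear, the weights `(1, 2)` with bias `1000` are only one of many combinations a `Linear(2, 1)` layer could fit exactly. The `predict` helper is purely illustrative:

```python
# Rebuild the nine rows produced by spark.range(1, 10, 1) plus the three
# withColumn calls, in plain Python (no Spark needed).
rows = []
for i in range(1, 10):  # the end bound is exclusive, as in spark.range
    x = i * 2
    y = i + 200
    z = x + 2 * y + 1000
    rows.append((i, x, y, z))

# Because x and y are both linear in id, z collapses to 4*id + 1400.
assert all(z == 4 * i + 1400 for i, _, _, z in rows)


def predict(w1, w2, b, x, y):
    """Illustrative stand-in for a Linear(2, 1) layer: w1*x + w2*y + b."""
    return w1 * x + w2 * y + b


# x and y are collinear (y = x/2 + 200), so any (w1, w2) with
# 2*w1 + w2 == 4 and bias b == 1400 - 200*w2 fits every row exactly.
for w1, w2 in [(1, 2), (2, 0), (0, 4)]:
    b = 1400 - 200 * w2
    assert all(predict(w1, w2, b, x, y) == z for _, x, y, z in rows)

for row in rows:
    print(row)
```

This does not touch the error itself; it only confirms that the Spark transformations produce the intended labels.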

  • My Error:

:job_id:01000000
:actor_name:RayDPSparkMaster
:job_id:01000000
2024-05-28 16:56:27,544	WARNING worker.py:938 -- `ray.get_gpu_ids()` will always return the empty list when called from the driver. This is because Ray does not manage GPU allocations to the driver process.
:actor_name:RayDPSparkMaster
24/05/28 16:56:36 WARN Utils: Your hostname, whu-All-Series resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface eno1)
24/05/28 16:56:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/28 16:56:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/28 16:56:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
+---+---+---+----+
| id|  x|  y|   z|
+---+---+---+----+
|  1|  2|201|1404|
|  2|  4|202|1408|
|  3|  6|203|1412|
|  4|  8|204|1416|
|  5| 10|205|1420|
|  6| 12|206|1424|
|  7| 14|207|1428|
|  8| 16|208|1432|
|  9| 18|209|1436|
+---+---+---+----+
/home/inf/Anaconda3/envs/try/lib/python3.9/site-packages/pyspark/sql/dataframe.py:169: UserWarning: DataFrame.sql_ctx is an internal property, and will be removed in future releases. Use DataFrame.sparkSession instead.
  warnings.warn(
2024-05-28 16:56:51,448	WARNING __init__.py:181 -- DeprecationWarning: `ray.worker.global_worker` is a private attribute and access will be removed in a future Ray version.
:task_name:get_table_block_metadata
[2024-05-28 16:56:51,455 E 1132796 1132796] core_worker.cc:1690: Pushed Error with JobID: 01000000 of type: task with message: ray::get_table_block_metadata() (pid=1132796, ip=192.168.0.103)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: Unrecognized error type 4 at time: 1.71689e+09
:task_name:get_table_block_metadata
[2024-05-28 16:56:51,456 E 1132796 1132796] core_worker.cc:1690: Pushed Error with JobID: 01000000 of type: task with message: ray::get_table_block_metadata() (pid=1132796, ip=192.168.0.103)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: Unrecognized error type 4 at time: 1.71689e+09
Traceback (most recent call last):
  File "/home/inf/Anaconda3/envs/try/lib/python3.9/site-packages/ray/_private/worker.py", line 2540, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError: ray::get_table_block_metadata() (pid=1132796, ip=192.168.0.103)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: Unrecognized error type 4
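The number in "Unrecognized error type 4" appears to be a value of Ray's internal `ErrorType` protobuf enum (defined in `src/ray/protobuf/common.proto`). As far as I can tell, value 4 is `OBJECT_IN_PLASMA`, which the proto comments describe as an internal marker meaning "the result was placed in the shared object store", not something that should ever reach user code. A hedged sketch for decoding the number: the helper `error_type_name` is hypothetical; it tries the generated module `ray.core.generated.common_pb2` that ships with Ray and otherwise falls back to a hand-copied table, which is an assumption to verify against your Ray version:

```python
# Hypothetical helper: map N in "Unrecognized error type N" to a name from
# Ray's ErrorType protobuf enum.
# Fallback table: first values of ErrorType as recalled from
# src/ray/protobuf/common.proto (assumption; verify for your Ray version).
_FALLBACK_ERROR_TYPES = {
    0: "WORKER_DIED",
    1: "ACTOR_DIED",
    2: "OBJECT_UNRECONSTRUCTABLE",
    3: "TASK_EXECUTION_EXCEPTION",
    4: "OBJECT_IN_PLASMA",
}


def error_type_name(code):
    """Return the ErrorType name for `code`, preferring Ray's own enum."""
    try:
        # Generated protobuf module bundled inside the ray package.
        from ray.core.generated import common_pb2

        return common_pb2.ErrorType.Name(code)
    except (ImportError, ValueError):
        # Ray not importable, or the code is not a member of the enum.
        return _FALLBACK_ERROR_TYPES.get(code, "UNKNOWN")


if __name__ == "__main__":
    print(error_type_name(4))
```

If the code really is `OBJECT_IN_PLASMA`, one possible reading (not confirmed in this thread) is that an object the Spark executors wrote to the object store was resolved through a code path that does not expect that marker. `ray.init(local_mode=True)` in the script is a candidate to rule out first: local mode runs tasks inside the driver process instead of separate workers, so starting Ray with a plain `ray.init()` is a cheap experiment.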