ray.data.Dataset.add_column / Ray 2.7

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

env

  • python version: 3.10.12
  • Ray version: 2.7.0

Hi,

I’ve just upgraded to Ray 2.7 and ran into trouble running my code on a Ray cluster (remote mode). While digging into the error, I found that even the example code from the documentation page for ray.data.Dataset.add_column (Ray 2.7.0) fails, as follows:

ds = ray.data.range(100)
ds.schema()

Column  Type
------  ----
id      int64

ds.add_column("new_id", lambda df: df["id"] * 2).schema()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[8], line 1
----> 1 ds.add_column("new_id", lambda df: df["id"] * 2).schema()

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/dataset.py:2541, in Dataset.schema(self, fetch_if_missing)
   2536     return None
   2538 # Lazily execute only the first block to minimize computation.
   2539 # We achieve this by appending a Limit[1] operation to a copy
   2540 # of this Dataset, which we then execute to get its schema.
-> 2541 base_schema = self.limit(1)._plan.schema(fetch_if_missing=fetch_if_missing)
   2542 if base_schema:
   2543     self._plan.cache_schema(base_schema)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/plan.py:405, in ExecutionPlan.schema(self, fetch_if_missing)
    403             self._stages_after_snapshot.append(a)
    404     else:
--> 405         self.execute()
    406 elif len(self._stages_after_snapshot) == 1 and isinstance(
    407     self._stages_after_snapshot[-1], RandomizeBlocksStage
    408 ):
    409     # If RandomizeBlocksStage is last stage, we execute it (regardless of
    410     # the fetch_if_missing), since RandomizeBlocksStage is just changing
    411     # the order of references (hence super cheap).
    412     self.execute()

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/plan.py:591, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
    589 else:
    590     executor = BulkExecutor(copy.deepcopy(context.execution_options))
--> 591 blocks = execute_to_legacy_block_list(
    592     executor,
    593     self,
    594     allow_clear_input_blocks=allow_clear_input_blocks,
    595     dataset_uuid=self._dataset_uuid,
    596     preserve_order=preserve_order,
    597 )
    598 # TODO(ekl) we shouldn't need to set this in the future once we move
    599 # to a fully lazy execution model, unless .materialize() is used. Th
    600 # reason we need it right now is since the user may iterate over a
    601 # Dataset multiple times after fully executing it once.
    602 if not self._run_by_consumer:

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py:119, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
    112 dag, stats = _get_execution_dag(
    113     executor,
    114     plan,
    115     allow_clear_input_blocks,
    116     preserve_order,
    117 )
    118 bundles = executor.execute(dag, initial_stats=stats)
--> 119 block_list = _bundles_to_block_list(bundles)
    120 # Set the stats UUID after execution finishes.
    121 _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py:357, in _bundles_to_block_list(bundles)
    355 blocks, metadata = [], []
    356 owns_blocks = True
--> 357 for ref_bundle in bundles:
    358     if not ref_bundle.owns_blocks:
    359         owns_blocks = False

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py:37, in OutputIterator.__next__(self)
     36 def __next__(self) -> RefBundle:
---> 37     return self.get_next()

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py:129, in StreamingExecutor.execute.<locals>.StreamIterator.get_next(self, output_split_idx)
    127         raise StopIteration
    128 elif isinstance(item, Exception):
--> 129     raise item
    130 else:
    131     # Otherwise return a concrete RefBundle.
    132     if self._outer._global_info:

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py:187, in StreamingExecutor.run(self)
    181 """Run the control loop in a helper thread.
    182 
    183 Results are returned via the output node's outqueue.
    184 """
    185 try:
    186     # Run scheduling loop until complete.
--> 187     while self._scheduling_loop_step(self._topology) and not self._shutdown:
    188         pass
    189 except Exception as e:
    190     # Propagate it to the result iterator.

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py:256, in StreamingExecutor._scheduling_loop_step(self, topology)
    254 if DEBUG_TRACE_SCHEDULING:
    255     _debug_dump_topology(topology)
--> 256 topology[op].dispatch_next_task()
    257 cur_usage = TopologyResourceUsage.of(topology)
    258 op = select_operator_to_run(
    259     topology,
    260     cur_usage,
   (...)
    264     autoscaling_state=self._autoscaling_state,
    265 )

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py:198, in OpState.dispatch_next_task(self)
    196 for i, inqueue in enumerate(self.inqueues):
    197     if inqueue:
--> 198         self.op.add_input(inqueue.popleft(), input_index=i)
    199         return
    200 assert False, "Nothing to dispatch"

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py:202, in MapOperator.add_input(self, refs, input_index)
    198 if self._block_ref_bundler.has_bundle():
    199     # If the bundler has a full bundle, add it to the operator's task submission
    200     # queue.
    201     bundle = self._block_ref_bundler.get_next_bundle()
--> 202     self._add_bundled_input(bundle)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py:48, in TaskPoolMapOperator._add_bundled_input(self, bundle)
     46 input_blocks = [block for block, _ in bundle.blocks]
     47 ctx = TaskContext(task_idx=self._next_data_task_idx)
---> 48 gen = map_task.options(
     49     **self._get_runtime_ray_remote_args(input_bundle=bundle), name=self.name
     50 ).remote(
     51     self._map_transformer_ref,
     52     DataContext.get_current(),
     53     ctx,
     54     *input_blocks,
     55 )
     56 self._submit_data_task(gen, bundle)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/remote_function.py:241, in RemoteFunction.options.<locals>.FuncWrapper.remote(self, *args, **kwargs)
    240 def remote(self, *args, **kwargs):
--> 241     return func_cls._remote(args=args, kwargs=kwargs, **updated_options)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/util/tracing/tracing_helper.py:310, in _tracing_task_invocation.<locals>._invocation_remote_span(self, args, kwargs, *_args, **_kwargs)
    308     if kwargs is not None:
    309         assert "_ray_trace_ctx" not in kwargs
--> 310     return method(self, args, kwargs, *_args, **_kwargs)
    312 assert "_ray_trace_ctx" not in kwargs
    313 tracer = _opentelemetry.trace.get_tracer(__name__)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/remote_function.py:263, in RemoteFunction._remote(self, args, kwargs, **task_options)
    261 task_options.pop("max_calls", None)
    262 if client_mode_should_convert():
--> 263     return client_mode_convert_function(self, args, kwargs, **task_options)
    265 worker = ray._private.worker.global_worker
    266 worker.check_connected()

File /opt/my_env/python310/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:164, in client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs)
    162     setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
    163 client_func = ray._get_converted(key)
--> 164 return client_func._remote(in_args, in_kwargs, **kwargs)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/util/client/common.py:298, in ClientRemoteFunc._remote(self, args, kwargs, **option_args)
    296 if kwargs is None:
    297     kwargs = {}
--> 298 return self.options(**option_args).remote(*args, **kwargs)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/util/client/common.py:581, in OptionWrapper.remote(self, *args, **kwargs)
    579 def remote(self, *args, **kwargs):
    580     self._remote_stub._signature.bind(*args, **kwargs)
--> 581     return return_refs(ray.call_remote(self, *args, **kwargs))

File /opt/my_env/python310/lib/python3.10/site-packages/ray/util/client/api.py:100, in _ClientAPI.call_remote(self, instance, *args, **kwargs)
     86 def call_remote(self, instance: "ClientStub", *args, **kwargs) -> List[Future]:
     87     """call_remote is called by stub objects to execute them remotely.
     88 
     89     This is used by stub objects in situations where they're called
   (...)
     98         kwargs: opaque keyword arguments
     99     """
--> 100     return self.worker.call_remote(instance, *args, **kwargs)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/util/client/worker.py:562, in Worker.call_remote(self, instance, *args, **kwargs)
    560 if instance._num_returns() == "dynamic":
    561     num_returns = -1
--> 562 return self._call_schedule_for_task(task, num_returns)

File /opt/my_env/python310/lib/python3.10/site-packages/ray/util/client/worker.py:575, in Worker._call_schedule_for_task(self, task, num_returns)
    573 if num_return_refs == -1:
    574     num_return_refs = 1
--> 575 id_futures = [Future() for _ in range(num_return_refs)]
    577 def populate_ids(resp: Union[ray_client_pb2.DataResponse, Exception]) -> None:
    578     if isinstance(resp, Exception):

TypeError: 'str' object cannot be interpreted as an integer

The same code runs fine in Ray local mode.
I believe Ray 2.6.3 did not have this problem in either local or remote mode.
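
For readers following the traceback: the last frames show that only num_returns == "dynamic" is converted to an integer before range(num_return_refs) is called. The sketch below is plain Python, not Ray's code. It mimics only the shape of those frames to show how any other string value reproduces the same message. The function names and the value "streaming" are assumptions made for illustration.

```python
def normalize(num_returns):
    # Mirrors the check visible at worker.py:560-561: only "dynamic" maps to -1.
    return -1 if num_returns == "dynamic" else num_returns


def schedule_futures(num_return_refs):
    # Mirrors the shape of worker.py:573-575, with object() standing in for Future().
    if num_return_refs == -1:
        num_return_refs = 1
    return [object() for _ in range(num_return_refs)]


# An integer or "dynamic" passes through without error.
assert len(schedule_futures(normalize(2))) == 2
assert len(schedule_futures(normalize("dynamic"))) == 1

# Any other string reaches range() unchanged and fails.
# "streaming" is an assumed value, chosen only for illustration.
try:
    schedule_futures(normalize("streaming"))
except TypeError as exc:
    message = str(exc)

print(message)
```

If this reading is right, the failure would be specific to the Ray Client code path, which would be consistent with the same code working in local mode.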

Hi @nyamashi, we generally recommend against using Ray Client due to architectural limitations, especially around ML workloads involving data. As an alternative, you can use the Ray Jobs API as described here.

Let me know if this workaround works for your case, or if you have any follow-up questions.
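
The Ray Jobs API suggestion above could look roughly like the following. This is a minimal sketch, not an official recipe. The script name, the helper functions, and the address http://127.0.0.1:8265 (the usual dashboard port) are assumptions, so adjust them to your cluster. The driver script runs on the cluster itself, so Ray Client is not involved.

```python
from pathlib import Path

# Driver script to be executed on the cluster (contents taken from the example above).
JOB_SCRIPT = '''\
import ray

ds = ray.data.range(100)
print(ds.add_column("new_id", lambda df: df["id"] * 2).schema())
'''


def write_job_script(directory: str, name: str = "add_column_job.py") -> Path:
    """Write the driver script into the directory that will be uploaded."""
    path = Path(directory) / name
    path.write_text(JOB_SCRIPT)
    return path


def submit(directory: str, address: str = "http://127.0.0.1:8265") -> str:
    """Submit the script as a Ray job and return its submission ID."""
    # Imported lazily so write_job_script() can be used without Ray installed.
    from ray.job_submission import JobSubmissionClient

    script = write_job_script(directory)
    client = JobSubmissionClient(address)  # assumed address of the cluster dashboard
    return client.submit_job(
        entrypoint=f"python {script.name}",
        runtime_env={"working_dir": directory},
    )
```

Calling submit("./my_job_dir") would upload that directory and start the script on the cluster. The job's output, including the printed schema, can then be inspected with client.get_job_logs(job_id) or in the dashboard.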

@sjl Thanks for your quick response.

I think I understand the situation.
I guess this is the same kind of breakage as Dataset.take() and Dataset.show() no longer working the way they did before Ray 2.6(?).
I confirmed that the following code works in remote mode with the Ray Client API when the call is wrapped in a remote task instead of being invoked directly:

@ray.remote
def run(ds):
    # Executes inside a cluster worker rather than in the Ray Client driver.
    return ds.add_column("new_id", lambda df: df["id"] * 2).schema()

ray.get(run.remote(ds))

Column  Type
------  ----
id      int64
new_id  int64

I’ll try to adapt my ML code, which uses the Client API, to Ray 2.7. Thanks!