Unable to add a column to a Ray Dataset read from Parquet

Hello,

I’m seeing an error while trying to add a column to a Ray Dataset created by reading Parquet files from GCS.

Env:
Ray: 2.7.1 running on GKE, with the driver connected via Ray Client (per the ray.util.client frames in the traceback below)

import ray

# ds was created with ray.data.read_parquet("gs://...") (path elided)
@ray.remote
def add_dim_event_ts_12hr(ds):
    # compute_dim_event_ts_12hr maps each batch to the new column's values
    return ds.add_column("dim_event_ts_12hr", compute_dim_event_ts_12hr)

ds2 = ds.repartition(10)

ds3 = ray.get(add_dim_event_ts_12hr.remote(ds2))
ds3.count()  # fails with the traceback below
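
For context, in Ray 2.7 add_column expects a function that maps each pandas batch to the new column's values. My compute_dim_event_ts_12hr follows that contract; the sketch below is illustrative only (the "event_ts" column name and the 12-hour bucketing logic are stand-ins, not my real implementation):

import pandas as pd

def compute_dim_event_ts_12hr(batch: pd.DataFrame) -> pd.Series:
    # Illustrative placeholder: floor an assumed "event_ts" column to
    # 12-hour buckets. The real function takes/returns the same types.
    return pd.to_datetime(batch["event_ts"]).dt.floor("12H")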

The error:

TypeError                                 Traceback (most recent call last)
Cell In[35], line 2
      1 ds3 = ray.get(add_dim_event_ts_12hr.remote(ds2))
----> 2 ds3.count()

File ~/penv38/lib/python3.8/site-packages/ray/data/dataset.py:2498, in Dataset.count(self)
   2492     return meta_count
   2494 get_num_rows = cached_remote_fn(_get_num_rows)
   2496 return sum(
   2497     ray.get(
-> 2498         [get_num_rows.remote(block) for block in self.get_internal_block_refs()]
   2499     )
   2500 )

File ~/penv38/lib/python3.8/site-packages/ray/data/dataset.py:4799, in Dataset.get_internal_block_refs(self)
   4780 @ConsumptionAPI(pattern="Time complexity:")
   4781 @DeveloperAPI
   4782 def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
   4783     """Get a list of references to the underlying blocks of this dataset.
   4784 
   4785     This function can be used for zero-copy access to the data. It blocks
   (...)
   4797         A list of references to this dataset's blocks.
   4798     """
-> 4799     blocks = self._plan.execute().get_blocks()
   4800     self._synchronize_progress_bar()
   4801     return blocks

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/plan.py:591, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
    589 else:
    590     executor = BulkExecutor(copy.deepcopy(context.execution_options))
--> 591 blocks = execute_to_legacy_block_list(
    592     executor,
    593     self,
    594     allow_clear_input_blocks=allow_clear_input_blocks,
    595     dataset_uuid=self._dataset_uuid,
    596     preserve_order=preserve_order,
    597 )
    598 # TODO(ekl) we shouldn't need to set this in the future once we move
    599 # to a fully lazy execution model, unless .materialize() is used. Th
    600 # reason we need it right now is since the user may iterate over a
    601 # Dataset multiple times after fully executing it once.
    602 if not self._run_by_consumer:

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/legacy_compat.py:119, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
    112 dag, stats = _get_execution_dag(
    113     executor,
    114     plan,
    115     allow_clear_input_blocks,
    116     preserve_order,
    117 )
    118 bundles = executor.execute(dag, initial_stats=stats)
--> 119 block_list = _bundles_to_block_list(bundles)
    120 # Set the stats UUID after execution finishes.
    121 _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/legacy_compat.py:357, in _bundles_to_block_list(bundles)
    355 blocks, metadata = [], []
    356 owns_blocks = True
--> 357 for ref_bundle in bundles:
    358     if not ref_bundle.owns_blocks:
    359         owns_blocks = False

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/interfaces/executor.py:37, in OutputIterator.__next__(self)
     36 def __next__(self) -> RefBundle:
---> 37     return self.get_next()

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py:129, in StreamingExecutor.execute.<locals>.StreamIterator.get_next(self, output_split_idx)
    127         raise StopIteration
    128 elif isinstance(item, Exception):
--> 129     raise item
    130 else:
    131     # Otherwise return a concrete RefBundle.
    132     if self._outer._global_info:

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py:187, in StreamingExecutor.run(self)
    181 """Run the control loop in a helper thread.
    182 
    183 Results are returned via the output node's outqueue.
    184 """
    185 try:
    186     # Run scheduling loop until complete.
--> 187     while self._scheduling_loop_step(self._topology) and not self._shutdown:
    188         pass
    189 except Exception as e:
    190     # Propagate it to the result iterator.

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py:256, in StreamingExecutor._scheduling_loop_step(self, topology)
    254 if DEBUG_TRACE_SCHEDULING:
    255     _debug_dump_topology(topology)
--> 256 topology[op].dispatch_next_task()
    257 cur_usage = TopologyResourceUsage.of(topology)
    258 op = select_operator_to_run(
    259     topology,
    260     cur_usage,
   (...)
    264     autoscaling_state=self._autoscaling_state,
    265 )

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor_state.py:198, in OpState.dispatch_next_task(self)
    196 for i, inqueue in enumerate(self.inqueues):
    197     if inqueue:
--> 198         self.op.add_input(inqueue.popleft(), input_index=i)
    199         return
    200 assert False, "Nothing to dispatch"

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/operators/map_operator.py:202, in MapOperator.add_input(self, refs, input_index)
    198 if self._block_ref_bundler.has_bundle():
    199     # If the bundler has a full bundle, add it to the operator's task submission
    200     # queue.
    201     bundle = self._block_ref_bundler.get_next_bundle()
--> 202     self._add_bundled_input(bundle)

File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py:48, in TaskPoolMapOperator._add_bundled_input(self, bundle)
     46 input_blocks = [block for block, _ in bundle.blocks]
     47 ctx = TaskContext(task_idx=self._next_data_task_idx)
---> 48 gen = map_task.options(
     49     **self._get_runtime_ray_remote_args(input_bundle=bundle), name=self.name
     50 ).remote(
     51     self._map_transformer_ref,
     52     DataContext.get_current(),
     53     ctx,
     54     *input_blocks,
     55 )
     56 self._submit_data_task(gen, bundle)

File ~/penv38/lib/python3.8/site-packages/ray/remote_function.py:241, in RemoteFunction.options.<locals>.FuncWrapper.remote(self, *args, **kwargs)
    240 def remote(self, *args, **kwargs):
--> 241     return func_cls._remote(args=args, kwargs=kwargs, **updated_options)

File ~/penv38/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/penv38/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py:310, in _tracing_task_invocation.<locals>._invocation_remote_span(self, args, kwargs, *_args, **_kwargs)
    308     if kwargs is not None:
    309         assert "_ray_trace_ctx" not in kwargs
--> 310     return method(self, args, kwargs, *_args, **_kwargs)
    312 assert "_ray_trace_ctx" not in kwargs
    313 tracer = _opentelemetry.trace.get_tracer(__name__)

File ~/penv38/lib/python3.8/site-packages/ray/remote_function.py:263, in RemoteFunction._remote(self, args, kwargs, **task_options)
    261 task_options.pop("max_calls", None)
    262 if client_mode_should_convert():
--> 263     return client_mode_convert_function(self, args, kwargs, **task_options)
    265 worker = ray._private.worker.global_worker
    266 worker.check_connected()

File ~/penv38/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:164, in client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs)
    162     setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
    163 client_func = ray._get_converted(key)
--> 164 return client_func._remote(in_args, in_kwargs, **kwargs)

File ~/penv38/lib/python3.8/site-packages/ray/util/client/common.py:298, in ClientRemoteFunc._remote(self, args, kwargs, **option_args)
    296 if kwargs is None:
    297     kwargs = {}
--> 298 return self.options(**option_args).remote(*args, **kwargs)

File ~/penv38/lib/python3.8/site-packages/ray/util/client/common.py:581, in OptionWrapper.remote(self, *args, **kwargs)
    579 def remote(self, *args, **kwargs):
    580     self._remote_stub._signature.bind(*args, **kwargs)
--> 581     return return_refs(ray.call_remote(self, *args, **kwargs))

File ~/penv38/lib/python3.8/site-packages/ray/util/client/api.py:100, in _ClientAPI.call_remote(self, instance, *args, **kwargs)
     86 def call_remote(self, instance: "ClientStub", *args, **kwargs) -> List[Future]:
     87     """call_remote is called by stub objects to execute them remotely.
     88 
     89     This is used by stub objects in situations where they're called
   (...)
     98         kwargs: opaque keyword arguments
     99     """
--> 100     return self.worker.call_remote(instance, *args, **kwargs)

File ~/penv38/lib/python3.8/site-packages/ray/util/client/worker.py:562, in Worker.call_remote(self, instance, *args, **kwargs)
    560 if instance._num_returns() == "dynamic":
    561     num_returns = -1
--> 562 return self._call_schedule_for_task(task, num_returns)

File ~/penv38/lib/python3.8/site-packages/ray/util/client/worker.py:575, in Worker._call_schedule_for_task(self, task, num_returns)
    573 if num_return_refs == -1:
    574     num_return_refs = 1
--> 575 id_futures = [Future() for _ in range(num_return_refs)]
    577 def populate_ids(resp: Union[ray_client_pb2.DataResponse, Exception]) -> None:
    578     if isinstance(resp, Exception):

TypeError: 'str' object cannot be interpreted as an integer

When the error occurred, num_return_refs in worker.py had been set to the string "streaming" rather than an integer, which is what triggers:

'str' object cannot be interpreted as an integer

The same error is seen for map_batches() as well
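
As far as I can tell from the traceback, the failure reduces to calling range() on that string inside Worker._call_schedule_for_task. Presumably the streaming executor's map tasks request num_returns="streaming", which the Ray Client path in 2.7.1 doesn't translate to an integer (this is my reading, not a confirmed root cause):

num_return_refs = "streaming"  # what the client worker receives
id_futures = [None for _ in range(num_return_refs)]
# TypeError: 'str' object cannot be interpreted as an integer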

Any insight on this issue would be greatly appreciated! :smiley:

Update: I was able to work around this by creating and processing the dataset inside an actor running on the Ray Kubernetes cluster, rather than driving it through Ray Client.
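
Roughly what the workaround looks like, with the GCS path as a placeholder and the illustrative column function from above:

import ray

@ray.remote
class DatasetProcessor:
    def run(self, path: str) -> int:
        # Everything runs inside the cluster, so Ray Data's streaming
        # executor never submits tasks through the Ray Client code path
        # that fails above.
        ds = ray.data.read_parquet(path)
        ds = ds.repartition(10)
        ds = ds.add_column("dim_event_ts_12hr", compute_dim_event_ts_12hr)
        return ds.count()

proc = DatasetProcessor.remote()
print(ray.get(proc.run.remote("gs://my-bucket/events/")))  # placeholder path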