Hello,
I’m seeing an error while trying to add a column to a Ray dataset created by reading Parquet files on GCS.
Env:
Ray: 2.7.1 running on GKE
@ray.remote
def add_dim_event_ts_12hr(ds):
return ds.add_column("dim_event_ts_12hr", compute_dim_event_ts_12hr)
ds2 = ds.repartition(10)
ds3 = ray.get(add_dim_event_ts_12hr.remote(ds2))
ds3.count()
The error:
TypeError Traceback (most recent call last)
Cell In[35], line 2
1 ds3 = ray.get(add_dim_event_ts_12hr.remote(ds2))
----> 2 ds3.count()
File ~/penv38/lib/python3.8/site-packages/ray/data/dataset.py:2498, in Dataset.count(self)
2492 return meta_count
2494 get_num_rows = cached_remote_fn(_get_num_rows)
2496 return sum(
2497 ray.get(
-> 2498 [get_num_rows.remote(block) for block in self.get_internal_block_refs()]
2499 )
2500 )
File ~/penv38/lib/python3.8/site-packages/ray/data/dataset.py:4799, in Dataset.get_internal_block_refs(self)
4780 @ConsumptionAPI(pattern="Time complexity:")
4781 @DeveloperAPI
4782 def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
4783 """Get a list of references to the underlying blocks of this dataset.
4784
4785 This function can be used for zero-copy access to the data. It blocks
(...)
4797 A list of references to this dataset's blocks.
4798 """
-> 4799 blocks = self._plan.execute().get_blocks()
4800 self._synchronize_progress_bar()
4801 return blocks
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/plan.py:591, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
589 else:
590 executor = BulkExecutor(copy.deepcopy(context.execution_options))
--> 591 blocks = execute_to_legacy_block_list(
592 executor,
593 self,
594 allow_clear_input_blocks=allow_clear_input_blocks,
595 dataset_uuid=self._dataset_uuid,
596 preserve_order=preserve_order,
597 )
598 # TODO(ekl) we shouldn't need to set this in the future once we move
599 # to a fully lazy execution model, unless .materialize() is used. Th
600 # reason we need it right now is since the user may iterate over a
601 # Dataset multiple times after fully executing it once.
602 if not self._run_by_consumer:
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/legacy_compat.py:119, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
112 dag, stats = _get_execution_dag(
113 executor,
114 plan,
115 allow_clear_input_blocks,
116 preserve_order,
117 )
118 bundles = executor.execute(dag, initial_stats=stats)
--> 119 block_list = _bundles_to_block_list(bundles)
120 # Set the stats UUID after execution finishes.
121 _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/legacy_compat.py:357, in _bundles_to_block_list(bundles)
355 blocks, metadata = [], []
356 owns_blocks = True
--> 357 for ref_bundle in bundles:
358 if not ref_bundle.owns_blocks:
359 owns_blocks = False
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/interfaces/executor.py:37, in OutputIterator.__next__(self)
36 def __next__(self) -> RefBundle:
---> 37 return self.get_next()
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py:129, in StreamingExecutor.execute.<locals>.StreamIterator.get_next(self, output_split_idx)
127 raise StopIteration
128 elif isinstance(item, Exception):
--> 129 raise item
130 else:
131 # Otherwise return a concrete RefBundle.
132 if self._outer._global_info:
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py:187, in StreamingExecutor.run(self)
181 """Run the control loop in a helper thread.
182
183 Results are returned via the output node's outqueue.
184 """
185 try:
186 # Run scheduling loop until complete.
--> 187 while self._scheduling_loop_step(self._topology) and not self._shutdown:
188 pass
189 except Exception as e:
190 # Propagate it to the result iterator.
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor.py:256, in StreamingExecutor._scheduling_loop_step(self, topology)
254 if DEBUG_TRACE_SCHEDULING:
255 _debug_dump_topology(topology)
--> 256 topology[op].dispatch_next_task()
257 cur_usage = TopologyResourceUsage.of(topology)
258 op = select_operator_to_run(
259 topology,
260 cur_usage,
(...)
264 autoscaling_state=self._autoscaling_state,
265 )
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/streaming_executor_state.py:198, in OpState.dispatch_next_task(self)
196 for i, inqueue in enumerate(self.inqueues):
197 if inqueue:
--> 198 self.op.add_input(inqueue.popleft(), input_index=i)
199 return
200 assert False, "Nothing to dispatch"
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/operators/map_operator.py:202, in MapOperator.add_input(self, refs, input_index)
198 if self._block_ref_bundler.has_bundle():
199 # If the bundler has a full bundle, add it to the operator's task submission
200 # queue.
201 bundle = self._block_ref_bundler.get_next_bundle()
--> 202 self._add_bundled_input(bundle)
File ~/penv38/lib/python3.8/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py:48, in TaskPoolMapOperator._add_bundled_input(self, bundle)
46 input_blocks = [block for block, _ in bundle.blocks]
47 ctx = TaskContext(task_idx=self._next_data_task_idx)
---> 48 gen = map_task.options(
49 **self._get_runtime_ray_remote_args(input_bundle=bundle), name=self.name
50 ).remote(
51 self._map_transformer_ref,
52 DataContext.get_current(),
53 ctx,
54 *input_blocks,
55 )
56 self._submit_data_task(gen, bundle)
File ~/penv38/lib/python3.8/site-packages/ray/remote_function.py:241, in RemoteFunction.options.<locals>.FuncWrapper.remote(self, *args, **kwargs)
240 def remote(self, *args, **kwargs):
--> 241 return func_cls._remote(args=args, kwargs=kwargs, **updated_options)
File ~/penv38/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
21 @wraps(fn)
22 def auto_init_wrapper(*args, **kwargs):
23 auto_init_ray()
---> 24 return fn(*args, **kwargs)
File ~/penv38/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py:310, in _tracing_task_invocation.<locals>._invocation_remote_span(self, args, kwargs, *_args, **_kwargs)
308 if kwargs is not None:
309 assert "_ray_trace_ctx" not in kwargs
--> 310 return method(self, args, kwargs, *_args, **_kwargs)
312 assert "_ray_trace_ctx" not in kwargs
313 tracer = _opentelemetry.trace.get_tracer(__name__)
File ~/penv38/lib/python3.8/site-packages/ray/remote_function.py:263, in RemoteFunction._remote(self, args, kwargs, **task_options)
261 task_options.pop("max_calls", None)
262 if client_mode_should_convert():
--> 263 return client_mode_convert_function(self, args, kwargs, **task_options)
265 worker = ray._private.worker.global_worker
266 worker.check_connected()
File ~/penv38/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:164, in client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs)
162 setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
163 client_func = ray._get_converted(key)
--> 164 return client_func._remote(in_args, in_kwargs, **kwargs)
File ~/penv38/lib/python3.8/site-packages/ray/util/client/common.py:298, in ClientRemoteFunc._remote(self, args, kwargs, **option_args)
296 if kwargs is None:
297 kwargs = {}
--> 298 return self.options(**option_args).remote(*args, **kwargs)
File ~/penv38/lib/python3.8/site-packages/ray/util/client/common.py:581, in OptionWrapper.remote(self, *args, **kwargs)
579 def remote(self, *args, **kwargs):
580 self._remote_stub._signature.bind(*args, **kwargs)
--> 581 return return_refs(ray.call_remote(self, *args, **kwargs))
File ~/penv38/lib/python3.8/site-packages/ray/util/client/api.py:100, in _ClientAPI.call_remote(self, instance, *args, **kwargs)
86 def call_remote(self, instance: "ClientStub", *args, **kwargs) -> List[Future]:
87 """call_remote is called by stub objects to execute them remotely.
88
89 This is used by stub objects in situations where they're called
(...)
98 kwargs: opaque keyword arguments
99 """
--> 100 return self.worker.call_remote(instance, *args, **kwargs)
File ~/penv38/lib/python3.8/site-packages/ray/util/client/worker.py:562, in Worker.call_remote(self, instance, *args, **kwargs)
560 if instance._num_returns() == "dynamic":
561 num_returns = -1
--> 562 return self._call_schedule_for_task(task, num_returns)
File ~/penv38/lib/python3.8/site-packages/ray/util/client/worker.py:575, in Worker._call_schedule_for_task(self, task, num_returns)
573 if num_return_refs == -1:
574 num_return_refs = 1
--> 575 id_futures = [Future() for _ in range(num_return_refs)]
577 def populate_ids(resp: Union[ray_client_pb2.DataResponse, Exception]) -> None:
578 if isinstance(resp, Exception):
TypeError: 'str' object cannot be interpreted as an integer
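When the above error occurred, num_return_refs (in Worker._call_schedule_for_task in ray/util/client/worker.py) was set to the string “streaming” instead of an integer, so range(num_return_refs) fails with:
'str' object cannot be interpreted as an integer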
The same error also occurs with map_batches().
Any insight on this issue would be greatly appreciated!