Error: Current row has different columns compared to previous rows
This error seems to occur at random. In the following minimal reproduction, processing only 100 rows usually works, but processing 10,000 rows usually fails.
Hypothesis: when the error occurs, Ray Data has put rows with different sets of columns into the same block.
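To see why the number of rows would matter under this hypothesis, here is a small pure-Python sketch (not Ray code). It assumes each row independently takes the `func1` branch with probability 0.5, and that a block fails as soon as it contains rows from both branches. Under those assumptions a block of n rows is safe with probability 0.5^n + 0.5^n = 0.5^(n-1): single-row blocks never fail, while blocks of 50 rows almost always do. The block sizes in the example (100 rows in 1-row blocks, 10,000 rows in 50-row blocks) are illustrative assumptions, not values measured from Ray.

```python
import random

# Column sets produced by the two branches in the reproduction.
SCHEMA_A = ("__sample_id", "embeddings", "sample_id")  # func1
SCHEMA_B = ("_exception", "sample_id")                 # func2


def prob_block_ok(rows_per_block: int, p: float = 0.5) -> float:
    """Probability that every row in a block shares one schema.

    Assumes each row independently picks SCHEMA_A with probability p
    and SCHEMA_B otherwise.
    """
    return p ** rows_per_block + (1 - p) ** rows_per_block


def count_failing_blocks(num_rows: int, rows_per_block: int, seed: int = 0) -> int:
    """Split simulated rows into fixed-size blocks; count mixed-schema blocks."""
    rng = random.Random(seed)
    schemas = [SCHEMA_A if rng.random() < 0.5 else SCHEMA_B for _ in range(num_rows)]
    failing = 0
    for start in range(0, num_rows, rows_per_block):
        block = schemas[start:start + rows_per_block]
        if len(set(block)) > 1:  # more than one column set in this block
            failing += 1
    return failing


if __name__ == "__main__":
    # Hypothetical block sizes, chosen only to illustrate the hypothesis.
    print(count_failing_blocks(100, 1))      # single-row blocks can never mix schemas
    print(count_failing_blocks(10_000, 50))  # most or all of the 200 blocks mix schemas
    print(prob_block_ok(50))
```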
Questions:
- Is Ray Data's behavior well defined when the set of columns changes from row to row?
- How can we fix this? Should we enforce one row per block?
- If the behavior is not well defined, should it be? Should the docs recommend a best practice for UDFs whose output columns vary between rows?
Minimal reproduction
```python
import random

from ray.data import from_items


def func1(sample_row):
    # Output schema A: three columns.
    return {"__sample_id": 1, "sample_id": 2, "embeddings": 3}


def func2(sample_row):
    # Output schema B: a different set of two columns.
    return {"_exception": 1, "sample_id": 2}


class RayActor:
    def __call__(self, sample_row):
        # Each row randomly gets one of the two schemas.
        if random.random() < 0.5:
            return func1(sample_row)
        else:
            return func2(sample_row)


ids = range(10000)  # with range(100) the pipeline usually succeeds
ray_pipeline = from_items(ids).map(RayActor, concurrency=1)
for row in ray_pipeline.iter_rows():
    print(row)
```
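One possible workaround for the "How can we fix this?" question is to make every output row carry the same columns, so no block can ever mix schemas. The sketch below assumes the union of columns is known up front; `OUTPUT_COLUMNS` and `normalize_row` are hypothetical names introduced here, and missing values are filled with `None`. It redefines `func1`/`func2` so it runs without Ray.

```python
import random

# Hypothetical: the union of all columns the UDF can emit.
OUTPUT_COLUMNS = ("__sample_id", "_exception", "embeddings", "sample_id")


def func1(sample_row):
    return {"__sample_id": 1, "sample_id": 2, "embeddings": 3}


def func2(sample_row):
    return {"_exception": 1, "sample_id": 2}


def normalize_row(row: dict) -> dict:
    """Return a row containing every column in OUTPUT_COLUMNS.

    Missing columns are filled with None. Unexpected keys raise KeyError,
    so schema drift is caught in the UDF rather than in the block builder.
    """
    extra = set(row) - set(OUTPUT_COLUMNS)
    if extra:
        raise KeyError(f"unexpected output columns: {sorted(extra)}")
    return {col: row.get(col) for col in OUTPUT_COLUMNS}


class RayActor:
    def __call__(self, sample_row):
        if random.random() < 0.5:
            out = func1(sample_row)
        else:
            out = func2(sample_row)
        # Every returned row now has the same keys, whichever branch ran.
        return normalize_row(out)
```

With Ray installed, this class would be passed to `from_items(ids).map(RayActor, concurrency=1)` exactly as in the reproduction. One caveat we have not verified: a column that is `None` in every row of a block (for example, `_exception` in a block with no exceptions) may be inferred with a different Arrow type than the same column in other blocks, so defaults of a consistent type might be safer than `None`.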
Stack trace:
```
2024-08-22 07:56:02,349 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/plan.py", line 428, in execute_to_iterator
    bundle_iter = itertools.chain([next(gen)], gen)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor.py", line 145, in get_next
    item = self._outer._output_node.get_output_blocking(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor_state.py", line 284, in get_output_blocking
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor.py", line 222, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor.py", line 277, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor_state.py", line 457, in process_completed_tasks
    raise e from None
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor_state.py", line 424, in process_completed_tasks
    bytes_read = task.on_data_ready(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
    raise ex from None
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
    ray.get(block_ref)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/worker.py", line 2659, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::Map(RayActor)() (pid=18476, ip=10.244.44.13, actor_id=a25e1fb68498348b09b1600f01000000, repr=MapWorker(Map(RayActor)))
    yield from _map_task(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/map_operator.py", line 451, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/map_transformer.py", line 393, in __call__
    add_fn(data)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/output_buffer.py", line 43, in add
    self._buffer.add(item)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/delegating_block_builder.py", line 38, in add
    self._builder.add(item)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/table_block.py", line 72, in add
    raise ValueError(
ValueError: Current row has different columns compared to previous rows. Columns of current row: ['__sample_id', 'embeddings', 'sample_id'], Columns of previous rows: ['_exception', 'sample_id'].
```
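The last frames show the error being raised in `table_block.py` while a row is added to the block under construction for the `Map(RayActor)` output. A toy stand-in for that check makes the failure mode concrete. `ToyBlockBuilder` is hypothetical: it is not Ray's builder, it assumes the first row fixes the block's columns, and it only mimics the message format in the trace (where both column lists appear sorted).

```python
from typing import Any, Dict, List, Optional


class ToyBlockBuilder:
    """Hypothetical stand-in for a row-to-block builder (not Ray's code).

    The first row added fixes the block's column set; later rows must match.
    """

    def __init__(self) -> None:
        self._columns: Optional[List[str]] = None
        self._rows: List[Dict[str, Any]] = []

    def mismatch(self, row: Dict[str, Any]) -> Optional[str]:
        """Return an error message if `row` does not fit this block, else None."""
        columns = sorted(row)
        if self._columns is None or columns == self._columns:
            return None
        return (
            "Current row has different columns compared to previous rows. "
            f"Columns of current row: {columns}, "
            f"Columns of previous rows: {self._columns}."
        )

    def add(self, row: Dict[str, Any]) -> None:
        message = self.mismatch(row)
        if message is not None:
            raise ValueError(message)
        if self._columns is None:
            self._columns = sorted(row)  # first row defines the block's columns
        self._rows.append(row)

    def num_rows(self) -> int:
        return len(self._rows)


if __name__ == "__main__":
    builder = ToyBlockBuilder()
    builder.add({"_exception": 1, "sample_id": 2})  # func2-style row
    try:
        builder.add({"__sample_id": 1, "sample_id": 2, "embeddings": 3})  # func1-style row
    except ValueError as err:
        print(err)
```

Running it prints the same message as the final line of the trace above, which is consistent with the hypothesis that a single block received rows from both branches.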