Do all rows need to have the same keys?

Error: Current row has different columns compared to previous rows

We seem to hit this error at random. In the minimal reproduction below, processing only 100 rows usually succeeds, while processing 10000 rows usually breaks.

Hypothesis: Ray Data is putting multiple rows with different columns into the same block when we see the error.

Questions:

  1. Is the Ray Data behavior well defined when columns change across rows?
    • How can we fix this? Enforce 1 row per block?
  2. If not, should it be? Should we note in the docs a best practice about changing columns in the results?

Minimal reproduction

import random

from ray.data import from_items

def func1(sample_row):
    # One possible output shape: three columns.
    return {"__sample_id": 1, "sample_id": 2, "embeddings": 3}

def func2(sample_row):
    # A different output shape: two columns, one of them new.
    return {"_exception": 1, "sample_id": 2}

class RayActor:
    def __call__(self, sample_row):
        # Randomly emit one of the two row shapes.
        if random.random() < 0.5:
            return func1(sample_row)
        else:
            return func2(sample_row)

ids = list(range(10000))  # from_items expects a list of items
ray_pipeline = from_items(ids).map(RayActor, concurrency=1)
for row in ray_pipeline.iter_rows():
    print(row)

Stacktrace:

2024-08-22 07:56:02,349 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/plan.py", line 428, in execute_to_iterator
    bundle_iter = itertools.chain([next(gen)], gen)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor.py", line 145, in get_next
    item = self._outer._output_node.get_output_blocking(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor_state.py", line 284, in get_output_blocking
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor.py", line 222, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor.py", line 277, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor_state.py", line 457, in process_completed_tasks
    raise e from None
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/streaming_executor_state.py", line 424, in process_completed_tasks
    bytes_read = task.on_data_ready(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
    raise ex from None
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
    ray.get(block_ref)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/worker.py", line 2659, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::Map(RayActor)() (pid=18476, ip=10.244.44.13, actor_id=a25e1fb68498348b09b1600f01000000, repr=MapWorker(Map(RayActor)))
    yield from _map_task(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/map_operator.py", line 451, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/map_transformer.py", line 393, in __call__
    add_fn(data)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/output_buffer.py", line 43, in add
    self._buffer.add(item)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/delegating_block_builder.py", line 38, in add
    self._builder.add(item)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/table_block.py", line 72, in add
    raise ValueError(
ValueError: Current row has different columns compared to previous rows. Columns of current row: ['__sample_id', 'embeddings', 'sample_id'], Columns of previous rows: ['_exception', 'sample_id'].

No, all rows must have the same schema. Under the hood, Ray Data represents blocks of data as Arrow tables (or, by configuration, pandas DataFrames), and both formats require that every row have the same schema.
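
For illustration, a block is conceptually a columnar table, so every row has to contribute the same set of columns (a minimal sketch using pyarrow directly, not Ray's internal code):

import pyarrow as pa

# A block is conceptually an Arrow table: each column is a typed array
# of equal length, so every row must supply the same set of columns.
block = pa.table({"sample_id": [1, 2], "embeddings": [3, 4]})
print(block.schema)  # sample_id: int64, embeddings: int64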

One potential workaround is to emit every column in every row, using None (NULL) as the value for columns a given row doesn't have.
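
For example, applied to the reproduction above (a minimal sketch; ALL_COLUMNS is an assumed superset of every column either branch can emit):

# Union of every column either branch can produce.
ALL_COLUMNS = ["__sample_id", "sample_id", "embeddings", "_exception"]

def func1(sample_row):
    row = {"__sample_id": 1, "sample_id": 2, "embeddings": 3}
    # Fill the columns this branch doesn't produce with None (NULL).
    return {col: row.get(col) for col in ALL_COLUMNS}

def func2(sample_row):
    row = {"_exception": 1, "sample_id": 2}
    return {col: row.get(col) for col in ALL_COLUMNS}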

Thank you for the explanation!

Instead of requiring the same schema, could Ray merge the differing schemas of the rows? This would still fail if two rows had incompatible types for the same column, but it would make the behavior more intuitive.
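
For reference, Arrow itself can already unify compatible schemas, which is roughly the merge such a feature would need (a sketch against pyarrow, not a Ray API):

import pyarrow as pa

s1 = pa.schema([("__sample_id", pa.int64()),
                ("sample_id", pa.int64()),
                ("embeddings", pa.int64())])
s2 = pa.schema([("_exception", pa.int64()), ("sample_id", pa.int64())])

# unify_schemas raises if the same column appears with incompatible
# types, matching the caveat above.
merged = pa.unify_schemas([s1, s2])
print(merged.names)
# ['__sample_id', 'sample_id', 'embeddings', '_exception']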

This isn’t something we have on our roadmap currently, but I would be happy to help guide you through an open-source contribution, if you’re willing to contribute the feature!

As a workaround, you may also be able to add a map_batches directly after the read, which takes a batch/block and inserts a blank column for any missing keys/columns (this can be the “merge” step).
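
A rough sketch of that merge step, assuming pandas-format batches (EXPECTED_COLUMNS and fill_missing_columns are illustrative assumptions, not a Ray API; where you apply it depends on where the differing-column rows are produced in your pipeline):

import pandas as pd

EXPECTED_COLUMNS = ["__sample_id", "sample_id", "embeddings", "_exception"]

def fill_missing_columns(batch: pd.DataFrame) -> pd.DataFrame:
    # Insert an all-None column for any expected column the batch is
    # missing, so every batch ends up with the same schema.
    for col in EXPECTED_COLUMNS:
        if col not in batch.columns:
            batch[col] = None
    return batch

# ds is the dataset at the point where columns may start to diverge.
ds = ds.map_batches(fill_missing_columns, batch_format="pandas")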

@Oblynx, on the contribution: if you’re game, can you log a GitHub feature request and post the URL here?