Model outputs variable-length data

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Context:
In my pipeline I have an image, I break the image up into tiles and then perform inference on each tile. The model outputs multiple polygons for each tile. Each polygon also has an integer class id. In each tile, the number of polygons is variable and the number of coordinates in each polygon is also variable. After I finish the inference and get the polygon and class id data I do a groupby over all the tiles to get the polygon and class id data for each image.

Current solution:
Since each polygon has a class id, I created a Python dataclass:

from dataclasses import dataclass
from typing import List

import numpy as np

@dataclass
class PolygonData:
    class_id: int
    polygon_coords: np.ndarray

Then the inference result is a list of polygon data:

@dataclass
class InferenceResult:
    inference_result: List[PolygonData]

row["inference_result"] = InferenceResult(…)

Lists came to mind because there is a variable number of polygons for each tile/inference result. I had to make a dataclass to wrap the list because ray didn’t like it when my column value was a list.

When I stitch the tiles/do the groupby, I combine all of the InferenceResults for all the tiles.
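Roughly, the map_groups function I use looks like this sketch (simplified; the column and function names are placeholders, and it assumes the group arrives as a pandas DataFrame):

import pandas as pd

def stitch_tiles(group: pd.DataFrame) -> pd.DataFrame:
    # Merge the per-tile InferenceResult objects into one InferenceResult
    # holding every PolygonData for the image.
    combined = []
    for tile_result in group["inference_result"]:
        combined.extend(tile_result.inference_result)
    return pd.DataFrame({
        "image_id": [group["image_id"].iloc[0]],
        "inference_result": [InferenceResult(inference_result=combined)],
    })

# data_per_image = ds.groupby("image_id").map_groups(stitch_tiles)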

My code works, but it uses more memory and is slow; I get this warning from Ray: “Falling back to using pandas block type, which is slower and consumes more memory.”

Alternative solution:
I thought about creating separate numpy arrays for the class_ids and polygon coords for each tile:
row["class_ids"] = np.array(…)
row["polygons"] = np.array(…)
but then I need to pad each polygon array since they're variable length. I'd have to remove the padding and then re-pad in a future post-processing step (a rough sketch of what I mean is below). This seems more tedious and error-prone.
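To make that concrete, here's a minimal sketch of the pad/unpad round trip I'm picturing (the sentinel value and dtype are just for illustration):

import numpy as np

PAD_VALUE = -1  # sentinel; assumes real coordinates are never negative

def pad_polygons(polygons):
    # Stack a tile's (n_i, 2) polygons into one fixed-shape
    # (num_polygons, max_len, 2) array by padding to the longest polygon.
    max_len = max(p.shape[0] for p in polygons)
    padded = np.full((len(polygons), max_len, 2), PAD_VALUE, dtype=np.int64)
    for i, p in enumerate(polygons):
        padded[i, : p.shape[0]] = p
    return padded

def unpad_polygons(padded):
    # Drop the sentinel rows again in the post-processing step.
    return [p[(p != PAD_VALUE).any(axis=1)] for p in padded]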

I’m wondering if using padding with numpy arrays is the recommended approach here (since numpy arrays seem to be encouraged by Ray), or if you have better alternative approaches (other data structures/data types) for dealing with variable-length model outputs.

This is related to another post I made: Ray Column With Custom Python Dataclass Type

Hey @kaylahardie, I’m not sure if padding is the right approach here.

Sounds like you have lists of varying length in each row? Is it at all similar to what we do for object detection? Object Detection Batch Inference with PyTorch — Ray 2.11.0
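For reference, the rough pattern in that example is to store each row's variable number of detections as separate numpy-array columns; something like this sketch (simplified and with made-up names, not the exact code from the docs):

import numpy as np
import ray

def fake_detector(row):
    # Each image gets a different number of detections:
    # "boxes" is an (N, 4) float array, "labels" is an (N,) int array.
    n = int(np.random.randint(1, 5))
    row["boxes"] = np.random.rand(n, 4).astype(np.float32)
    row["labels"] = np.random.randint(0, 10, size=n)
    return row

ds = ray.data.from_items([{"image_id": i} for i in range(4)]).map(fake_detector)
print(ds.schema())  # ragged ndarray columns should show up as variable-shaped tensor types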

I had to make a dataclass to wrap the list because ray didn’t like it when my column value was a list.

What did you see in STDOUT?

The model output that I’m storing in a ray dataset column will be a list that varies in length and the elements in the list will be numpy arrays of varying lengths.

For example, one row value for the model output column may look like:

[np.array([[1,2], [3, 4]]), np.array([[5,2], [3, 2], [1,2]])]

Another row value for the model output column may look like:

[np.array([[1,2], [0, 9]])]

Looking at the latest Ray release - I was running into the bug fixed by [data] fix nested ragged ndarray by raulchen · Pull Request #44236 · ray-project/ray · GitHub before. I thought the problem was the lists, because when I wrapped the list in a dataclass my code worked.

I’m still having issues though.
I tried setting the column to be like some of the examples I listed above, but when I do a groupby the block somehow gets converted to pandas format, the list gets converted to an n-d numpy array, and then pandas complains:

2024-04-24 17:59:46,064 ERROR streaming_executor_state.py:446 -- An exception was raised from a task of operator "Map(add_image_name)->Map(cal_water_mask)->FlatMap(tile_image)->MapBatches(drop_columns)->Map(MaskRCNNPredictor)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
2024-04-24 17:59:46,072 WARNING actor_pool_map_operator.py:292 -- To ensure full parallelization across an actor pool of size 2, the Dataset should consist of at least 2 distinct blocks. Consider increasing the parallelism when creating the Dataset.
2024-04-24 17:59:46,073 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
ray.data.exceptions.SystemException

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/home/kaylahardie/MAPLE_v3/ray_maple_workflow_flat.py", line 110, in <module>
    data_per_image.schema())
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/dataset.py", line 2540, in schema
    base_schema = self.limit(1)._plan.schema(fetch_if_missing=fetch_if_missing)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 387, in schema
    self.execute()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/exceptions.py", line 86, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(ValueError): ray::Map(add_image_name)->Map(cal_water_mask)->FlatMap(tile_image)->MapBatches(drop_columns)->Map(MaskRCNNPredictor)() (pid=2933112, ip=192.168.36.107, actor_id=06e8174ddd5f71f1cade3e6401000000, repr=MapWorker(Map(add_image_name)->Map(cal_water_mask)->FlatMap(tile_image)->MapBatches(drop_columns)->Map(MaskRCNNPredictor)))
    yield from _map_task(
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 419, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 395, in __call__
    yield output_buffer.next()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/output_buffer.py", line 73, in next
    block_to_yield = self._buffer.build()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 64, in build
    return self._builder.build()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/table_block.py", line 133, in build
    return self._concat_tables(tables)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/pandas_block.py", line 128, in _concat_tables
    df = pandas.concat(tables, ignore_index=True)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 393, in concat
    return op.get_result()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 680, in get_result
    new_data = concatenate_managers(
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/concat.py", line 180, in concatenate_managers
    values = concat_compat(vals, axis=0, ea_compat_axis=True)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/dtypes/concat.py", line 83, in concat_compat
    return obj._concat_same_type(to_concat_eas)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/air/util/tensor_extensions/pandas.py", line 1102, in _concat_same_type
    np.array([e for a in to_concat for e in a._tensor], dtype=object)
ValueError: could not broadcast input array from shape (200,107,3) into shape (200,)

I noticed in a toy script that Ray doesn’t like it when the column values in the dataset are n-d numpy arrays, yet I use n-d numpy arrays in the pipeline that I built and it doesn’t error (I saw tile_values numpy.ndarray(shape=(200, 200, 3), dtype=uint8) in the schema, for example).

I made a toy script to try to identify the problem more:

import numpy as np
import ray

def add_list(row):
    # Each row gets a list of variable-length 2-D arrays, like the model output.
    row["sample_list"] = [np.array([[1, 3], [1, 4]]), np.array([[5, 1], [3, 1], [4, 2]])]
    return row

def test_groups_identity(group):
    return group

if __name__ == "__main__":
    ds = ray.data.from_items([
        {"group": 1, "value": "cat"},
        {"group": 1, "value": "dog"},
        {"group": 2, "value": "mouse"},
        {"group": 2, "value": "giraffe"},
    ])
    print("ds schema: ", ds.schema(), ds.count(), ds.take_all())

    ds = ds.map(add_list)
    print("ds schema with list: ", ds.schema(), ds.count(), ds.take_all())
    result = ds.groupby("group").map_groups(test_groups_identity)
    print("group by result: ", result.schema())

When I don’t have the groupby line, the toy script doesn’t show any errors, but when I do have the groupby line, it shows a warning about falling back to pandas and mentions the numpy type 17 again (I checked what type 17 means right after the warning below):

(MapBatches(group_fn) pid=3091121) Could not construct Arrow block from numpy array; encountered values of unsupported numpy type `17` in column named 'sample_list', which cannot be casted to an Arrow data type. Falling back to using pandas block type, which is slower and consumes more memory. For maximum performance, consider applying the following suggestions before ingesting into Ray Data in order to use native Arrow block types:
(MapBatches(group_fn) pid=3091121) - Expand out each key-value pair in the dict column into its own column
(MapBatches(group_fn) pid=3091121) - Replace `None` values with an Arrow supported data type
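Out of curiosity I checked what “numpy type 17” refers to - as far as I can tell it’s the object dtype, which is what a list of ragged arrays collapses into when it’s coerced to a single numpy array (a quick check, just to illustrate):

import numpy as np

ragged = [np.array([[1, 3], [1, 4]]), np.array([[5, 1], [3, 1], [4, 2]])]

# Arrays of different lengths can only be stacked as an object-dtype array.
as_column = np.array(ragged, dtype=object)
print(as_column.dtype)      # object
print(as_column.dtype.num)  # 17 -- the "unsupported numpy type `17`" from the warning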

Also, here is an error from the toy script when I try to add an n-d numpy array column:

ray::MapBatches(group_fn)() (pid=3095735, ip=192.168.36.107)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 125, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/grouped_data.py", line 205, in group_fn
    block = BlockAccessor.batch_to_block(batch)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/block.py", line 376, in batch_to_block
    return pd.DataFrame(dict(batch))
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/frame.py", line 733, in __init__
    mgr = dict_to_mgr(data, index, columns, dtype=dtype, copy=copy, typ=manager)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/construction.py", line 503, in dict_to_mgr
    return arrays_to_mgr(arrays, columns, index, dtype=dtype, typ=typ, consolidate=copy)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/construction.py", line 114, in arrays_to_mgr
    index = _extract_index(arrays)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/construction.py", line 664, in _extract_index
    raise ValueError("Per-column arrays must each be 1-dimensional")
ValueError: Per-column arrays must each be 1-dimensional

Basically, I’m confused about how the typing of the Ray Dataset columns works, especially with the groupby.

I’m not sure why my list of arrays gets converted to an n-d numpy array and then pandas errors. I’m also not sure why sometimes my dataset columns show an n-d numpy array type and it works, but other times it errors.