The model output that I’m storing in a Ray Dataset column will be a list that varies in length, and the elements of the list will be numpy arrays of varying shapes.
For example, one row value for the model output column may look like:
[np.array([[1,2], [3, 4]]), np.array([[5,2], [3, 2], [1,2]])]
Another row value for the model output column may look like:
[np.array([[1,2], [0, 9]])]
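For reference, I populate the column roughly like this (a minimal sketch; the function and column names are illustrative):

import numpy as np
import ray

def add_model_output(row):
    # Each row gets a variable-length list of 2-D arrays whose first
    # dimensions differ, as in the examples above.
    row["model_output"] = [
        np.array([[1, 2], [3, 4]]),
        np.array([[5, 2], [3, 2], [1, 2]]),
    ]
    return row

ds = ray.data.from_items([{"id": 0}, {"id": 1}]).map(add_model_output)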
Looking at the latest Ray release: I was running into the bug fixed by “[data] fix nested ragged ndarray” (ray-project/ray PR #44236 on GitHub) before. I thought the problem was the lists, because when I wrapped the list in a dataclass my code worked.
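The dataclass workaround looked roughly like this (a sketch; the class and field names are illustrative):

from dataclasses import dataclass, field

import numpy as np

@dataclass
class ModelOutput:
    # Wrapping the ragged list in a dataclass seemed to make Ray treat the
    # column value as an opaque Python object instead of trying to convert it.
    arrays: list = field(default_factory=list)

def add_wrapped_output(row):
    row["model_output"] = ModelOutput([np.array([[1, 2], [3, 4]])])
    return row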
I’m still having issues though.
I tried setting the column to values like the examples I listed above, but then somehow when I do a groupby, the column gets converted to pandas format, the list gets converted to an N-d numpy array, and pandas complains:
2024-04-24 17:59:46,064 ERROR streaming_executor_state.py:446 -- An exception was raised from a task of operator "Map(add_image_name)->Map(cal_water_mask)->FlatMap(tile_image)->MapBatches(drop_columns)->Map(MaskRCNNPredictor)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
2024-04-24 17:59:46,072 WARNING actor_pool_map_operator.py:292 -- To ensure full parallelization across an actor pool of size 2, the Dataset should consist of at least 2 distinct blocks. Consider increasing the parallelism when creating the Dataset.
2024-04-24 17:59:46,073 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
ray.data.exceptions.SystemException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/home/kaylahardie/MAPLE_v3/ray_maple_workflow_flat.py", line 110, in <module>
    data_per_image.schema())
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/dataset.py", line 2540, in schema
    base_schema = self.limit(1)._plan.schema(fetch_if_missing=fetch_if_missing)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 387, in schema
    self.execute()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/exceptions.py", line 86, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(ValueError): ray::Map(add_image_name)->Map(cal_water_mask)->FlatMap(tile_image)->MapBatches(drop_columns)->Map(MaskRCNNPredictor)() (pid=2933112, ip=192.168.36.107, actor_id=06e8174ddd5f71f1cade3e6401000000, repr=MapWorker(Map(add_image_name)->Map(cal_water_mask)->FlatMap(tile_image)->MapBatches(drop_columns)->Map(MaskRCNNPredictor)))
    yield from _map_task(
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 419, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 395, in __call__
    yield output_buffer.next()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/output_buffer.py", line 73, in next
    block_to_yield = self._buffer.build()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 64, in build
    return self._builder.build()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/table_block.py", line 133, in build
    return self._concat_tables(tables)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/pandas_block.py", line 128, in _concat_tables
    df = pandas.concat(tables, ignore_index=True)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 393, in concat
    return op.get_result()
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 680, in get_result
    new_data = concatenate_managers(
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/concat.py", line 180, in concatenate_managers
    values = concat_compat(vals, axis=0, ea_compat_axis=True)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/dtypes/concat.py", line 83, in concat_compat
    return obj._concat_same_type(to_concat_eas)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/air/util/tensor_extensions/pandas.py", line 1102, in _concat_same_type
    np.array([e for a in to_concat for e in a._tensor], dtype=object)
ValueError: could not broadcast input array from shape (200,107,3) into shape (200,)
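The failing call at the bottom, np.array([...], dtype=object), matches a numpy pitfall I can reproduce on its own; this is just my sketch of the failure mode, not Ray's actual code:

import numpy as np

a = np.zeros((200, 107, 3))
b = np.zeros((200,))

# Sub-arrays that share a leading dimension make np.array(..., dtype=object)
# try to broadcast them into a common shape instead of storing them as
# opaque objects:
# np.array([a, b], dtype=object)
# ValueError: could not broadcast input array from shape (200,107,3) into shape (200,)

# Preallocating a 1-D object array and assigning avoids the broadcast:
ragged = np.empty(2, dtype=object)
ragged[0] = a
ragged[1] = b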
I noticed in a toy script that Ray doesn’t like it when the column values in the Ray dataset are N-d numpy arrays, yet I use N-d numpy arrays in the pipeline that I built and it doesn’t error (for example, I saw tile_values numpy.ndarray(shape=(200, 200, 3), dtype=uint8) in the schema).
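For contrast, a fixed-shape array column like tile_values seems fine in my pipeline; my understanding (an assumption on my part) is that Ray can store uniformly-shaped arrays as a tensor column:

import numpy as np

def add_tile(row):
    # Every row gets the same (200, 200, 3) shape, matching what I see in
    # the schema; this column never errors in the pipeline.
    row["tile_values"] = np.zeros((200, 200, 3), dtype=np.uint8)
    return row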
I made a toy script to try to narrow down the problem:
import numpy as np
import ray

def add_list(row):
    # Attach a ragged list of 2-D arrays (different first dimensions) to each row.
    row["sample_list"] = [np.array([[1, 3], [1, 4]]), np.array([[5, 1], [3, 1], [4, 2]])]
    return row

def test_groups_identity(group):
    # Identity group function: return each group unchanged.
    return group

if __name__ == "__main__":
    ds = ray.data.from_items([
        {"group": 1, "value": "cat"},
        {"group": 1, "value": "dog"},
        {"group": 2, "value": "mouse"},
        {"group": 2, "value": "giraffe"},
    ])
    print("ds schema: ", ds.schema(), ds.count(), ds.take_all())
    ds = ds.map(add_list)
    print("ds schema with list: ", ds.schema(), ds.count(), ds.take_all())
    result = ds.groupby("group").map_groups(test_groups_identity)
    print("group by result: ", result.schema())
When I don’t have the groupby line, the toy script doesn’t show any errors, but when I do have the groupby line, it shows a warning (about Ray falling back to pandas, and mentioning numpy type 17 again):
(MapBatches(group_fn) pid=3091121) Could not construct Arrow block from numpy array; encountered values of unsupported numpy type `17` in column named 'sample_list', which cannot be casted to an Arrow data type. Falling back to using pandas block type, which is slower and consumes more memory. For maximum performance, consider applying the following suggestions before ingesting into Ray Data in order to use native Arrow block types:
(MapBatches(group_fn) pid=3091121) - Expand out each key-value pair in the dict column into its own column
(MapBatches(group_fn) pid=3091121) - Replace `None` values with an Arrow supported data type
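For reference, I believe the `17` in that warning is numpy's type number for the object dtype:

import numpy as np

print(np.dtype(object).num)  # 17, i.e. NPY_OBJECT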
Also, I get a different error from the toy script when I try to add an N-d numpy array column.
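The variant that triggers it looked roughly like this (a sketch; the function name is illustrative):

def add_ndarray(row):
    # A single 2-D array as the column value, instead of a list of arrays.
    row["sample_nd"] = np.array([[1, 3], [1, 4]])
    return row

ds = ds.map(add_ndarray)
result = ds.groupby("group").map_groups(test_groups_identity)

And the resulting error: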
ray::MapBatches(group_fn)() (pid=3095735, ip=192.168.36.107)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 125, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/grouped_data.py", line 205, in group_fn
    block = BlockAccessor.batch_to_block(batch)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/ray/data/block.py", line 376, in batch_to_block
    return pd.DataFrame(dict(batch))
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/frame.py", line 733, in __init__
    mgr = dict_to_mgr(data, index, columns, dtype=dtype, copy=copy, typ=manager)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/construction.py", line 503, in dict_to_mgr
    return arrays_to_mgr(arrays, columns, index, dtype=dtype, typ=typ, consolidate=copy)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/construction.py", line 114, in arrays_to_mgr
    index = _extract_index(arrays)
  File "/usr/local/home/kaylahardie/miniconda3/envs/maple_py310_ray/lib/python3.10/site-packages/pandas/core/internals/construction.py", line 664, in _extract_index
    raise ValueError("Per-column arrays must each be 1-dimensional")
ValueError: Per-column arrays must each be 1-dimensional
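To double-check the pandas side on its own, this minimal sketch (my assumption about what Ray does per block when it falls back to pandas) reproduces the same error:

import numpy as np
import pandas as pd

# A 1-D object array of lists is fine as a column value...
ok = np.empty(2, dtype=object)
ok[0] = [np.array([[1, 2]])]
ok[1] = [np.array([[3, 4], [5, 6]])]
pd.DataFrame({"col": ok})  # works

# ...but a 2-D ndarray as a dict value raises the same ValueError as above:
# pd.DataFrame({"col": np.zeros((2, 2))})
# ValueError: Per-column arrays must each be 1-dimensional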
Basically, I’m confused about how the typing of Ray Dataset columns works, especially with groupby. I’m not sure why my list of arrays gets converted to an N-d numpy array before pandas errors, and I’m not sure why a column sometimes shows an N-d numpy array type and works, but other times errors.