Groupby with big data

Hi, I am trying to process a big dataset with ~0.3 billion lines. Each line has a 'thread_id' and an 'item'. One thing I need to do is group the lines by 'thread_id' and then merge each group into a list of items.
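
For concreteness, here is a toy version of the input and of the output I want (the sample rows are made up; the real data is much bigger):

import ray

# Toy input: one row per item, keyed by thread_id (made-up sample rows).
combined_ds = ray.data.from_items([
    {"thread_id": "t1", "item": {"author": "alice", "body": "hello"}},
    {"thread_id": "t1", "item": {"author": "bob", "body": "world"}},
    {"thread_id": "t2", "item": {"author": "carol", "body": "hi"}},
])

# Desired output: one row per thread_id, with all of its items merged
# into a single list, e.g.
#   {"thread_id": "t1", "thread": [{"author": "alice", ...}, {"author": "bob", ...}]}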

  1. I wonder whether Ray can do this in parallel across multiple nodes.
    The documentation says: "This operation requires all inputs to be materialized in object store for it to execute." Does that mean all the data must fit in the memory of a single node (as with map_groups), or can it be spread across all the nodes of a Ray cluster?

  2. I am trying to group and aggregate it:
    2.1 First I build combined_ds:
    combined_ds.schema()

Column     Type
------     ----
thread_id  string
item       struct<author: string, body: string, …
Then:

build_thread = AggregateFn(
    init=lambda state: [],
    accumulate_row=lambda state, row: state.append(row),
    merge=lambda state, other: state.extend(other),
    name="thread",
)
aggregated_ds = combined_ds.groupby("thread_id").aggregate(build_thread)
but I got:
parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/data3/jianxin/work/libs/miniconda3/envs/rayPy39/lib/python3.9/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py", line 57, in <listcomp>
    parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/data3/jianxin/work/libs/miniconda3/envs/rayPy39/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 522, in combine
    accumulators[i] = aggs[i].accumulate_block(accumulators[i], group_view)
  File "/data3/jianxin/work/libs/miniconda3/envs/rayPy39/lib/python3.9/site-packages/ray/data/aggregate/_aggregate.py", line 68, in accumulate_block
    a = accumulate_row(a, r)
  File "/tmp/ipykernel_3535721/1427724256.py", line 3, in <lambda>
AttributeError: 'NoneType' object has no attribute 'append'
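
Looking at the last frame, I suspect the problem is that list.append() and list.extend() return None, so my lambdas return None and the state becomes None after the first row. Rewriting the accumulators so that they explicitly return the state should avoid that (just a sketch of what I have in mind; not sure it is the idiomatic way):

from ray.data.aggregate import AggregateFn

def append_row(state, row):
    state.append(row)  # append in place; list.append itself returns None
    return state       # so the state has to be returned explicitly

def merge_states(state, other):
    state.extend(other)
    return state

build_thread = AggregateFn(
    init=lambda key: [],  # start each group with an empty list
    accumulate_row=append_row,
    merge=merge_states,
    name="thread",
)

aggregated_ds = combined_ds.groupby("thread_id").aggregate(build_thread)

Even if this is the right fix, I am still not sure it will scale to ~0.3 billion rows, hence question 1 above.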

Would anyone have any suggestions?

Many thanks!