Hi, I am trying to process a big dataset with roughly 0.3 billion lines; each line has a 'thread_id' and an 'item'. One thing I need to do is group the lines by 'thread_id' and then merge each group into a list of items.
-
I wonder whether Ray can do this in parallel across multiple nodes.
The documentation says: "This operation requires all inputs to be materialized in object store for it to execute." Does this mean all the data must fit in the memory of a single node (as with map_groups), or can it be spread across the object stores of all nodes in a Ray cluster?
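To make the task concrete, here is roughly what I want to compute. This is only a sketch: the read path is a placeholder, and I am assuming the pandas batch format for map_groups; I have not checked whether the nested list column round-trips through Arrow cleanly.

```python
import pandas as pd
import ray

# Placeholder source; the real dataset has 'thread_id' and 'item' columns.
ds = ray.data.read_parquet("...")

def collect_items(group: pd.DataFrame) -> pd.DataFrame:
    # Merge all items of one thread into a single list.
    return pd.DataFrame(
        {
            "thread_id": [group["thread_id"].iloc[0]],
            "items": [group["item"].tolist()],
        }
    )

threads = ds.groupby("thread_id").map_groups(collect_items, batch_format="pandas")
```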
-
I am trying to group it by thread_id and aggregate.
First, I get combined_ds:
```
combined_ds.schema()

Column     Type
thread_id  string
item       struct<author: string, body: string, …
```
Then:

```python
build_thread = AggregateFn(
    init=lambda state: [],
    accumulate_row=lambda state, row: state.append(row),
    merge=lambda state, other: state.extend(other),
    name="thread",
)

aggregated_ds = combined_ds.groupby("thread_id").aggregate(build_thread)
```
but I got:

```
parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/data3/jianxin/work/libs/miniconda3/envs/rayPy39/lib/python3.9/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py", line 57, in <listcomp>
    parts = [BlockAccessor.for_block(p).combine(key, aggs) for p in partitions]
  File "/data3/jianxin/work/libs/miniconda3/envs/rayPy39/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 522, in combine
    accumulators[i] = aggs[i].accumulate_block(accumulators[i], group_view)
  File "/data3/jianxin/work/libs/miniconda3/envs/rayPy39/lib/python3.9/site-packages/ray/data/aggregate/_aggregate.py", line 68, in accumulate_block
    a = accumulate_row(a, r)
  File "/tmp/ipykernel_3535721/1427724256.py", line 3, in <lambda>
AttributeError: 'NoneType' object has no attribute 'append'
```
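My current guess is that list.append and list.extend return None, so the state handed back from accumulate_row / merge becomes None before the next row is accumulated. Here is a minimal sketch of what I think the fix would look like, with every callback returning the new state (not yet verified on the full dataset):

```python
from ray.data.aggregate import AggregateFn

# Each callback returns the new accumulator state explicitly;
# list.append / list.extend mutate in place and return None,
# which is what breaks the next accumulate call.
build_thread = AggregateFn(
    init=lambda key: [],
    accumulate_row=lambda state, row: state + [row],
    merge=lambda state, other: state + other,
    name="thread",
)

aggregated_ds = combined_ds.groupby("thread_id").aggregate(build_thread)
```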
Could anyone give any suggestions?
Many thanks!
Hello, I am having a similar issue but with a different error, since I am not using append.
The error seems to be linked to the same root cause.
Also, for some reason, the piece of code that throws this error does not fail every time on my data; it works around 50% of the time on my side.
Any help is deeply appreciated.
```
> LimitOperator[limit=20]
2024-07-31 16:25:34,863 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
ray.data.exceptions.SystemException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/TEPI1001/PycharmProjects/test-sentiment/src/test_ray.py", line 136, in <module>
test_groupby()
File "/Users/TEPI1001/PycharmProjects/test-sentiment/src/test_ray.py", line 131, in test_groupby
print(grouped_ds.take())
^^^^^^^^^^^^^^^^^
File "/Users/TEPI1001/PycharmProjects/test-sentiment/venv/lib/python3.11/site-packages/ray/data/dataset.py", line 2377, in take
for row in limited_ds.iter_rows():
File "/Users/TEPI1001/PycharmProjects/test-sentiment/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 245, in _wrapped_iterator
for batch in batch_iterable:
File "/Users/TEPI1001/PycharmProjects/test-sentiment/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 162, in _create_iterator
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/TEPI1001/PycharmProjects/test-sentiment/venv/lib/python3.11/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/TEPI1001/PycharmProjects/test-sentiment/venv/lib/python3.11/site-packages/ray/data/exceptions.py", line 86, in handle_trace
raise e.with_traceback(None) from SystemException()
TypeError: '<' not supported between instances of 'NoneType' and 'str'
```
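One thing I plan to try, although I am not sure it is the actual root cause: the TypeError looks like the sort-based shuffle is comparing a None group key with a string, so filtering out rows with a missing key before the groupby might avoid it. A rough sketch, where ds, thread_id, and build_thread stand in for my real dataset, key column, and aggregation:

```python
# Drop rows whose group key is missing before grouping; a None key cannot
# be compared with string keys during the sort-based shuffle.
clean_ds = ds.filter(lambda row: row["thread_id"] is not None)

grouped_ds = clean_ds.groupby("thread_id").aggregate(build_thread)
print(grouped_ds.take(20))
```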