Hi all,
I want to apply f(key, group) on all groups of a dataset that was grouped by key.
I tried to do it using Ray's GroupedDataset with the aggregate and map_groups methods, but I cannot make it work.
I am also trying to use map_batches on a classic Ray Dataset created from a pandas DataFrame groupby. Here is some simple code to illustrate my difficulties.
I have some imports and the function I want to map:
from modin.db_conn import ModinDatabaseConnection
from concurrent.futures import ThreadPoolExecutor
import modin
import pandas
import ray

def fn(groupby_data):
    something, data = groupby_data
    do_something(…)
When I do it without Ray, it works:
pd_table = pandas.read_sql(query, ModinDatabaseConnection(…))
pd_grouped = pd_table.groupby('caseid')
with ThreadPoolExecutor() as executor:
    list(executor.map(fn, pd_grouped))
But when I try to do it with Ray, like this, it does not:
modin_pd_table = modin.pandas.read_sql(query, ModinDatabaseConnection(…))
modin_pd_table_grouped = modin_pd_table.groupby('something')
ray_ds = ray.data.from_modin(modin_pd_table_grouped)
ray_ds.map_batches(fn, batch_size=None, compute='tasks')
The error is quite simple:
something, data = groupby_data
ValueError: too many values to unpack (expected 2)
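I suspect the unpack fails because map_batches hands fn a single batch (a DataFrame), not a (key, group) pair, and unpacking a plain DataFrame iterates over its column names. A minimal sketch of the difference, using a toy DataFrame with made-up columns:

```python
import pandas as pd

df = pd.DataFrame({"caseid": [1, 1, 2], "a": [10, 20, 30], "b": [0, 1, 0]})

# Iterating a plain DataFrame yields its column names, so unpacking
# into two variables raises "too many values to unpack" with 3 columns:
cols = list(df)

# Iterating a GroupBy object yields (key, group) pairs, which is what
# fn expects:
pairs = list(df.groupby("caseid"))
keys = [key for key, group in pairs]
```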
Any help would be appreciated