Apply a function to the (key, group) pairs of a grouped dataset

Hi all,

I want to apply f(key, group) on all groups of a dataset that was grouped by key.
I tried to do it with Ray's GroupedDataset and its aggregate and map_groups methods, but I cannot make it work.

I am also trying to use map_batches on a regular Ray Dataset created from a grouped pandas DataFrame. Here is some simple code to explain my difficulties.

I have some imports and a function I want to map.

from modin.db_conn import ModinDatabaseConnection
from concurrent.futures import ThreadPoolExecutor
import modin.pandas
import pandas
import ray

def fn(groupby_data):
    something, data = groupby_data
    do_something(…)

When I do it without Ray, it works:

pd_table = pandas.read_sql(query, ModinDatabaseConnection(…))
pd_table_grouped = pd_table.groupby('caseid')
with ThreadPoolExecutor() as executor:
    list(executor.map(fn, pd_table_grouped))

But when I try to do it with Ray, like this, it does not:

modin_pd_table = modin.pandas.read_sql(query, ModinDatabaseConnection(…))
modin_pd_table_grouped = modin_pd_table.groupby('something')
ray_ds = ray.data.from_modin(modin_pd_table_grouped)
ray_ds.map_batches(fn, batch_size=None, compute='tasks')

The error is quite simple:

something, data = groupby_data
ValueError: too many values to unpack (expected 2)

Any help would be appreciated :slight_smile:

Hi @lcaquot, could you try doing groupby and map_groups both inside Ray?

modin_pd_table = modin.pandas.read_sql(query, ModinDatabaseConnection(…))
ray_ds = ray.data.from_modin(modin_pd_table)
ray_ds.groupby('something').map_groups(fn)
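
In case it helps, here is a minimal sketch of how fn might look in that setup, reusing the query, do_something, and 'something' placeholders from the snippets above. It assumes that with batch_format='pandas', map_groups hands the UDF each group as a pandas DataFrame rather than a (key, group) tuple, which is also why the map_batches version raised the unpacking error:

def fn(group: pandas.DataFrame) -> pandas.DataFrame:
    # Each call receives one group as a DataFrame, so there is no
    # (key, group) tuple to unpack; read the key from the column instead.
    something = group['something'].iloc[0]
    do_something(…)
    # map_groups expects a batch back, e.g. the (possibly modified) group itself.
    return group

modin_pd_table = modin.pandas.read_sql(query, ModinDatabaseConnection(…))
ray_ds = ray.data.from_modin(modin_pd_table)
result_ds = ray_ds.groupby('something').map_groups(fn, batch_format='pandas')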