How to import packages and initialize them in Ray?

Hi, I’m using Ray with pandarallel to process multiple DataFrames in parallel.

Here’s the example:

import math
import ray
import numpy as np
import pandas as pd
from pandarallel import pandarallel

def calc(x):
    # import math
    return math.sin(x.a**2) + math.sin(x.b**2)

@ray.remote
def func(df):
    # pandarallel.initialize(nb_workers=2)
    return df.parallel_apply(calc, axis=1)

def main():
    df_size = int(5e3)
    df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                           b=np.random.rand(df_size)))
    df_list = [df, df]
    futures = [func.remote(df) for df in df_list]
    res = ray.get(futures)

    print(res)


if __name__ == '__main__':
    ray.init(num_cpus=2)
    main()

It raises this error:

Traceback (most recent call last):
  File "/home/xin/Documents/github/check_s5p_lnox/test_pandarallel.py", line 29, in <module>
    main()
  File "/home/xin/Documents/github/check_s5p_lnox/test_pandarallel.py", line 22, in main
    res = ray.get(futures)
  File "/home/xin/miniconda3/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/xin/miniconda3/lib/python3.9/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(AttributeError): ray::func() (pid=44305, ip=10.120.12.126)
  File "/home/xin/Documents/github/check_s5p_lnox/test_pandarallel.py", line 14, in func
    return df.parallel_apply(calc, axis=1)
  File "/home/xin/miniconda3/lib/python3.9/site-packages/pandas/core/generic.py", line 5487, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'parallel_apply'

After uncommenting pandarallel.initialize(nb_workers=2) and import math inside the functions, it works. But I suppose initializing pandarallel multiple times would use much more memory. Is it possible to initialize it once in the main function instead?

Any idea of the correct way to do this?

Thanks!

pandarallel does some non-trivial initialization (starting a Plasma store, starting worker processes, etc.) and patching of the Pandas library, so it needs to be initialized in each Ray worker process (i.e. at the start of a task). This initialization cost can be paid just once by using a Ray actor and initializing pandarallel in the actor’s constructor.
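
For example, a minimal sketch of that pattern (the actor name PandarallelWorker is just illustrative) could look like this:

import math

import pandas as pd
import ray
from pandarallel import pandarallel

def calc(x):
    return math.sin(x.a**2) + math.sin(x.b**2)

@ray.remote
class PandarallelWorker:
    def __init__(self, nb_workers=2):
        # Runs once per actor process, so the worker processes that
        # pandarallel starts are only created a single time here.
        pandarallel.initialize(nb_workers=nb_workers)

    def apply(self, df):
        # parallel_apply is available because initialize() patched pandas
        # inside this actor's process.
        return df.parallel_apply(calc, axis=1)

# Usage: reuse the same actor for many DataFrames instead of
# re-initializing pandarallel in every task.
# worker = PandarallelWorker.remote(nb_workers=2)
# res = ray.get(worker.apply.remote(df))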

Also, using pandarallel with Ray might not make much sense: pandarallel starts up its own Plasma store and worker processes, which Ray already provides, so pandarallel won’t leverage Ray’s dataplane or task scheduling, will allocate redundant resources, won’t share DataFrame data across Ray tasks, etc. Have you tried Dask-on-Ray or Modin? These implement parallel DataFrame processing via the Pandas API while leveraging Ray’s dataplane (the Plasma object store) and task scheduling. With them, you can also process DataFrames that are larger than a single node’s RAM, which is a big advantage.
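
As a rough sketch of the Modin route (assuming Modin’s Ray engine is installed, e.g. pip install "modin[ray]"), your example would become something like:

import math

import numpy as np
import ray
import modin.pandas as mpd  # drop-in replacement for the pandas API

def calc(x):
    return math.sin(x.a**2) + math.sin(x.b**2)

ray.init(num_cpus=2)

df_size = int(5e3)
df = mpd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                        b=np.random.rand(df_size)))

# Modin partitions the DataFrame and runs apply() on Ray workers,
# so no per-task pandarallel initialization is needed.
res = df.apply(calc, axis=1)
print(res)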


Thanks, I will learn how to use Dask-on-Ray or Modin. It sounds like a good option.
