Converting dask dataframe/array to ray dataset

How severe does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

I am using the following functions to pass data between the Dask and Ray frameworks, and I am wondering whether they are efficient in terms of computation and memory.

import numpy as np
import pandas as pd
import ray
from ray.util.dask import enable_dask_on_ray, ray_dask_get
import dask
from dask import array as da, dataframe as dd

ray.init()
enable_dask_on_ray()


def raydata_from_dask_array(arr: da.Array):
    # Rechunk so that each chunk spans all columns and Dask picks the number of
    # rows from the configured chunk size.
    arr = arr.rechunk(chunks=(dask.config.get("array.chunk-size"), *arr.shape[1:]))
    # Compute each partition with the Dask-on-Ray scheduler; from_numpy does not
    # accept a generator, so materialize the partitions into a list.
    ndarrays = [part.compute(scheduler=ray_dask_get) for part in arr.partitions]
    return ray.data.from_numpy(ndarrays)


def raydata_from_dask_dataframe(ddf: dd.DataFrame):
    # Compute each partition with the Dask-on-Ray scheduler; from_pandas does not
    # accept a generator, so materialize the partitions into a list.
    dataframes = [part.compute(scheduler=ray_dask_get) for part in ddf.partitions]
    return ray.data.from_pandas(dataframes)


def main():
    arr = da.from_array(np.ones(shape=(10000, 1000)), chunks=(2500, 10))
    ddf = dd.from_pandas(
        pd.DataFrame(np.ones(shape=(10000, 1000)),
                     columns=(np.arange(0, 1000) / 1000).astype(str)),
        npartitions=10,
    )

    ray_arr = raydata_from_dask_array(arr)
    ray_df = raydata_from_dask_dataframe(ddf)

    print(arr[:10].compute(), '\n\n', ray_arr, '\n\n\n\n', ddf.head(10), '\n\n', ray_df)


main()

Running with local_mode=True seems to show that the partitions are computed and loaded into Ray in parallel, but is there a risk that I am not seeing?

Also, ray.data.from_numpy and ray.data.from_pandas do not accept a generator as input; passing one raises a TypeError, which is why the partitions are materialized into lists above.
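If materializing every partition on the driver is a memory concern, one variant is to put each computed partition into the Ray object store and pass object references instead. This is only a sketch, assuming your Ray version exposes ray.data.from_pandas_refs; the function name raydata_from_dask_dataframe_refs is illustrative:

def raydata_from_dask_dataframe_refs(ddf: dd.DataFrame):
    # Compute each partition, then move it into the Ray object store so the
    # driver keeps only ObjectRefs instead of the full pandas DataFrames.
    refs = [ray.put(part.compute(scheduler=ray_dask_get)) for part in ddf.partitions]
    return ray.data.from_pandas_refs(refs)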

Thank you for posting, @neltacigreb – and welcome to the Ray Discourse community!

I'm tagging in @Clark_Zinzow, who has been driving the ray.data efforts. Have you tried using Ray's Dask-on-Ray integration, by any chance?

Yes @dynamicwebpaige, I'm running a Dask pipeline using dask_on_ray, and this question is more about how to convert a dataframe between the two frameworks.

Hi @neltacigreb! Ray Datasets actually has a first-class integration with Dask-on-Ray, where Datasets can take a Dask DataFrame and convert it to a Dataset without moving or copying the data!

ds = ray.data.from_dask(ddf)

In your case, the full example would look like this:

import numpy as np
import pandas as pd
import ray
from ray.util.dask import enable_dask_on_ray
from dask import array as da, dataframe as dd

ray.init()
enable_dask_on_ray()


def main():
    arr = da.from_array(np.ones(shape=(10000, 1000)), chunks=(2500, 10))
    ddf = dd.from_pandas(
        pd.DataFrame(np.ones(shape=(10000, 1000)),
                     columns=(np.arange(0, 1000) / 1000).astype(str)),
        npartitions=10,
    )

    # Dask DataFrame -> Ray Dataset, without moving or copying the data.
    ray_ds = ray.data.from_dask(ddf)
    # And back again: Ray Dataset -> Dask DataFrame.
    ddf_from_ray = ray_ds.to_dask()

    print(arr[:10].compute(), '\n\n', ray_ds, '\n\n\n\n', ddf.head(10), '\n\n', ddf_from_ray)


main()
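For the Dask array, ray.data.from_dask expects a Dask DataFrame, so one option is to convert the array to a Dask DataFrame first and then create the Dataset from that. A minimal sketch, assuming a 2-D array and illustrative column names:

# Dask array -> Dask DataFrame -> Ray Dataset.
arr_ddf = arr.to_dask_dataframe(columns=[str(i) for i in range(arr.shape[1])])
ray_arr_ds = ray.data.from_dask(arr_ddf)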