How severely does this issue affect your experience of using Ray?
- Low: It annoys or frustrates me for a moment.
I am using the following functions to pass data from Dask to Ray, and I am wondering whether this is efficient in terms of computation and memory:
import numpy as np
import pandas as pd
import ray
from ray.util.dask import enable_dask_on_ray, ray_dask_get
import dask
from dask import array as da, dataframe as dd
ray.init()
enable_dask_on_ray()
def raydata_from_dask_array(arr: da.Array):
    # rechunk to maximize the number of rows per chunk across all columns (dask optimized)
    arr = arr.rechunk(chunks=(dask.config.get("array.chunk-size"), *arr.shape[1:]))
    # compute each chunk with the Dask-on-Ray scheduler and use it as a Ray Dataset block
    gen_ndarray = (part.compute(scheduler=ray_dask_get) for part in arr.partitions)
    return ray.data.from_numpy(gen_ndarray)

def raydata_from_dask_dataframe(ddf: dd.DataFrame):
    # compute each partition with the Dask-on-Ray scheduler and use it as a Ray Dataset block
    gen_dataframe = (part.compute(scheduler=ray_dask_get) for part in ddf.partitions)
    return ray.data.from_pandas(gen_dataframe)

def main():
    arr = da.from_array(np.ones(shape=(10000, 1000)), chunks=(2500, 10))
    ddf = dd.from_pandas(
        pd.DataFrame(np.ones(shape=(10000, 1000)), columns=(np.arange(0, 1000) / 1000).astype(str)),
        npartitions=10,
    )
    ray_arr = raydata_from_dask_array(arr)
    ray_df = raydata_from_dask_dataframe(ddf)
    print(arr[:10].compute(), '\n\n', ray_arr, '\n\n\n\n', ddf.head(10), '\n\n', ray_df)
main()
Running with local_mode=True seems to show that the partitions are computed and loaded into Ray in parallel, but is there a risk that I don't see?
Also, the functions ray.data.from_numpy and ray.data.from_pandas do not accept generators as input, which raises a TypeError.
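For now my workaround is to materialize the generator into a list before handing it to Ray Data, assuming from_pandas (and likewise from_numpy) accepts a list of blocks. A minimal sketch of what I mean, using the dataframe case:

def raydata_from_dask_dataframe_list(ddf: dd.DataFrame):
    # Workaround sketch: collect the computed partitions into a plain list,
    # since passing a generator appears to raise a TypeError.
    parts = [part.compute(scheduler=ray_dask_get) for part in ddf.partitions]
    return ray.data.from_pandas(parts)

This forces all partitions to be computed up front, though, so I am not sure it keeps the parallel/streaming behaviour I was hoping for with the generator version.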