Dask on Ray does not work with Dask DataFrames

Hi, we have a Dask DataFrame and need to perform computations on it in a set of ray.remote() DataActors. These DataActors are actors we wrote ourselves.

In the docs (Using Dask on Ray, Ray 2.9.1), under the "Persist" section, there is an example of how this is handled for Dask arrays:

import dask
import dask.array as da
d_arr = da.ones(100)
d_arr_p = d_arr.persist()
print(dask.base.collections_to_dsk([d_arr_p]))

This would print the following:
{('ones-c345e6f8436ff9bcd68ddf25287d27f3', 0): ObjectRef(8b4e50dc1ddac855ffffffffffffffffffffffff0100000001000000)}

Now if we do d_arr_p.compute(), the computations are performed on the Ray cluster itself. This makes it possible for us to initialise all our DataActors like so: DataActor.remote(data=d_arr_p), without all the DataActors making copies of the array.
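For context, here is a minimal sketch of what we mean by a DataActor (the name and the mean() method are illustrative only, and we assume the Dask-on-Ray scheduler also has to be enabled inside the actor process):

import ray
from ray.util.dask import enable_dask_on_ray

@ray.remote
class DataActor:
    # Illustrative actor; our real actors have more methods.
    def __init__(self, data):
        # Assumption: enable the Dask-on-Ray scheduler in this process so the
        # actor can compute on graphs that contain Ray ObjectRefs.
        enable_dask_on_ray()
        self.data = data  # the persisted Dask collection

    def mean(self):
        # Runs the remaining graph on the Ray cluster.
        return self.data.mean().compute()

# Every actor receives the same persisted array without copying the chunks.
actors = [DataActor.remote(data=d_arr_p) for _ in range(4)]
print(ray.get([actor.mean.remote() for actor in actors]))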

We thought the same would work for Dask DataFrames. Let's assume this is our df:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from ray.util.dask import enable_dask_on_ray

# Enable the Dask-on-Ray scheduler
enable_dask_on_ray()

# Create a random Dask DataFrame
df = dd.from_pandas(
    pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
    npartitions=2,
)

print(dask.base.collections_to_dsk([df])) will just print a normal Dask task graph.

Now if we do df = df.persist(), we do not get ObjectRefs back in the graph.
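To illustrate, this is the same check we ran for the array above:

df = df.persist()
# Unlike the array example, the graph still contains plain Dask tasks
# instead of Ray ObjectRefs.
print(dask.base.collections_to_dsk([df]))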

Also, if after persisting we initialise our DataActors with DataActor.remote(data=df), copies are made of the DataFrame, which increases memory usage significantly. Furthermore, the computations are not done on one shared object as with the array. We expected the data to be one shared object in the object store that all workers can compute on.
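For clarity, this is the pattern we tried, using the same illustrative DataActor sketched above:

actors = [DataActor.remote(data=df) for _ in range(4)]
# In our case each actor ends up holding its own copy of the DataFrame
# instead of sharing blocks through the Ray object store.
print(ray.get([actor.mean.remote() for actor in actors]))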

It seems that enable_dask_on_ray() does not work on Dask DataFrames and only on arrays?
