As the title says. The code below
import dask
import dask.dataframe as dd
import pandas as pd
import ray
from ray import ObjectRef
from ray.util.dask import enable_dask_on_ray

@dask.delayed
def ref_to_df(ref: ObjectRef) -> pd.DataFrame:
    return ray.get(ref)

refs = [ObjectRef(...), ...]
with enable_dask_on_ray():
    ddf = dd.from_delayed([ref_to_df(ref) for ref in refs])
    ddf.compute()
gives me:
ValueError: 'object_refs' must either be an object ref or a list of object refs
I guess a dask delayed function automatically dereferences ObjectRefs when running on Ray? Assuming so, I tried returning the argument directly:
@dask.delayed
def ref_to_df(ref):
    return ref
and I get:
ray.exceptions.RaySystemError: System error: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/usr/local/lib/python3.9/site-packages/pandas/_libs/internals.cpython-39-darwin.so'>
If I use Ray Datasets instead:
dataset = ray.data.from_pandas_refs(refs)
with enable_dask_on_ray():
    ddf = dataset.to_dask()
    ddf.compute()
I get the same error:
ray.exceptions.RaySystemError: System error: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/usr/local/lib/python3.9/site-packages/pandas/_libs/internals.cpython-39-darwin.so'>
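One possible cause worth ruling out: as far as I know, `_unpickle_block` only exists in pandas 1.3 and later, so this error can appear when a DataFrame pickled by a newer pandas is unpickled by a process running an older one. Below is a minimal diagnostic sketch, not a confirmed fix. The helper names `collect_pandas_versions` and `find_mismatches` are hypothetical, and the sketch assumes a running Ray cluster that the driver can connect to. Several probe tasks may land on the same worker, so a clean result does not prove that every node matches.

```python
from typing import Dict, List


def collect_pandas_versions(num_probes: int = 4) -> Dict[str, str]:
    """Return the pandas version seen by the driver and by a few Ray tasks."""
    # Imported lazily so find_mismatches() can be used without Ray installed.
    import pandas as pd
    import ray

    @ray.remote
    def worker_pandas_version() -> str:
        import pandas
        return pandas.__version__

    versions = {"driver": pd.__version__}
    probes = [worker_pandas_version.remote() for _ in range(num_probes)]
    for i, version in enumerate(ray.get(probes)):
        versions[f"worker-{i}"] = version
    return versions


def find_mismatches(versions: Dict[str, str]) -> List[str]:
    """List the processes whose pandas version differs from the driver's."""
    driver_version = versions["driver"]
    return [name for name, v in versions.items() if v != driver_version]
```

Calling `find_mismatches(collect_pandas_versions())` on the driver returns an empty list when every probed task reports the same pandas version as the driver. If the refs were created by a separate job or environment, the pandas version used there would need to be checked as well.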
How do I construct a Dask DataFrame from objects already stored in Ray?
Thanks!