I have a use case where I need to load a large amount of partitioned CSV data (hundreds of GB) from several nodes into Ray.
It seems that Ray's distributed data loading only supports a limited set of cases, such as partitions stored in S3 buckets.
In my case I have the partitions stored on the filesystem of a collection of nodes (a custom Ray cluster), where the nodes can ssh into each other.
One suboptimal way of resolving this is to copy all partitions onto the head node beforehand and then load them all from that single node. However, this creates a huge memory footprint and a runtime bottleneck on a single node (see the last paragraph below for details).
I have thought of the following alternatives, but I am not sure whether they are achievable with Ray:
- I can create a pandas dataframe from each partition, then call ray.put(partition_df) on each node so that each node puts its partition into the distributed object store, and finally create a Ray dataset using ray.data.from_pandas_refs(object_ref_list). The only challenge here is obtaining the object_refs of each partition. This is currently not feasible because the object_refs are created separately on each node, so I don't have access to them. I can only see the object IDs, so is there a way to create an ObjectRef from an object ID?
- Set up an NFS share beforehand and then pass the path of each partition to ray.data.read_csv(address_list). However, this is too much hassle at the moment, and I would prefer to find a workaround within Ray.
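To make the first alternative concrete, here is a minimal sketch of what I have in mind, under some assumptions: each partition is described by a hypothetical (host_ip, csv_path) pair, the host IPs match the NodeManagerAddress values reported by ray.nodes(), and the driver connects to an already running cluster. Instead of calling ray.put() on each node, it pins one read task per partition to the node that holds the file, so the driver gets the ObjectRefs back directly. The names group_paths_by_host, build_dataset and load_partition are made up for illustration.

```python
from collections import defaultdict


def group_paths_by_host(partitions):
    """Group (host_ip, csv_path) pairs into {host_ip: [csv_path, ...]}."""
    grouped = defaultdict(list)
    for host_ip, path in partitions:
        grouped[host_ip].append(path)
    return dict(grouped)


def build_dataset(partitions):
    """Read each CSV on the node that stores it and build a Ray Dataset."""
    # Imported here so the pure helper above can be used without Ray installed.
    import ray
    from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

    # Assumption: a cluster is already running and reachable from this process.
    ray.init(address="auto", ignore_reinit_error=True)

    # Map each live node's IP address to its Ray node ID.
    node_id_by_ip = {
        n["NodeManagerAddress"]: n["NodeID"] for n in ray.nodes() if n["Alive"]
    }

    @ray.remote
    def load_partition(path):
        import pandas as pd

        # Runs on the pinned node, so `path` is resolved on that node's disk.
        return pd.read_csv(path)

    refs = []
    for host_ip, paths in group_paths_by_host(partitions).items():
        # soft=False: fail rather than silently run on a different node.
        strategy = NodeAffinitySchedulingStrategy(
            node_id=node_id_by_ip[host_ip], soft=False
        )
        for path in paths:
            refs.append(
                load_partition.options(scheduling_strategy=strategy).remote(path)
            )

    # The driver holds the ObjectRefs, so no ObjectRef has to be rebuilt from an ID.
    return ray.data.from_pandas_refs(refs)
```

Hypothetical usage would be build_dataset([("10.0.0.1", "/data/part-0.csv"), ("10.0.0.2", "/data/part-1.csv")]). As far as I understand, large task return values stay in the object store of the node that ran the task, which is what I am relying on here, but I have not verified this at the scale I need.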
As a follow-up question regarding ray.put(): is there a way to specify which worker ray.put() sends the data to? This is an important feature to have, because currently, if I call ray.put() from the head node, it creates all the objects in the head node's object store, and with large data I have seen that after a while the head node runs out of memory and starts spilling to disk.
Ideally, Ray would avoid this automatically by monitoring how much memory is left on the node, so that subsequent ray.put() calls send partitions to other nodes. However, this is not what I observed in my experiments.
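For the ray.put() placement question, the closest thing I can sketch is calling ray.put() from a process that already runs on the target node, since ray.put() writes to the object store local to the caller. The sketch below assumes one small actor pinned to each live node and, purely for illustration, that every path is readable from whichever node it is assigned to. NodeLocalPutter, assign_round_robin and put_on_nodes are hypothetical names, not Ray APIs.

```python
import itertools


def assign_round_robin(paths, node_ids):
    """Pair each path with a node ID, cycling through the nodes in order."""
    return list(zip(paths, itertools.cycle(node_ids)))


def put_on_nodes(paths):
    """Load each path via an actor on its assigned node; return refs and actors."""
    # Imported here so the pure helper above can be used without Ray installed.
    import ray
    from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

    # Assumption: a cluster is already running and reachable from this process.
    ray.init(address="auto", ignore_reinit_error=True)

    @ray.remote(num_cpus=0)
    class NodeLocalPutter:
        def put_csv(self, path):
            import pandas as pd

            # ray.put stores the dataframe in this actor's node-local object store.
            # Wrapping the ref in a list hands back the ObjectRef, not the data.
            return [ray.put(pd.read_csv(path))]

    node_ids = [n["NodeID"] for n in ray.nodes() if n["Alive"]]

    # One actor per node, pinned with a hard node-affinity constraint.
    putters = {
        node_id: NodeLocalPutter.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=node_id, soft=False
            )
        ).remote()
        for node_id in node_ids
    }

    wrapped = [
        putters[node_id].put_csv.remote(path)
        for path, node_id in assign_round_robin(paths, node_ids)
    ]
    refs = [inner[0] for inner in ray.get(wrapped)]

    # Keep the actors alive: they own the objects they put.
    return refs, putters
```

The returned refs could then be passed to ray.data.from_pandas_refs(refs). One caveat I am aware of: the objects are owned by the actors that created them, so the actors have to stay alive for as long as the data is needed. This also only helps when the data can be read on the target node in the first place; it does not move data that has already been loaded on the head node.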