How to create a Ray dataset from distributed partitions?

I have a use case where I need to load a large amount of CSV data (hundreds of GB of partitions) from a set of different nodes into Ray.
It seems that Ray's distributed data loading only supports limited use cases, such as reading the partitions from S3 buckets.
In my case the partitions are stored on the filesystems of a collection of nodes (a custom Ray cluster), where the nodes can SSH into each other.
One suboptimal way of resolving this is to copy all partitions to the head node beforehand and then load them from that single node. However, this creates a huge memory footprint and a runtime bottleneck on a single node (please see the last paragraph below for more details).

I have thought of the following alternatives, but not sure if this is achievable with Ray:

  1. I can create a pandas DataFrame from each partition, call ray.put(partition_df) on each node so that every node puts its own partition into the distributed object store, and finally create a Ray Dataset using ray.data.from_pandas_refs(object_ref_list). The only challenge is obtaining the ObjectRefs of the partitions: since the ObjectRefs are created separately on each node, I don't have access to them from the driver; I can only see the object IDs. Is there a way to create an ObjectRef from an object ID?

  2. Setting up an NFS share beforehand and then providing the path of each partition to ray.data.read_csv(path_list); see the sketch below. However, this currently involves too much hassle, and I would prefer to find a workaround within Ray.
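For reference, if a shared filesystem were mounted on every node, alternative 2 would reduce to handing the partition paths directly to Ray Data. The mount point and file names below are hypothetical:

import ray

# Hypothetical shared-filesystem paths, visible from every node in the cluster.
paths = ["/mnt/shared/part_000.csv", "/mnt/shared/part_001.csv"]
ds = ray.data.read_csv(paths)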

As a follow-up question regarding ray.put(): is there a way to specify which node ray.put() should send the data to? This is an important feature because currently, if I call ray.put() from the head node, all the objects are created in the head node's object store, and with large data I have seen the head node run out of memory after a while and start spilling to disk.
Ideally, Ray would avoid this automatically by monitoring how much memory is left on the node, so that subsequent ray.put() calls place the partitions on other nodes. However, this is not what I observed in my experiments.

cc: @jjyao @Chen_Shen any suggestions for this?

ray.put() currently can only put to the local node.

I think alternative 1 should work:

You can define a load_partition task and run it on each node via NodeAffinitySchedulingStrategy:

import pandas as pd
import ray

@ray.remote
def load_partition(local_path):
    # Read the local CSV partition; returning the DataFrame directly gives
    # the driver an ObjectRef, so there is no need to call ray.put().
    return pd.read_csv(local_path)

As @jjyao suggested, you can find out how to use NodeAffinitySchedulingStrategy here.

Hi @jjyao, thanks for the response. @Jules_Damji, thanks for the pointer.
@jjyao Regarding your suggestion, I suppose the following code snippet should do the job?

import pandas as pd
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

@ray.remote
def load_partition(local_path):
    return pd.read_csv(local_path)

# local_path: the partition path on the target node
ref = load_partition.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().node_id,
        soft=False,
    )
).remote(local_path)

However, ray.get_runtime_context().node_id only gives the NodeID object of the current node.
How can I obtain the NodeIDs of all the other nodes in the Ray cluster?
I tried constructing NodeID(node_id) from the node ID string shown by ray status, but I get the following error:
ValueError: ID string needs to have length 28, got 56

Any pointers on this?

You can call ray.nodes(), which returns all the nodes in the cluster. Each node has a field called NodeID.
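To tie this together, here is a minimal sketch of the per-node loading step, assuming every node stores its partition at the same hypothetical local path (/data/partition.csv) and the driver is attached to the running cluster:

import pandas as pd
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init(address="auto")  # connect to the existing cluster

LOCAL_PATH = "/data/partition.csv"  # hypothetical per-node partition path

@ray.remote
def load_partition(local_path):
    # Runs on the target node and reads its local CSV partition.
    return pd.read_csv(local_path)

# One ObjectRef per alive node; each task is pinned to its node, so the
# DataFrame is created (and stays) in that node's object store.
refs = [
    load_partition.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(
            node_id=node["NodeID"],  # hex node ID from ray.nodes()
            soft=False,              # fail instead of falling back to another node
        )
    ).remote(LOCAL_PATH)
    for node in ray.nodes()
    if node["Alive"]
]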

Curious if we can just use Ray data? cc @sjl

@jjyao Thanks for the help. I can confirm that your suggested approach works as expected, i.e., I can create pandas DataFrames in each node's memory and then create one distributed Ray Dataset using ray.data.from_pandas_refs(), with partitions scattered across nodes.
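For completeness, a minimal sketch of the final assembly step, reusing the refs list from the earlier sketch:

import ray

# `refs` is the list of per-node DataFrame ObjectRefs produced above.
ds = ray.data.from_pandas_refs(refs)
print(ds.count())  # the dataset's blocks stay on the nodes that created them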
