Proper workflow to read local parquet file and use it on remote worker?

Hello, Ray community

I’m new to Datasets, so maybe I’m just missing some basic concept.

I’m trying to load a parquet file from the local disk and use this data on a remote worker in a task, as described in the docs.
Ray version is 2.2.0.

Here is the code I’m using:

import ray
ray.init("ray://REMOTE_IP:10001") 
ray.data.read_parquet("local://file.parquet")

But instead I get this error:

The local scheme paths local://file.parquet are not supported in Ray Client.

What am I doing wrong? Thanks in advance for any advice!

My goal is to load a LARGE (larger than memory) parquet file from the local disk and use it on a remote node.


Hi @skabbit , thanks for your interest and moving the discussion onto Discourse!
Have you tried removing the local:// portion from the path? I was able to read a dummy parquet file that I have on my local disk after connecting to a Ray cluster that I started locally:

import ray
ray.init("ray://127.0.0.1:10001")
ds = ray.data.read_parquet("test.parquet")
ds
>>> Dataset(num_blocks=1, num_rows=5, schema={col1: int64, col2: string})

Let me know if the above doesn’t match your intended use case. Thanks!

Well, local:// works well with the local Ray cluster, but it doesn’t work with the remote Ray cluster as I mentioned above.

Here is the description of my case:

  • I need to run a task on a remote cluster (this is a requirement);
  • I need to use a large (larger than available memory) parquet file as training data;
  • This file is located on my local machine, but it must be used on a remote node.

If I use the file path without local://, the Ray worker just throws an error:
FileNotFoundError: ./file.parquet
And this is expected, because the file doesn’t exist on the cluster; this is mentioned in the docs as well:
If the file exists only on the local node and you run this read operation in distributed cluster, this will fail as it cannot access the file from remote node.


Could you try running your script/code on the head node?

Yes, it doesn’t matter - head or worker.
Any remote instance causes this error.


(btw, not related to this issue, here is the doc for running Ray jobs on remote clusters: Ray Jobs Overview — Ray 2.2.0)

That’s not expected: when running on the head node, there is no Ray Client involved and there should be no such error related to Ray Client.

How did you run it on the head node? Note that using Ray Jobs is the recommended way, and it avoids Ray Client.

That’s the thing: as you may notice, I’m not submitting any job and I get this error just by running these 3 lines of code:

import ray
ray.init("ray://REMOTE_IP:10001") 
ray.data.read_parquet("local://file.parquet")

This is exactly why it failed: running these 3 lines of code uses Ray Client (not the head node), and the local:// scheme is not supported with Ray Client.

And the suggestion here is to use Ray Jobs to submit this script to the cluster that you have. Note that if you do this, you can change the second line of your script to just ray.init() (using “ray://” means using Ray Client).
You can check how to submit this script to your cluster with an example here: Quickstart Using the Ray Jobs CLI — Ray 2.2.0
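
For example, here is a minimal sketch of what the submitted script could look like (the path and address below are placeholders I made up, not taken from this thread, and the parquet file must already exist on the cluster node that performs the read):

# read_data.py -- runs inside the cluster, so no "ray://" address is needed
import ray

ray.init()
# Placeholder path: the file has to be present on the cluster node doing the read.
ds = ray.data.read_parquet("/path/on/cluster/file.parquet")
print(ds.count())

It could then be submitted from the laptop with something like ray job submit --address=http://REMOTE_IP:8265 --working-dir ./ -- python read_data.py.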

I think I tried something similar without success :confused:

On a workstation (192.168.188.64) I started ray with ray start --head --dashboard-host=0.0.0.0.

On my notebook I have the following script FileOnCluster.py:

import ray

@ray.remote
def load_parquet_sample():
    ds = ray.data.read_parquet("local://../../../DockerProjects/minio/data/test-bucket2/data")
    return ds.show(0)


if __name__ == "__main__":

    ray.init()
    print(ray.get(load_parquet_sample.remote()))

I submit it as job from notebook to ray on the workstation with ray job submit --address=http://192.168.188.64:8265/ --working-dir ./ -- python FileOnCluster.py.

Unfortunately it fails with FileNotFoundError. Is there anything I did wrong?

What was the solution that you ultimately arrived at? I am experiencing a similar issue and it appears that the only viable solution is to transfer all local files to S3. Perhaps another option could be to use “ray rsync_up” to transfer the dataset to the cluster. However, I am unsure if this method is suitable.

@Dmitry_Balabka If you have a large dataset of parquet or CSV files, it’s advisable to have it on S3 storage. That way, individual remote workers can access it, especially if you are doing distributed training and sharding the data for each trainer.

If the dataset is small enough, there are a couple of options. You can read the data locally, put the Ray data in the object store, and send an object_ref to the remote workers.

import ray

@ray.remote
def process_my_data(ds):
    # Note: an ObjectRef passed directly as a task argument is automatically
    # dereferenced by Ray, so ds is already the Dataset inside this task.
    # process the data
    return <some_value>


if __name__ == "__main__":

    ray.init()
    ds = ray.data.read_parquet("<path_to_parquet_file>")
    ds_object_ref = ray.put(ds)
    results = process_my_data.remote(ds_object_ref)
    print(ray.get(results))

Another option is to make the data files part of your runtime_env, as sketched below.
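
For example, a minimal sketch of the runtime_env approach (the directory name is a placeholder I made up; this only makes sense for small data, since runtime_env uploads are size-limited):

import ray

# Ship a small local directory to the cluster as this job's working_dir.
# Assumes ./my_data_dir contains file.parquet.
ray.init(runtime_env={"working_dir": "./my_data_dir"})

@ray.remote
def count_rows():
    # Tasks of this job see the uploaded working_dir as their current directory.
    return ray.data.read_parquet("file.parquet").count()

print(ray.get(count_rows.remote()))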

@Jules_Damji, this is my first time using Ray. I’m attempting to convert a PyTorch training script to function in a distributed manner on a Ray Cluster. I aim to keep code alterations to a minimum during the transition from a single laptop to a Ray cluster.

I tried to follow the example for the Transformers and PyTorch libraries:
https://docs.ray.io/en/latest/train/examples/transformers/transformers_example.html
https://docs.ray.io/en/latest/train/examples/pytorch/torch_fashion_mnist_example.html
At first glance, only the training part needs to use ray.train.torch.TorchTrainer and wrap the DataLoaders with ray.train.torch.prepare_data_loader (a rough sketch of this is below).
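
For reference, a rough sketch of that wiring (the data and training loop below are dummies I made up, not the actual script):

import torch
from torch.utils.data import DataLoader, TensorDataset

import ray.train.torch
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    # Dummy in-memory dataset, just to show the wrapping step.
    loader = DataLoader(TensorDataset(torch.randn(64, 4), torch.randn(64, 1)), batch_size=16)
    # prepare_data_loader adds a DistributedSampler and moves batches to the right device.
    loader = ray.train.torch.prepare_data_loader(loader)
    for X, y in loader:
        pass  # the real training step would go here

trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(num_workers=2))
trainer.fit()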

However, regarding the dataset, it has become somewhat complex. The dataset is stored on a local disk and composed of multiple audio files and a metadata file, taking up a few gigabytes of disk space, which is not too excessive.

Naively running the script as-is leads to a missing-files error. So, I tried to rsync the files directly to the nodes using file_mounts in ray_cluster.yaml:

file_mounts:
    ~/src/: src/
    # The following line rsyncs all files to nodes.
    ~/data/: data/
    ~/poetry.lock: poetry.lock
    ~/poetry.toml: poetry.toml
    ~/pyproject.toml: pyproject.toml
    ~/requirements.txt: requirements.txt

I haven’t succeeded with this, and it seemed like the wrong approach to me anyway.

Another option that you suggested is to use Object Storage. However, it might have performance issues since it requires processing numerous object references. Keeping the dataset in the Object Storage might be convenient only in the case of a single file. Still, I believe such an approach might have other drawbacks if the dataset becomes more extensive, and we would like to productionize our code. Although I generally prefer the Ray Client approach for its interactive feel, I have decided to use Ray Jobs as it is more recommended.

So, the next step is to keep all files in S3-like storage. It looks like the only feasible way to store the dataset in distributed cases.

Here are my conclusions. Reading Ray’s documentation, I couldn’t find an explicit recommendation on how to deal with such cases, even though most prototypes usually start with locally stored datasets. Therefore, it would be great if the documentation generally advised transferring the dataset to S3 first when starting with Ray. Also, the “Ray Client” and “Ray Job” concepts could be described in more detail, with examples. Although most examples rely on Ray Client, the recommended approach is Ray Jobs, and Ray Client is suggested for experts only.

@Dmitry_Balabka
Naively running the script as-is leads to a missing-files error. So, I tried to rsync the files directly to the nodes using file_mounts in ray_cluster.yaml:

file_mounts:
    ~/src/: src/
    # The following line rsyncs all files to nodes.
    ~/data/: data/
    ~/poetry.lock: poetry.lock
    ~/poetry.toml: poetry.toml
    ~/pyproject.toml: pyproject.toml
    ~/requirements.txt: requirements.txt

I haven’t succeeded with this, and it seemed like the wrong approach to me anyway.

There is a 2 GB limit on uploading or syncing files, so that is not the best option for large datasets that exceed the limit.

Another option that you suggested is to use Object Storage. However, it might have performance issues since it requires processing numerous object references. Keeping the dataset in the Object Storage might be convenient only in the case of a single file. Still, I believe such an approach might have other drawbacks if the dataset becomes more extensive, and we would like to productionize our code. Although I generally prefer the Ray Client approach for its interactive feel, I have decided to use Ray Jobs as it is more recommended

Yes, if you have multiple datasets of different types, this may affect performance in the long run, since there will be some data transfer involved.

Storing all your data in S3 is the best option; when you use the Ray Data APIs to read and partition or shard it, each worker will only read the amount of data that fits into its memory.

Here is an example of doing batch inference for a Dataset stored on S3, where each model gets a batch or portion of the file on S3: https://github.com/ray-project/ray-educational-materials/blob/main/Ray_Core/Ray_Core_4_Remote_Classes_part_2.ipynb

An example of how to use Ray Data for distributed data ingest: https://docs.ray.io/en/latest/train/dl_guide.html#distributed-data-ingest-with-ray-datasets-and-ray-train

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

# Datasets can be accessed in your train_func via ``get_dataset_shard``.
def train_func(config):
    train_data_shard = session.get_dataset_shard("train")
    validation_data_shard = session.get_dataset_shard("validation")
    ...

# Random split the dataset into 80% training data and 20% validation data.
dataset = ray.data.read_parquet("s3://..")
train_dataset, validation_dataset = dataset.train_test_split(
    test_size=0.2, shuffle=True,
)

trainer = TorchTrainer(
    train_func,
    datasets={"train": train_dataset, "validation": validation_dataset},
    scaling_config=ScalingConfig(num_workers=8),
)
trainer.fit()

