Proper workflow to read local parquet file and use it on remote worker?

Hello, Ray community

I’m new to Datasets, so maybe I’m just missing some basic idea.

I’m trying to load a Parquet file from local disk and use this data in a task on a remote worker, as described in the docs.
Ray version is 2.2.0.

Here is the code I’m using:

import ray
ray.init("ray://REMOTE_IP:10001") 
ray.data.read_parquet("local://file.parquet")

But I get this error:

The local scheme paths local://file.parquet are not supported in Ray Client.

What am I doing wrong? Thanks in advance for any advice!

My goal is to load a LARGE (larger than memory) Parquet file from local disk and use it on a remote node.

Hi @skabbit , thanks for your interest and moving the discussion onto Discourse!
Have you tried removing the local:// portion from the path? I was able to read a dummy parquet file that I have on my local disk after connecting to a Ray cluster that I started locally:

import ray
ray.init("ray://127.0.0.1:10001")
ds = ray.data.read_parquet("test.parquet")
ds
>>> Dataset(num_blocks=1, num_rows=5, schema={col1: int64, col2: string})

Let me know if the above doesn’t match your intended use case. Thanks!

Well, local:// works well with the local Ray cluster, but it doesn’t work with the remote Ray cluster as I mentioned above.

Here is the description of my case:

  • I need to run a task on a remote cluster (this is a requirement);
  • I need to use a large (larger than available memory) Parquet file as training data;
  • This file is located on my local machine, but it must be used on a remote node.

If I use the file path without local://, the Ray worker just throws an error:
FileNotFoundError: ./file.parquet
And this is expected, because the file doesn’t exist on the cluster; this is mentioned in the docs as well:
If the file exists only on the local node and you run this read operation in distributed cluster, this will fail as it cannot access the file from remote node.

Could you try running your script/code on the head node?

Yes, it doesn’t matter - head or worker.
Any remote instance causes this error.

(btw, not related this issue, here is the doc for running ray jobs on remote clusters: Ray Jobs Overview — Ray 2.2.0)

That’s not expected: when running on the head node, there is no Ray Client involved, so there should be no such error related to Ray Client.

How did you run it on the head node? Note that Ray Jobs is the recommended way, and it avoids Ray Client.

That’s the thing: as you may notice, I don’t use any job submission and get this error just by running 3 lines of code:

import ray
ray.init("ray://REMOTE_IP:10001") 
ray.data.read_parquet("local://file.parquet")

This is exactly why it failed: running these 3 lines of code will use Ray Client (not the head node), and the local:// scheme is not supported with Ray Client.

And the suggestion here is to use Ray Jobs to submit this script to the cluster you have. Note that if you do this, you can change the second line of your script to just ray.init() (using "ray://" would go through Ray Client).
You can check how to submit this script to your cluster, with an example, here: Quickstart Using the Ray Jobs CLI — Ray 2.2.0
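Putting that together, a minimal sketch of the Job-submission route using the Python SDK (the file names and the http://REMOTE_IP:8265 Jobs API address are assumptions for illustration):

```python
# submit.py -- run on the local machine. The runtime_env working_dir is
# uploaded to the cluster, so file.parquet travels along with the script.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://REMOTE_IP:8265")  # Jobs API port, not 10001

job_id = client.submit_job(
    # train.py is assumed to be the three-line script above, with ray.init()
    # instead of ray.init("ray://REMOTE_IP:10001"), reading "file.parquet".
    entrypoint="python train.py",
    runtime_env={"working_dir": "."},
)
print(job_id)
```

Note that working_dir uploads are meant for small files; for a genuinely larger-than-memory Parquet file, storage reachable from the cluster itself (e.g. S3 or NFS) is likely the more practical route.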