Ray Distributed Load from HDFS

Hi, I am using Ray Datasets to read parquet files from HDFS.

Everything works well if I read the data from HDFS into a single-node Ray instance. However, if I try to read the same parquet files from a cluster with multiple workers, the workers start dying with segfaults and complain that CLASSPATH (needed by pyarrow to read from HDFS) is not set, even though the variable is set on every machine.

This is how I am doing things:

  • Export ARROW_HDFS_LIBRARY and CLASSPATH from the shell
  • Run ray start --head on the main node.
  • Run ray.init(address='auto') in Python from the main node. (Somehow, if I try to run ray.init() directly, it times out and crashes the Jupyter kernel, but starting Ray from the shell and connecting works fine.)
  • Spin up my other pods, with the same Docker image
  • Export the variables on the other pods
  • Run ray start and pass the address of the head → the cluster is set up and I can see these new workers in the dashboard.
  • Run ray.data.read_parquet from the Jupyter notebook → get a crash (a rough sketch of the call is below)
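
For reference, the read that crashes looks roughly like this (the namenode host, port, and dataset path here are placeholders, not my real ones):

import ray

ray.init(address="auto")

# Placeholder HDFS URI; the real namenode host, port, and path differ.
ds = ray.data.read_parquet("hdfs://namenode:8020/path/to/parquet_dir")
print(ds.count())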

Does anyone have some working code to perform a distributed read from HDFS? Any idea why I am getting these environment variable issues?

Hi @Andrea_Pisoni, thank you for posting! I remember a Datasets user having a similar issue a while back; let me ping them and get back to you.

@Andrea_Pisoni You said that the variable is set on every machine, but have you confirmed that CLASSPATH is properly set within the Python worker processes? You can check by looking for the environment variable in a Ray task, e.g.:

import ray

# SPREAD encourages Ray to spread these tasks across different nodes.
@ray.remote(scheduling_strategy="SPREAD")
def check_env():
    import os
    # Report this worker's node ID and whether CLASSPATH is visible
    # inside the worker process.
    return (
        ray.get_runtime_context().node_id,
        "CLASSPATH" in os.environ,
    )

num_nodes = ...  # e.g. len(ray.nodes())
results = [check_env.remote() for _ in range(num_nodes)]
print(ray.get(results))

Thanks for writing back Clark! I’ll run this first thing in the morning when I get access to the cluster again.

I was not aware of the SPREAD strategy to ensure tasks are sent to all workers, that's super useful! Would you also set environment variables this way, to make sure the variables are set correctly in the Python processes?

There’s no guarantee that those particular Python worker processes will get reused, so it’s strongly encouraged to specify such an environment variable in your cluster config when creating the cluster (or in a runtime environment if you need to set it dynamically, although for your use case it seems like specifying it statically in the cluster config will work). There’s a sketch of the runtime environment route below.
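
If you do go the dynamic route, a minimal sketch would look something like the following, assuming runtime environment support for "env_vars"; the variable values are placeholders, not real paths:

import ray

# Rough sketch: pass the environment variables through a runtime environment
# so every Ray worker process started for this job sees them.
# The values below are placeholders; substitute your actual paths.
ray.init(
    address="auto",
    runtime_env={
        "env_vars": {
            "CLASSPATH": "/placeholder/hadoop/classpath",
            "ARROW_HDFS_LIBRARY": "/placeholder/path/to/libhdfs.so",
        }
    },
)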

FYI, SPREAD is a soft scheduling hint, where tasks are encouraged to load-balance across nodes, but this isn’t guaranteed. For the purpose of confirming that the environment variable is set on your worker nodes, it should hopefully suffice.

If this ends up not sufficing for some reason, you can manually run a task on a given node using the node’s node ID (retrievable for all nodes via ray.nodes()) and the "node:{NODE_ID}" custom resource, which would let you confirm that the environment variable is set on all nodes. A rough sketch of that approach is below.
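
For example, something along these lines should work, assuming the usual ray.nodes() output and the automatic per-node "node:..." custom resource (has_classpath here is just an illustrative helper, not part of any API):

import os
import ray

@ray.remote
def has_classpath():
    # Report whether CLASSPATH is visible inside this worker process.
    return "CLASSPATH" in os.environ

refs = []
for node in ray.nodes():
    if not node["Alive"]:
        continue
    # Each node exports an automatic "node:..." custom resource; requesting a
    # tiny amount of it pins the task to that specific node.
    node_resource = next(k for k in node["Resources"] if k.startswith("node:"))
    refs.append((node["NodeID"], has_classpath.options(resources={node_resource: 0.001}).remote()))

for node_id, ref in refs:
    print(node_id, ray.get(ref))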

Thanks a lot Clark, your help has been invaluable in fixing the issue. Surprisingly, the problem was with the head node, not with the worker nodes.

Looking back, it seems clear I was not doing things right: I was starting Ray through the shell and attaching my notebook to it with address='auto', but at the same time I was setting the environment variables in Python using os.environ (I was convinced I was doing that from the shell like for the other workers, and I even wrote that in my post! Whoops!). I guess the Ray process did not have access to those. Once I exported them in the shell before running ray start, everything worked.

As a side note, it took me this long to figure out that the problem was with the head because, oddly enough, if I do not add workers to the cluster and use the head only, Ray Datasets works properly. It only fails once I add more workers (even though there is no problem with these new workers). So I guess there may be some slightly inconsistent behaviour between Ray on a single node and Ray distributed.

Anyhow, I wanted to circle back because the help here fixed my problem, and to leave this here in case someone else runs into the same issue in the future.

Thanks again!!