Unable to access files on the local disk from methods run using Ray multiprocessing

1. Severity of the issue:
Medium: Significantly affects my productivity but can find a workaround.

2. Environment:

  • Ray version: ray[default,tune,client,data]==2.37
  • Python version: python 3.11
  • OS: linux
  • Cloud/Infrastructure: Azure Databricks
  • Other libs/tools (if relevant): Pyspark

I am running a video processing pipeline on Databricks that uses ffmpeg for the processing and Ray for the orchestration. To do so, I created a Ray cluster on top of the Spark cluster. Below is sample code that shows how I run the pipeline:

```python
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
import pandas as pd
import ray
import sys

# configure_ray_cluster is our own helper (not shown) that builds the keyword
# arguments for setup_ray_cluster; driver_cores, worker_cores, etc. are defined elsewhere.
config = configure_ray_cluster(
    driver_cores=driver_cores,
    driver_memory_gb=driver_memory_gb,
    worker_cores=worker_cores,
    worker_memory_gb=worker_memory_gb,
    num_workers=num_workers,
    num_workers_actual=num_workers_actual,
)

# Initialize the Ray cluster, shutting down any existing one first
try:
    shutdown_ray_cluster()
    ray_conf = setup_ray_cluster(**config)
except Exception as e:
    # Re-raise RuntimeErrors other than the expected "nothing to shut down" one
    if (
        str(e) != "No active ray cluster to shut down."
        and e.__class__.__name__ == "RuntimeError"
    ):
        raise
    print("Ray cluster inactive. Starting new cluster.")
    ray_conf = setup_ray_cluster(**config)

current_path = sys.path

# Connect to the cluster and forward the driver's sys.path to the Ray workers
ray.init(
    ray_conf[1],
    runtime_env={
        "env_vars": {
            # dict.fromkeys removes duplicates while keeping the path order
            "PYTHONPATH": ":".join(dict.fromkeys(current_path)),
        },
    },
    ignore_reinit_error=True,
    configure_logging=False,
)

# Upper bound on concurrent actors: worker-node CPUs plus head-node CPUs
max_concurrency = (
    config["num_cpus_per_node"] * config["max_worker_nodes"]
    + config["num_cpus_head_node"]
)

@ray.remote
def ray_data_task(ds, logger):
    ds_shape = ds.shape[0]
    # One row per video payload
    ds = ray.data.from_pandas(
        pd.DataFrame(ds["video_payload"].to_list(), columns=["combos"])
    )

    # One block per row, processed by a pool of VideoProcessor actors
    preds = ds.repartition(ds_shape).map(
        VideoProcessor,  # callable class, run as Ray actors
        fn_constructor_args=[logger],
        num_cpus=1,
        concurrency=(2, max_concurrency),
    )

    # Handle empty dataset
    if preds.count() == 0:
        logger.debug("Warning: Dataset is empty after map transformation.")
        return pd.DataFrame()
    final_df = preds.to_pandas()
    logger.debug(f"{final_df=}")
    return final_df

query = """
    select * from table
"""

df = spark.sql(query)  # `spark` is the Databricks SparkSession
batch_pandas_df = df.toPandas()
batch_pandas_df = ray.get(ray_data_task.remote(batch_pandas_df, logger))
processed_df = spark.createDataFrame(batch_pandas_df)
# Remaining pyspark processing
```

Inside the VideoProcessor class, each video file is copied from a Unity Catalog volume to the cluster's local filesystem under the /tmp/ directory, and then two different types of thumbnails are generated from it. Currently these two steps run serially. To run them in parallel, I tried Pool from ray.util.multiprocessing. However, the pool processes cannot read the file at that location and throw a file-not-found error. I also tried saving the files to the /local_disk0/tmp/ directory but got the same error. As a workaround, I now save the file to another Unity Catalog volume, but I think the extra I/O is slowing the process down.
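One plausible explanation, offered as an assumption rather than a confirmed diagnosis: `ray.util.multiprocessing.Pool` runs its workers as Ray actors, which the scheduler can place on any node in the cluster, while `/tmp` and `/local_disk0/tmp` exist separately on each node. A file copied on the actor's node would then be invisible to a pool worker running on another node. The sketch below defines a hypothetical helper, `probe`, that reports the hostname, process ID, and whether a path is visible wherever it runs, which makes this easy to check.

```python
import os
import socket
from typing import Tuple


def probe(path: str) -> Tuple[str, int, bool]:
    """Hypothetical diagnostic: where did this call run, and can it see `path`?"""
    # Hostname identifies the node; PID identifies the process on that node.
    return socket.gethostname(), os.getpid(), os.path.exists(path)


if __name__ == "__main__":
    # Local sanity check only. Inside the actor, compare the actor's own
    # probe(local_path) with ray.util.multiprocessing.Pool(...).map(probe, ...).
    print(probe(os.getcwd()))
```

For example, inside the actor, `Pool(processes=2).map(probe, [local_path] * 4)` (with `local_path` being the copied file) would show whether the pool workers report a different hostname than the actor and `False` for the path.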

How do I save the file on the cluster's local disk so that the processes started via Ray multiprocessing inside the Ray actors can access it?
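A minimal sketch of one way around this, assuming the thumbnails are produced by calling the ffmpeg binary (for example via `subprocess.run`): keep both steps inside the actor's own process and run them on a standard-library thread pool instead of a Ray pool. Because ffmpeg does the heavy work in its own child process, the threads mostly wait on it, and both threads are guaranteed to see the same node-local file. `make_thumbnail_a`, `make_thumbnail_b`, and `process_video` are hypothetical names; the two thumbnail functions are placeholders that copy bytes instead of calling ffmpeg.

```python
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Optional


def make_thumbnail_a(video_path: Path, out_dir: Path) -> Path:
    # Hypothetical placeholder for the first ffmpeg step; a real version would run
    # something like subprocess.run(["ffmpeg", "-i", str(video_path), ...], check=True).
    out = out_dir / f"{video_path.stem}_a.jpg"
    out.write_bytes(video_path.read_bytes()[:16])
    return out


def make_thumbnail_b(video_path: Path, out_dir: Path) -> Path:
    # Hypothetical placeholder for the second ffmpeg step.
    out = out_dir / f"{video_path.stem}_b.jpg"
    out.write_bytes(video_path.read_bytes()[-16:])
    return out


def process_video(source: str, work_root: Optional[str] = None) -> Dict[str, bytes]:
    """Copy one video to node-local scratch space and build both thumbnails in parallel."""
    # A fresh directory per video; it is deleted (with the copy) when the block exits.
    with tempfile.TemporaryDirectory(dir=work_root) as tmp:
        tmp_dir = Path(tmp)
        local_copy = tmp_dir / Path(source).name
        shutil.copy(source, local_copy)  # e.g. source is a Unity Catalog volume path
        # Threads share this process, so both steps see the same local file.
        with ThreadPoolExecutor(max_workers=2) as pool:
            fut_a = pool.submit(make_thumbnail_a, local_copy, tmp_dir)
            fut_b = pool.submit(make_thumbnail_b, local_copy, tmp_dir)
            thumbs = [fut_a.result(), fut_b.result()]
        # Read the thumbnails before the temporary directory is cleaned up.
        return {p.name: p.read_bytes() for p in thumbs}
```

Passing `work_root="/local_disk0/tmp"` would place the scratch files on the node's local disk, assuming that directory already exists there. If the steps were CPU-bound Python rather than ffmpeg calls, `concurrent.futures.ProcessPoolExecutor` is the process-based equivalent and also stays on the same node. Note that running two ffmpeg processes per actor uses more CPU than the `num_cpus=1` reserved in the `map` call, so raising it to 2 may be worth considering.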

P.S.: We process batches of ~1000 video files at a time, so please take that into account too!
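With ~1000 videos per batch and several actors sharing each node's disk, node-local copies should be deleted as soon as their thumbnails are done, and a free-space check before copying can fail fast instead of filling the disk mid-batch. A sketch using only the standard library; `has_room` and its `headroom` safety factor (space for the copy plus thumbnails, as a multiple of the video size) are assumptions, not measured values.

```python
import os
import shutil


def has_room(source: str, dest_dir: str, headroom: float = 2.0) -> bool:
    """Hypothetical guard: is there room in dest_dir for a local copy of source?

    headroom is an assumed multiple of the source size, covering the copy
    itself plus any thumbnails written next to it.
    """
    needed = os.path.getsize(source) * headroom
    return shutil.disk_usage(dest_dir).free >= needed
```

For example, an actor could check `has_room(src, "/local_disk0/tmp")` before copying and fall back to the Unity Catalog volume path when it returns `False`.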
