Multiple writers attempt to write to the same file

We’re having an issue using Ray with our HDFS storage backend. The issue is that Ray spawns several writers that attempt to update the same file at the same time. This results in a FileNotFoundException. Example output:

(TorchTrainer pid=14750) Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist:
/app/<path>/TorchTrainer_a492f_00000_0_2024-09-04_03-56-19/result.json
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2730)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2610)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:837)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:525)
...
(RayTrainWorker pid=14923) org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /app/<path>/TorchTrainer_a492f_00000_0_2024-09-04_03-56-19/result.json

Our HDFS team confirmed that two different IP addresses were attempting to write to these files at the time of the error, so HDFS rejected one of the requests. You can also see in the output above that two different pids are throwing errors (RayTrainWorker and TorchTrainer). Ray should never write to the same file from two different processes, because most storage backends do not support simultaneous writes to a given file. Have I configured something incorrectly?

Some details on our job: it runs on a Ray cluster with one head node and zero worker nodes, on top of Kubernetes. We are running a Ray AIR training job using TorchTrainer.fit(). Our configuration is as follows:

import fsspec
import pyarrow as pa
from ray.train import RunConfig, ScalingConfig, SyncConfig
from ray.train.torch import TorchTrainer

scaling_config = ScalingConfig(
  trainer_resources={"CPU": 1},
  num_workers=1,
  resources_per_worker={"CPU": 10, "GPU": 1},
  use_gpu=True,
)
sync_config = SyncConfig(sync_artifacts=True)
# Wrap the fsspec filesystem so PyArrow (and hence Ray) can use it.
fs, storage_path = fsspec.core.url_to_fs(<URL>)
storage_filesystem = pa.fs.PyFileSystem(pa.fs.FSSpecHandler(fs))
run_config = RunConfig(storage_filesystem=storage_filesystem, storage_path=storage_path, sync_config=sync_config)

trainer = TorchTrainer(
  train_loop_per_worker=lambda config: train_loop(config, metadata=metadata),
  train_loop_config=params,
  scaling_config=scaling_config,
  run_config=run_config,
  datasets=datasets,
  dataset_config=dataset_config,
)
result = trainer.fit()

It sounds like this may be a known issue with Ray 2.9. We will upgrade to Ray 2.10+ and report back if the errors disappear.


Did new Ray versions fix this issue @ebeckeruber ?

It did not, at least for me. I am using MultiWorkerMirroredStrategy with TensorFlow:
(TrainController pid=38513) get_train_context().report( [repeated 3x across cluster]
(TrainController pid=38513) training_result = self._save_checkpoint( [repeated 3x across cluster]
(TrainController pid=38513) File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/train/v2/_internal/execution/context.py", line 206, in _save_checkpoint [repeated 3x across cluster]
(TrainController pid=38513) persisted_checkpoint = self.storage_context.persist_current_checkpoint( [repeated 3x across cluster]
(TrainController pid=38513) File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/train/v2/_internal/execution/storage.py", line 476, in persist_current_checkpoint [repeated 3x across cluster]
(TrainController pid=38513) _pyarrow_fs_copy_files( [repeated 3x across cluster]
(TrainController pid=38513) File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/train/v2/_internal/execution/storage.py", line 111, in _pyarrow_fs_copy_files [repeated 3x across cluster]
(TrainController pid=38513) return pyarrow.fs.copy_files( [repeated 3x across cluster]
(TrainController pid=38513) File "/tmp/ray/session_2025-12-17_23-18-47_907619_1/runtime_resources/pip/f5ff56ade2291dd18833f287d82d1efb1ea9b198/virtualenv/lib/python3.10/site-packages/pyarrow/fs.py", line 256, in copy_files [repeated 3x across cluster]
(TrainController pid=38513) _copy_files_selector(source_fs, source_sel, [repeated 3x across cluster]
(TrainController pid=38513) File "pyarrow/_fs.pyx", line 1648, in pyarrow._fs._copy_files_selector [repeated 3x across cluster]
(TrainController pid=38513) File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status [repeated 3x across cluster]
(TrainController pid=38513) OSError: [Errno 255] HDFS Write failed. Detail: [errno 255] Unknown error 255 [repeated 3x across cluster]

This error indicates that multiple Ray workers are trying to write the same checkpoint file on HDFS simultaneously, which HDFS does not support, so the writes fail with OSError. This is a known issue with Ray’s checkpointing mechanism on HDFS, especially in distributed training with TensorFlow’s MultiWorkerMirroredStrategy, and has been reported by other users as well. Upgrading Ray may not resolve it, because the underlying problem is concurrent writes to the same file on a storage backend that does not support them.

A possible workaround is to ensure that only one worker (typically rank 0) writes the checkpoint, or to use a storage backend that supports concurrent writes, such as S3 or NFS, as recommended in the Ray documentation. For HDFS, you may need custom logic to avoid simultaneous writes, or a different storage solution for checkpointing.
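One way to gate checkpoint writes on rank 0 without breaking Ray Train’s collective semantics: `ray.train.report()` is expected to be called by every worker, but only rank 0 needs to attach a checkpoint. A minimal sketch of that pattern, assuming a recent Ray 2.x API; the helper names `checkpoint_on_rank_zero` and `report_with_rank0_checkpoint` and the `write_files` callback are illustrative, not Ray APIs:

```python
import tempfile


def checkpoint_on_rank_zero(world_rank, build_checkpoint):
    """Only rank 0 materializes a checkpoint; every other rank passes None."""
    return build_checkpoint() if world_rank == 0 else None


def report_with_rank0_checkpoint(metrics, write_files):
    # Deferred imports so the pure helper above is usable without Ray installed.
    from ray import train
    from ray.train import Checkpoint

    rank = train.get_context().get_world_rank()
    with tempfile.TemporaryDirectory() as ckpt_dir:
        def build():
            write_files(ckpt_dir)  # e.g. save model weights into ckpt_dir
            return Checkpoint.from_directory(ckpt_dir)

        # Every rank calls report() (it acts as a synchronization barrier);
        # only rank 0 hands over an actual checkpoint to persist to storage.
        train.report(metrics, checkpoint=checkpoint_on_rank_zero(rank, build))
```

With this shape, only one process ever opens the checkpoint files on HDFS, while all ranks still join the report barrier.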

Would you like more detail on how to implement a workaround or configure Ray for a different storage backend?



I tried this (restricting checkpoint reporting to rank 0), but it basically hangs the Ray cluster, which waits on all workers to report the checkpoint:

(SynchronizationActor pid=236305) `ray.train.report` has not been called by all 3 workers in the group.
(SynchronizationActor pid=236305) The workers have been waiting for 1141.90 s for the following ranks to join the `ray.train.report` call: [0].
(SynchronizationActor pid=236305) Also ensure that workers are not hanging on other operations, causing them to miss this synchronization barrier.
(SynchronizationActor pid=236305) You can set the RAY_TRAIN_REPORT_BARRIER_WARN_INTERVAL_S environment variable to change the frequency of this warning (current value: 60 s).
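The hang above is consistent with `ray.train.report` being a collective call: if any rank skips it entirely, the SynchronizationActor waits at the barrier forever. The fix is to gate only the checkpoint payload, not the report call itself. A dependency-free simulation of that contract (the `ReportBarrier` class is a toy model, not a Ray API):

```python
class ReportBarrier:
    """Toy model of ray.train.report's all-rank barrier semantics."""

    def __init__(self, world_size):
        self.world_size = world_size
        self.calls = []  # (rank, checkpoint) pairs

    def report(self, rank, metrics, checkpoint=None):
        self.calls.append((rank, checkpoint))

    def all_ranks_reported(self):
        # The real barrier only releases once every rank has called report().
        return {rank for rank, _ in self.calls} == set(range(self.world_size))

    def checkpoints_written(self):
        return [ckpt for _, ckpt in self.calls if ckpt is not None]


def train_step(barrier, rank):
    # Gate the payload, not the call: only rank 0 produces a checkpoint,
    # but every rank still reports, so the barrier is satisfied.
    ckpt = "ckpt-dir" if rank == 0 else None
    barrier.report(rank, metrics={"loss": 0.1}, checkpoint=ckpt)


barrier = ReportBarrier(world_size=3)
for rank in range(3):
    train_step(barrier, rank)
# All ranks joined the barrier, and exactly one checkpoint write hit storage.
```

If instead `train_step` skipped `barrier.report` on non-zero ranks (or on rank 0, as the log's missing rank `[0]` suggests), `all_ranks_reported()` would never become true, which matches the observed hang.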