Multiple writers attempt to write to the same file

We’re hitting an issue using Ray with our HDFS storage backend: Ray spawns several writers that attempt to update the same file at the same time, and HDFS responds with a FileNotFoundException. Example output:

(TorchTrainer pid=14750) Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist:
/app/<path>/TorchTrainer_a492f_00000_0_2024-09-04_03-56-19/result.json
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2730)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2610)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:837)
(TorchTrainer pid=14750)     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:525)
...
(RayTrainWorker pid=14923) org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /app/<path>/TorchTrainer_a492f_00000_0_2024-09-04_03-56-19/result.json

Our HDFS team confirmed that two different IP addresses were attempting to write to these files at the time of the error, so HDFS rejected one of the requests. You can also see in the output above that two different processes are throwing errors (a RayTrainWorker pid and a TorchTrainer pid). Ray should never write to the same file from two different processes, since most storage backends do not support simultaneous writes to a given file. I’m wondering if I’ve configured something incorrectly?
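
To illustrate the failure mode, here is a standalone sketch (not code from our job): HDFS hands out a single write lease per file, so when a second client opens a path that is already open for writing, one of the two writers ends up failing with a lease / "File does not exist" error like the one above. The namenode host, port, and path below are placeholders, the example assumes pyarrow's HadoopFileSystem bindings (libhdfs) are available, and the exact point where the error surfaces (open, flush, or close) can vary with HDFS configuration.

# Standalone sketch of HDFS's single-writer rule; host, port, and path are placeholders.
import pyarrow.fs as pafs

fs = pafs.HadoopFileSystem(host="namenode.example.com", port=8020)
path = "/tmp/lease_demo/result.json"

writer_a = fs.open_output_stream(path)   # writer A holds the HDFS write lease
writer_a.write(b'{"step": 1}\n')

# A second writer on the same path (in our case, a different process on a
# different IP) recreates the file and invalidates writer A's lease.
writer_b = fs.open_output_stream(path)
writer_b.write(b'{"step": 1}\n')
writer_b.close()

try:
  writer_a.close()   # A's flush/close is now rejected by the namenode
except OSError as exc:
  print("writer A failed:", exc)   # surfaces as a lease / FileNotFound-style error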

Some details on our job: it runs on a Ray cluster with one head node and zero worker nodes, on top of Kubernetes. We launch a Ray AIR training job via TorchTrainer.fit(). Our configuration is as follows:

import fsspec
import pyarrow as pa
from ray.train import RunConfig, ScalingConfig, SyncConfig
from ray.train.torch import TorchTrainer

scaling_config = ScalingConfig(
  trainer_resources={"CPU": 1},
  num_workers=1,
  resources_per_worker={"CPU": 10, "GPU": 1},
  use_gpu=True,
)
sync_config = SyncConfig(sync_artifacts=True)

# Resolve the HDFS URL into an fsspec filesystem + path, then wrap it so
# pyarrow (and therefore Ray) can use it.
fs, storage_path = fsspec.core.url_to_fs(<URL>)
storage_filesystem = pa.fs.PyFileSystem(pa.fs.FSSpecHandler(fs))
run_config = RunConfig(
  storage_filesystem=storage_filesystem,
  storage_path=storage_path,
  sync_config=sync_config,
)

trainer = TorchTrainer(
  train_loop_per_worker=lambda config: train_loop(config, metadata=metadata),
  train_loop_config=params,
  scaling_config=scaling_config,
  run_config=run_config,
  datasets=datasets,
  dataset_config=dataset_config,
)
result = trainer.fit()
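
For reference, here is a sketch of an equivalent way to build the filesystem handle that skips the fsspec layer and lets pyarrow resolve the HDFS URI directly. The hdfs:// URL below is a placeholder and this assumes the libhdfs client libraries are available on the node; we mention it only as a possible simplification, not as a confirmed fix.

# Sketch only: same RunConfig, but with pyarrow resolving the HDFS URI itself.
# The namenode host/port and path are placeholders.
import pyarrow.fs as pafs
from ray.train import RunConfig

storage_filesystem, storage_path = pafs.FileSystem.from_uri(
  "hdfs://namenode.example.com:8020/app/ray_results"
)
run_config = RunConfig(
  storage_filesystem=storage_filesystem,
  storage_path=storage_path,
  sync_config=sync_config,  # same SyncConfig(sync_artifacts=True) as above
)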

It sounds like this may be a known issue with Ray 2.9. We will upgrade to Ray 2.10+ and report back on whether the errors disappear.