Using multiple processes with the Cluster utility class

  • High: It blocks me from completing my task.

A bit more detail: we ran into an issue with Ray Train (older version 1.12.1; the symptom is that resources are not released after the cluster has been in use for a while), and the only workaround is to restart the cluster every time this happens. I am trying to write a test that simulates and reproduces the issue.

I came across this test utility class: ray/cluster_utils.py at 1.11.1 · ray-project/ray · GitHub. In my use case, though, I need to fork sub-processes and have those sub-processes connect to the cluster. Will multiple sub-processes connecting like this work with this utility class? I read through a few existing tests but couldn't tell for sure.
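
For context, here is the minimal pattern I understand should work: the parent starts the mini-cluster, and each sub-process connects as its own driver via ray.init(address=...). This is just a sketch of my understanding, not something I found in the Ray docs; child_driver and the num_cpus value are made up for illustration, and I am assuming multiprocessing's "spawn" start method so the child does not inherit the parent's already-initialized Ray state:

import multiprocessing as mp

import ray
from ray.cluster_utils import Cluster


def child_driver(address):
    # Each sub-process connects to the existing cluster as its own driver.
    ray.init(address=address)
    print("child sees:", ray.available_resources())
    ray.shutdown()


if __name__ == "__main__":
    cluster = Cluster(initialize_head=True, head_node_args={"num_cpus": 4})
    ray.init(address=cluster.address)  # the parent process is one driver

    # "spawn" starts a clean interpreter, so the child does not inherit
    # the parent's Ray state the way a fork would.
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=child_driver, args=(cluster.address,))
    p.start()
    p.join()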

The following code is what I have tried (it does not work yet; it is just to show the use case I am trying to reproduce):

import os
import time
from multiprocessing import Process

import pytest
import ray
from ray.train import Trainer


def trainer_worker(address):
    # Connect this process to the existing cluster before using Ray Train.
    ray.init(address=address)
    # max_retries expects an int, not a bool.
    trainer = Trainer(backend="torch", num_workers=2, use_gpu=False, max_retries=0)
    config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": 10}
    trainer.start()
    results = trainer.run(test_train_func, config)  # test_train_func defined elsewhere
    trainer.shutdown()
    print("Trainer finished normally.")


@pytest.mark.basic
def test_train_resource():
    from ray.cluster_utils import Cluster

    c = Cluster(shutdown_at_exit=True)
    head_node = c.add_node()
    c.connect()
    aval_num_cpu = ray.available_resources()["CPU"]
    aval_mem_size = ray.available_resources()["memory"] / (1024**3)  # bytes -> GiB

    print(aval_num_cpu, aval_mem_size)

    pid = os.fork()
    if pid == 0:
        # Child: pass the cluster address to the worker explicitly
        # rather than relying on address="auto".
        worker = Process(target=trainer_worker, args=(c.address,))
        worker.start()
        time.sleep(3)
        # TODO: os.abort() to mock user behavior.
        worker.join()
        print("Worker controller finished normally.")
        # Exit the forked child so it does not keep running pytest's code.
        os._exit(0)
    else:
        time.sleep(1)
        aval_num_cpu = ray.available_resources()["CPU"]
        aval_mem_size = ray.available_resources()["memory"] / (1024**3)  # bytes -> GiB

        print(aval_num_cpu, aval_mem_size)
        print("Pytest process finished normally.")