- High: it blocks me from completing my task.
A bit more detail: we ran into an issue using Ray Train (older version 1.12.1); the symptom is that resources are not released after the cluster has been in use for a while, and the only workaround is to restart the cluster every time this happens. I am trying to write a test that simulates and reproduces the issue.
I came across this test utility class: ray/cluster_utils.py at 1.11.1 · ray-project/ray · GitHub. In my use case, though, I need to fork sub-processes and have those sub-processes connect to the cluster. Will multiple sub-processes connecting work with this utility class? I read through a few existing tests but couldn't tell for sure.
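For reference, this is the pattern I have in mind, reduced to a minimal sketch (not verified against 1.12.1; I am assuming Cluster.address is the right value to hand to ray.init in the sub-process, and num_cpus=2 is just an example):

import multiprocessing

import ray
from ray.cluster_utils import Cluster


def child(address):
    # Runs in the sub-process: attach to the already-running mini-cluster.
    ray.init(address=address)
    print("child sees:", ray.available_resources())
    ray.shutdown()


if __name__ == "__main__":
    # Parent builds the test cluster; the first add_node() creates the head node.
    cluster = Cluster(shutdown_at_exit=True)
    cluster.add_node(num_cpus=2)

    # Fork a sub-process and let it connect to the cluster by address.
    p = multiprocessing.Process(target=child, args=(cluster.address,))
    p.start()
    p.join()

If that kind of sub-process connection is supported, the test below is the fuller version of what I actually want to run.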
The following code is what I tried (it does not work yet; it is only meant to show the use case I am trying to reproduce):
import os
import time
from multiprocessing import Process

import pytest
import ray
from ray.train import Trainer


def trainer_worker(address):
    trainer = Trainer(backend="torch", num_workers=2, use_gpu=False, max_retries=0)
    config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": 10}
    trainer.start()
    # test_train_func is the training function under test, defined elsewhere
    # in the test file.
    results = trainer.run(test_train_func, config)
    trainer.shutdown()
    print("Trainer finished normally.")


@pytest.mark.basic
def test_train_resource():
    from ray.cluster_utils import Cluster

    c = Cluster(shutdown_at_exit=True)
    head_node = c.add_node()
    c.connect()
    aval_num_cpu = ray.available_resources()['CPU']
    aval_mem_size = ray.available_resources()['memory'] / (1024 ** 4)
    print(aval_num_cpu, aval_mem_size)
    pid = os.fork()
    if pid == 0:
        # Child process: attach to the running cluster and drive the trainer
        # from a separate worker process.
        ray.init(address="auto")
        worker = Process(target=trainer_worker, args=("",))
        worker.start()
        time.sleep(3)
        # TODO: os.abort() to mock user behavior.
        worker.join()
        print("Worker controller finished normally.")
        os._exit(0)  # keep the forked child from running the rest of pytest
    else:
        # Parent (pytest) process: check the cluster's resources afterwards.
        time.sleep(1)
        aval_num_cpu = ray.available_resources()['CPU']
        aval_mem_size = ray.available_resources()['memory'] / (1024 ** 4)
        print(aval_num_cpu, aval_mem_size)
        print("Pytest process finished normally.")