Running ray on supercomputer with slurm

I’m trying to run Ray on Cori at NERSC (supercomputer; not a cluster) using. Others have done that more-or-less successfully, but I’m running too often into stability issues to consider this functional and I’m hoping for ideas/feedback to improve the situation.

I started with the NERSC provided script, then added features from the ATLAS ones, in particular the explicit synchronization instead of using statements like “sleep 30” and hoping that that is enough to start Ray on all workers. I removed running “ray stop” from the slurm scripts (which gave no end of trouble) and instead am calling “ray.shutdown()” from Python. On the head and worker nodes, I tried with different amounts of load/allocation (it’s clear that some resources need to be left to redis).

It works, just not all the time, and I suspect that errors will be more frequent with larger jobs (simply more chances for failure).

There are 4 types of error that occur with some regularity, in order of severity:

  • “bind: Address already in use”. As a result, the job hangs until time is up at which point it gets killed and the logs are subsequently not flushed, so no more information.
  • “_start_new_thread(self._bootstrap, ()); RuntimeError: can’t start new thread”. Happens when handing a new remote task to a multiprocessing.Pool. Exception makes it out to user code so can exit application and clean up. Bad, but at least doesn’t lose the full allocated time. (Re-submitting the task may even work?)
  • “Resource temporarily unavailable”: happens at the start of processing. Ray fully recovers from this one, but the log file is flooded with messages. Regardless this error, the application run successfully to completion. So annoying, but not the end of the world. My best guess is that this is due to the socket’s buffer being full and boost’s asio simply tries again (and then succeeds).
  • Log files not fully received. Logging (from Python’s logging module) does not always appear in the temporary directory. This does not affect running at all per se, but makes debugging a lot harder. It tried flushing stderr/stdout in several places, but nothing gets the log files consistently in full.

Any ideas of the cause on these and/or on how to improve the stability are appreciated!

Hey @wlav, these issues seem bad (and probably are new regressions).

  • ray shutdown actually only shuts down the python client and doesn’t shut down the workers from prior runs. this would cause leakages in your cluster i think? What were the issues with ray stop (though I understand you had moved away from that?)
  • “bind: address already in use”: might be related to the above where workers were not stopping properly
  • “_start_new_thread(self._bootstrap, ()); RuntimeError: can’t start new thread”: where are you calling multiprocessing.pool?
  • Log files not fully received. Logging (from Python’s logging module) does not always appear in the temporary directory: Is it possible that your tmp is a NFS, and so writes are much more expensive than normal?

Definitely want to help make you successful here.

My two cents, all ray processes should be killed if they are launched in cgroups. I know LSF does kill all processes launched in cgroups in which case “address already in use” error should not occur.

1 Like

Thanks, @rliaw, what @asm582 says is true in that there will not be anything left on the node by the batch system. The only possible leftovers would be in the temp dir handed to Ray, even as it shouldn’t b/c of unique session IDs, but experimenting with it shows it can’t be the cause (I’ve managed to get the error from a clean slate; it’s otherwise hard to prove a negative, but after experimenting for a day, I’m certain).

I think the logic of ray stop was wrong in the original script: it first gets farmed to the worked nodes, then run on the head node. I.e. ray stop runs on each individual node. However, as I read in the docs, ray.shutdown() gets called on exit from the Python script, meaning that at the very least it has already run on the head node. Does it actually still need to run on the workers? My guess is no, because it will give me the following if I do:

2021-07-14 15:36:33,389	WARNING -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. Task ID: 221affc45bc9c016ffffffffffffffffffffffff03000000 Worker ID: 77b508e29b069d19f0623be195ccce461896dddb8d338251508ce309 Node ID: 0f8ad7679832b86c5d09d87970be1f3716fe1817b4ddd96e1fce8812 Worker IP address: Worker port: 10124 Worker PID: 30827
end time main job: Wed Jul 14 15:36:33 PDT 2021

The multiprocessing.Pool is created on the head node, after which tasks are mapped onto it by Ray, which farms out to the worker nodes. This being just a test, I only have 3 tasks, and 64 cores available on each worker node and on the head node. The pool gets assigned 184 cores out of those 192, as it looks like there needs to be cores left available for the use by redis.

The file system used for the temp dir is Lustre, which does not like small writes and/or many opens and closes. But that would result in a performance issue, not a data loss issue (meaning, if performance is the culprit, I’d only expect partial log files if the job gets killed prematurely because of writes taking longer than the alotted time; that is not the case).