I am simulating particle trajectories for 1000 particles in parallel, using Ray. The particles are objects collected in a list (“particle_list”). I have a remote task (“remote_lag_sti”) that takes a particle and the velocity data as input and returns a dict (“sti_dict”) with the position and velocity of that particle at every time step (3600 time steps); a simplified sketch of the task is shown after the script below. The computation uses a differential equation solver, but it sometimes fails to finish in reasonable time, so for the tasks that take longer than 20 seconds I switch to a different solving algorithm (change the method to “RK23”) and restart the cancelled tasks. This is done in two steps:
import ray
from ray.exceptions import GetTimeoutError

ray.init()
tre_plasma = ray.put(tre)

# First pass: launch one task per particle and fetch each result with a 20 s timeout.
jobs = {remote_lag_sti.remote(particle=pa, tre=tre_plasma): pa for pa in particle_list}
not_ready = list(jobs.keys())
cancelled = []
for elem in not_ready:
    try:
        sti_dict = ray.get(elem, timeout=20)
        jobs[elem].sti_dict = sti_dict
    except (GetTimeoutError, AssertionError):
        # Too slow (or failed): cancel the task and mark the particle
        # for a retry with the "RK23" solver.
        ray.cancel(elem, force=True)
        cancelled.append(jobs[elem])
        jobs[elem].method = "RK23"

# Second pass: rerun the cancelled particles and collect results as they finish.
if len(cancelled) > 0:
    jobs2 = {remote_lag_sti.remote(particle=pa, t_span=t_span, tre=tre_plasma): pa for pa in cancelled}
    not_ready = list(jobs2.keys())
    while True:
        ready, not_ready = ray.wait(not_ready)
        sti_dict = ray.get(ready[0])
        jobs2[ready[0]].sti_dict = sti_dict
        if len(not_ready) == 0:
            break

ray.shutdown()
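For reference, remote_lag_sti has roughly the following shape. This is only a simplified sketch, not the real code: the actual equations of motion and the ODE solver call (something like scipy's solve_ivp) are omitted, and everything except the particle and tre arguments is a placeholder:

import numpy as np
import ray

@ray.remote
def remote_lag_sti(particle, tre, t_span=None):
    # Integrate the particle's motion over 3600 time steps, using particle.method
    # as the solver algorithm (switched to "RK23" on retry). Only the output
    # structure is shown here.
    sti_dict = {}
    for step in range(3600):
        # one 4-element array per time step: position and velocity of the particle
        sti_dict[step] = np.zeros(4)
    return sti_dict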
However, halfway through the second stage (rerunning the tasks for the particles in the “cancelled” list), the computer runs out of memory, because the RAM used by each task is steadily increasing. I don’t understand why. Aren’t the variables in remote_lag_sti deleted when the function returns? The dict “sti_dict” is just a dictionary with 3600 elements, each holding a numpy array of 4 elements.
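For scale, a rough estimate of the size of one sti_dict (assuming float64 arrays, which is just a guess about the dtype):

import numpy as np

one_step = np.zeros(4)               # 4 float64 values = 32 bytes of array data
raw_bytes = 3600 * one_step.nbytes   # about 115 kB of numpy data per particle
print(raw_bytes / 1e6, "MB")         # dict and array object overhead adds more, but the total stays well under a few MB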