Running out of memory while processing a lot of remote tasks

I am simulating particle trajectories for 1000 particles in parallel, using Ray. The particles are objects collected in a list (“particle_list”). I have a remote task (“remote_lag_sti”) that is run with the particle and velocity data as input and returns a dict (“sti_dict”) with the position and velocity of a particle at every time step (3600 time steps). I am using a differential equation solver for the computation, but sometimes it fails to finish in reasonable time, so for the tasks that use more than 20 seconds, I change to a different solving algorithm (change method to “RK23”) and restart the cancelled tasks. This is done in two steps:

	import ray
	from ray.exceptions import GetTimeoutError

	tre_plasma = ray.put(tre)
	jobs = {remote_lag_sti.remote(particle=pa, tre=tre_plasma): pa for pa in particle_list}

	not_ready = list(jobs.keys())
	cancelled = []
	for elem in not_ready:
		try:
			# Wait up to 20 seconds for this task's result.
			sti_dict = ray.get(elem, timeout=20)
			jobs[elem].sti_dict = sti_dict
		except (GetTimeoutError, AssertionError):
			# Too slow: cancel the task and mark the particle for the RK23 solver.
			ray.cancel(elem, force=True)
			jobs[elem].method = "RK23"
			cancelled.append(jobs[elem])
	if len(cancelled) > 0:
		# Second stage: rerun the cancelled particles with the new method.
		jobs2 = {remote_lag_sti.remote(particle=pa, t_span=t_span, tre=tre_plasma): pa for pa in cancelled}
		not_ready = list(jobs2.keys())
		while True:
			ready, not_ready = ray.wait(not_ready)
			sti_dict = ray.get(ready[0])
			jobs2[ready[0]].sti_dict = sti_dict
			if len(not_ready) == 0:
				break

However, halfway through the second stage (running the task for the particles in the list “cancelled”), the computer runs out of memory, because the RAM used by each task is steadily increasing all the time. I don’t understand why. Are not the variables in remote_lag_sti deleted when the function returns? The dict “sti_dict” is just a dictionary with 3600 elements, each holding a numpy array of 4 elements.

Hi @olahh!

Are not the variables in remote_lag_sti deleted when the function returns?

The worker heap memory that remote_lag_sti uses (i.e. intermediate data generated within the task) should definitely be released when the task finishes or is cancelled; the object store memory of the task arguments and the task return value may continue to live on if references to either still live outside the task, with the latter only being a factor for completed (non-cancelled) tasks.

However, halfway through the second stage (running the task for the particles in the list “cancelled”), the computer runs out of memory, because the RAM used by each task is steadily increasing all the time. I don’t understand why… The dict “sti_dict” is just a dictionary with 3600 elements, each holding a numpy array of 4 elements.

How large is the particle and velocity data, and how much RAM do you have allocated to your object store (assuming this is a single-node, local cluster)? Some quick calculations on the return objects assuming float64 ndarray elements and int64 dict keys: (sizeof(key) + sizeof(value)) * 3600 * 1000 = (8 bytes + 4 * 8 bytes) * 3600 * 1000 = 144 MB. It’s possible that there’s a bug in Ray or a bug in your code that’s causing some data to be leaked, but I first want to confirm that a correct implementation would still fit in your computer’s RAM!
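To make that estimate easy to redo with different numbers, here is a small sketch of the same arithmetic. The constant names are made up for illustration, and the figure is only a lower bound: it counts raw payload bytes and ignores the per-object overhead of Python dicts and ndarrays, so the real footprint will be larger.

```python
# Assumptions taken from the estimate above: int64 dict keys, four float64
# values per time step, 3600 time steps per particle, 1000 particles.
BYTES_PER_KEY = 8
VALUES_PER_STEP = 4
BYTES_PER_VALUE = 8
TIME_STEPS = 3600
PARTICLES = 1000

# Raw payload per time step: one key plus the four array elements.
bytes_per_step = BYTES_PER_KEY + VALUES_PER_STEP * BYTES_PER_VALUE

# Raw payload for all return values together.
total_bytes = bytes_per_step * TIME_STEPS * PARTICLES
total_mb = total_bytes / 1e6

print(f"{total_mb:.0f} MB")  # prints "144 MB"
```

Even with a generous allowance for object overhead, the return values alone should be small next to the ~GB-scale input data, which is why the size of the arguments matters more here.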

It should also be noted that you’re launching all jobs concurrently, but then synchronously trying to fetch each serially, with a 20 second timeout for each. Therefore, these timeouts are cumulative, e.g. if the first 4 tasks time out, that’s 4 serial 20 second timeouts, so the wall time timeout for the 5th task will actually be 100 seconds. You may want to do a ray.wait() here that applies a single 20 second timeout for all tasks instead, something like:

tre_plasma = ray.put(tre)
jobs = {
    remote_lag_sti.remote(particle=pa, tre=tre_plasma): pa
    for pa in particle_list}
# After 20 seconds, done will contain completed jobs, not_done will contain
# incomplete jobs that we wish to cancel and resubmit. fetch_local=False
# ensures that (1) we consider a job done as soon as it's finished, not when
# the result is done being transferred to the local node, (2) we don't start
# pulling every job result at once, in case that fills up the local node's
# object store.
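done, not_done = ray.wait(
    list(jobs.keys()), num_returns=len(jobs), timeout=20,
    fetch_local=False)
cancelled = []
for to_cancel in not_done:
    ray.cancel(to_cancel, force=True)
    jobs[to_cancel].method = "RK23"
    # Remember the particle so it can be resubmitted with the new method.
    cancelled.append(jobs[to_cancel])
    # Remove the cancelled job from the job dict.
    del jobs[to_cancel]
if len(cancelled) > 0:
    # Resubmit the cancelled particles and track them alongside the finished jobs.
    jobs.update({
        remote_lag_sti.remote(particle=pa, t_span=t_span, tre=tre_plasma): pa
        for pa in cancelled})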
# Fetch all successfully completed runs on the initial solver and the resubmitted tasks
# using the RK23 solver. If fetching all at once fills up the local node's object store 
# (i.e. when running in a multi-node cluster), we could fetch these and process them
# in batches instead.
for job_result, job in zip(ray.get(list(jobs.keys())), jobs.values()):
    job.sti_dict = job_result

Thank you, @Clark_Zinzow, for the very helpful answer! I will try to give as much and as useful information as I can, without wasting your time.

I don’t know exactly how much RAM the velocity data uses, but I store it on disk as a ~5GB pickle file that is loaded and then put into the shared-memory object store with ray.put(tre).

I use two PCs independently, because I haven’t bothered to learn the Actor/cluster part yet: an old Intel Xeon with 24 threads and 128 GB RAM, and a newer Intel Core i7 with 8 threads and 64 GB RAM. I thought that should be more than enough.

I know that the timeout builds up, and I decided that it is OK. But of course it would be better to have total control over the time I allow the task to try simulating with the first algorithm (which is BDF; the heart of the simulation is a call to scipy.integrate.solve_ivp).

Your suggestions are interesting, and I pondered doing something like that myself. As I see it, the real problem that I struggle with is that I don’t have any way of checking if a task has been started and has been running for the maximum allowed time, or if it is just waiting in the queue. Actually, a month ago, I tried to implement the ideas suggested here, but I didn’t get the actor/class setup to work properly, so I gave it up after some hours. Maybe the best would be to really try to get this to work in my program.
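One possible way around the "running or still queued?" question is to measure the time limit inside the task itself, so the clock only starts once the solver begins work. The following is a minimal sketch under stated assumptions, not something taken from the actual simulation: `with_deadline`, `SolverTimeout`, `toy_solve`, `slow_rhs` and `fast_rhs` are all hypothetical names, and `toy_solve` is a plain explicit-Euler stand-in for the real solver so that the example runs without SciPy or Ray.

```python
import time


class SolverTimeout(Exception):
    """Raised when the wrapped right-hand side exceeds its time budget."""


def with_deadline(rhs, max_seconds):
    """Wrap an ODE right-hand side so it raises once max_seconds have elapsed.

    The clock starts at the first call of the wrapped function, i.e. when the
    solver actually begins work, not when the task was submitted.
    """
    start = None

    def wrapped(t, y, *args):
        nonlocal start
        now = time.monotonic()
        if start is None:
            start = now
        elif now - start > max_seconds:
            raise SolverTimeout(f"exceeded {max_seconds} s")
        return rhs(t, y, *args)

    return wrapped


def toy_solve(rhs, y0, n_steps, dt):
    """Hypothetical stand-in for the real solver: explicit Euler steps."""
    t, y = 0.0, y0
    for _ in range(n_steps):
        y = y + dt * rhs(t, y)
        t += dt
    return y


def slow_rhs(t, y):
    time.sleep(0.01)  # pretend each evaluation is expensive
    return -y


def fast_rhs(t, y):
    return -y


def solve_with_fallback(y0):
    """Try the slow method under a 0.05 s budget, then fall back."""
    try:
        return "first", toy_solve(with_deadline(slow_rhs, 0.05), y0, 1000, 0.001)
    except SolverTimeout:
        return "fallback", toy_solve(fast_rhs, y0, 1000, 0.001)


method, y_final = solve_with_fallback(1.0)
print(method, round(y_final, 4))
```

In the real task, the wrapped function would presumably be what gets passed as the `fun` argument of scipy.integrate.solve_ivp for the BDF attempt, with the RK23 attempt in the `except` branch. That wiring is an assumption about how remote_lag_sti is structured and would need checking, but it would keep the switch of method inside a single task, so nothing has to be cancelled or resubmitted from the driver.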

With your suggested code, if most tasks use for example 10 seconds to finish on the machine with 8 threads, 16 tasks will be finished within the timeout, while the remaining 984 tasks will never be run, and will be cancelled and run with the second algorithm. I asked a closely related question on StackOverflow a month ago, but no one has answered yet. I also see that the

Also on the first round of my simulation, the memory usage builds up, but as tasks are canceled, the memory is also freed, so it doesn’t become a problem, as you can see on the screenshot from the dashboard:

On the second round, however, the tasks are using much more memory. What is going on? Why doesn’t the RAM used by each thread sum up to the total usage?

Of course, the problem could well be a bug in the remote_lag_sti function, but I don’t know how to find out. As I wrote, the function returns a single dict, but the content is not exactly like I wrote first. Each entry in the dict is a timestep, and the timestep contains: {‘position’: ndarray[4], ‘loops’: int, ‘caught’: bool}. Also, the dict has two entries called ‘init_time’: float and ‘final_time’: float. Anyway, I don’t see that this would change anything. Is there actually any way that references to the task arguments could still be alive? How could the returned sti_dict continue to live on in the object store memory?
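On the question of how to find out whether a function holds on to memory between calls, one option is the standard-library tracemalloc module. The sketch below is only an illustration: `leaky_task`, `clean_task` and `retained_bytes` are hypothetical names, not part of the simulation, and the measurement covers only allocations that Python reports to tracemalloc in the process where it runs.

```python
import gc
import tracemalloc

_leaked = []  # deliberately leaky global, for illustration only


def leaky_task(n):
    data = [float(i) for i in range(n)]
    _leaked.append(data)  # this reference outlives the call
    return sum(data)


def clean_task(n):
    data = [float(i) for i in range(n)]
    return sum(data)  # data is released when the function returns


def retained_bytes(func, repeats, *args):
    """Return how many traced bytes are still allocated after repeated calls."""
    gc.collect()
    tracemalloc.start()
    before, _peak = tracemalloc.get_traced_memory()
    for _ in range(repeats):
        func(*args)
    gc.collect()
    after, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return after - before


leaky_growth = retained_bytes(leaky_task, 10, 10_000)
clean_growth = retained_bytes(clean_task, 10, 10_000)
print(f"leaky: {leaky_growth} bytes, clean: {clean_growth} bytes")
```

Calling the body of remote_lag_sti as an ordinary function a few times under a helper like this, outside Ray, should show whether the retained memory grows with every call. If it stays flat, the growth is more likely to come from how arguments and results are held in the object store than from the function itself.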

There are 32 different cases to simulate, and the Python process was killed because of OOM in only one case so far (number 12). Still, I would like to have a more sound solution with more control than I have now, so I can be sure the simulations will complete.

Again, thank you for your time and your helpful comments and questions. I hope this reply is not too long.