Leaking worker memory

Hello all,

I am writing a Monte Carlo simulation and trying to use ray to the best of my abilities, which is not much.

Basically, I wanted a DataHolder class that is essentially a wrapper around a huge numpy array with some neat tricks. I also made a DataProcessor class which, ideally, would hold a view of the numpy array in DataHolder, randomly pick a sub-array, process it, and call DataHolder to write it back into the master array. I was thinking this approach would let me share a read-only view of the array between processes and use my cores efficiently.

What I actually managed to do is produce a major memory leak that crashes the program. Here is a stripped-down minimal working example:

import time
import ray
import numpy as np

CPU = 8

if not ray.is_initialized():
    ray.init(num_cpus=CPU)

@ray.remote
class DataHolder:

    def __init__(self):
        self.big_array = np.zeros((10, 10))

    def get_big_array(self):
        return self.big_array

    def update_big_array(self, ar):
        # somehow i know this goes to (2,2)
        self.big_array[2:5, 2:5] = ar.copy()

@ray.remote
class DataProcessor:

    def __init__(self, data_holder):
        self.data_holder = data_holder
        self.big_array = ray.get(data_holder.get_big_array.remote())

    def work(self, steps=1000000):
        for n in range(steps):
            # Q: pick a number, any number! A: (2,2)
            ar = self.big_array[2:5, 2:5]
            br = np.copy(ar)
            br = self.process_subarray(br)
            self.data_holder.update_big_array.remote(ray.put(br))

    def process_subarray(self, sub_ar):
        return np.cumsum(sub_ar, axis=0)

dh = DataHolder.remote()

dps = [DataProcessor.remote(dh) for _ in range(CPU)]

time.sleep(5)

results = [dps[i].work.remote() for i in range(CPU)]

ray.get(results)

What happens (on Ubuntu 20.04, Python 3.7.9, ray 1.0.1.post1) is that the data processors gobble up memory until the whole program fails. I tried forcing gc.collect() whenever psutil.virtual_memory().percent goes over some threshold, but no luck - does that mean I am filling memory with valid objects? If I were bombing shared memory with a gazillion numpy views or arrays, it certainly doesn’t show in the ray dashboard. I also tried deleting ar and br at the end of each loop - no go.

So, I’m misusing ray but can’t figure out exactly how. Any pointers or texts to read are welcome!

For this type of scenario, the best approach is the ray memory command. It shows details about each Ray object in the plasma store and where it was created. See Memory Management — Ray v1.1.0.

Can you try debugging with this command and follow up in this thread? If you still can’t figure it out, I’d be happy to help further here.

Ok, so I did as told and found an enormous number of pending task references such as:

2ca53902e25910316170691e0100008055030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53

Offending line (:53) is:

self.data_holder.update_big_array.remote (ray.put (br))

So, if I am interpreting this right, I am creating a bunch of small ndarrays via ray.put and leaving them to clutter the shared memory? I have traced several similar questions on Stack Exchange, but cannot figure out how to recycle the IDs or delete them after use. I’ve tried adding del ndarray_id in both the holder and the processor, but to no avail. I could use another kick in the right direction. Many thanks!

It’d be great if you could provide the full output, btw!

So USED_BY_PENDING_TASK means that an object created by ray.put is not GC’ed until the task that uses it finishes. In this case, the object ray.put (br) won’t be GC’ed until the corresponding self.data_holder.update_big_array.remote call completes. A good solution here is to batch the remote calls instead of submitting all the tasks at once.
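The general shape of "don’t submit everything at once" is to cap the number of in-flight calls and wait for some to finish before submitting more. Here is a minimal stdlib analogue of that pattern using concurrent.futures (with Ray, the same shape would use ray.wait on a list of object refs instead); MAX_IN_FLIGHT and the update function are made up for illustration:

```python
# Bounded in-flight submission: keep at most MAX_IN_FLIGHT tasks pending,
# so the arguments of not-yet-started tasks can't pile up indefinitely.
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

MAX_IN_FLIGHT = 4

def update(x):
    # stand-in for a real remote update call
    return x * 2

results = []
with ThreadPoolExecutor(max_workers=2) as pool:
    in_flight = set()
    for i in range(100):
        if len(in_flight) >= MAX_IN_FLIGHT:
            # block until at least one pending task finishes
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
        in_flight.add(pool.submit(update, i))
    # drain whatever is still pending
    done, _ = wait(in_flight)
    results.extend(f.result() for f in done)

print(sorted(results)[:3])  # → [0, 2, 4]
```

The point is the `if len(in_flight) >= MAX_IN_FLIGHT` check: each result is collected as soon as a slot frees up, so memory for pending work stays bounded regardless of how many iterations the loop runs.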

Related design pattern.

Thanks. The site does not allow me to attach .txt, and the whole output is over 16,000 lines long - it’s enough that I keep crashing my PC, I don’t wanna ruin your shiny website :slight_smile: .

I cannot batch the writes because the physics of the MC simulation assumes a “nearly” real-time update, and batch writing would interfere with the very thing I’m trying to research.

I found this ray.util.queue thing, which seems perfect for me - all processors dump into the queue, which should be leak-proof, and the data holder writes entries down as they come.

What do you think, is that viable?

Object ID Reference Type Object Size Reference Creation Site

; worker pid=27689
15c675b22d037e3b44ee453c01000080ce040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080fc040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080a0030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008003050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c010000805c060000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c010000803f050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008054060000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008066050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080c2030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080a0020000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008072010000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c010000809b010000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080b7040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008007060000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080c0040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008026030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008050060000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080db040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080c3040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c010000803b050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080b2050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c010000805a010000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c0100008099060000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080ac050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
15c675b22d037e3b44ee453c01000080c4020000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
<cutting out some 16,000 lines>
2ca53902e25910316170691e0100008015000000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008045000000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e010000801b020000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008063040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e010000803f050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080b5050000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008083070000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008055030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e010000800a010000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e010000802f030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080a7010000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080f2040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008054070000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008053030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e010000807f020000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080c3030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008064030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008009010000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008022000000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080ff020000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080bd060000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008017030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080b1070000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e01000080f3030000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53
2ca53902e25910316170691e0100008099040000 USED_BY_PENDING_TASK 313 (put object) ./ray-example-8.py:work:53

Yeah, that sounds like a good use case for the queue. Can you try it out and let me know how it works? I’d love to hear more use cases around the queue!

Will do. Thanks for your assistance.

yep. If you have trouble, feel free to tag me there!

Well, it seems to work - the leak is gone. The implementation with the queue works provided the workers don’t do tiny tasks. I don’t think it’s optimal, but it’s workable. Thanks for your help.

import time
import ray
import numpy as np

from ray.util.queue import Queue
from hashlib import sha256

CPU = 8

if not ray.is_initialized():
    ray.init(num_cpus=CPU)

@ray.remote
class DataHolder:

    def __init__(self):
        self.big_array = np.zeros((10, 10))

    def get_queue(self):
        self.queue = Queue(maxsize=1000, actor_options={"num_cpus": 1})
        return self.queue

    def get_big_array(self):
        return self.big_array

    def update_big_array(self):
        self.updated = 0
        while True:
            ar = self.queue.get()
            self.big_array[2:5, 2:5] = ar.copy()
            self.updated += 1
            if self.updated % 1000 == 0:
                print(self.updated, flush=True)

@ray.remote
class DataProcessor:

    def __init__(self, data_holder, queue):
        self.queue = queue
        self.data_holder = data_holder
        self.big_array = ray.get(data_holder.get_big_array.remote())

    def process_subarray(self, sub_ar):
        return np.cumsum(sub_ar, axis=0)

    def work(self, steps=10000):
        for n in range(steps):
            # Q: pick a number, any number! A: (2,2)
            ar = self.big_array[2:5, 2:5]
            br = self.process_subarray(ar)

            # simulate some CPU-bound work
            string = "lets simulate some work"
            for _ in range(20000):
                string = sha256(string.encode('utf-8')).hexdigest()

            self.queue.put(br)

dh = DataHolder.remote()

q = ray.get(dh.get_queue.remote())

dps = [DataProcessor.remote(dh, q) for _ in range(CPU)]

time.sleep(5)

dh.update_big_array.remote()

results = [dps[i].work.remote() for i in range(CPU)]
ray.get(results)


Nice that it works! And thanks for posting your code here :)!!