How to safely release shared memory from actors?

Hello,

Let's say I have some actors running on my system (ray 2.0.0.dev, Python 3.8.5, Mint 19.3). The actors operate on a shared memory chunk. When they receive a dict of named shared memory blocks, they use it like this:

from multiprocessing import shared_memory

import numpy as np


def refresh_main_arrays(self, main_arrays):
    if main_arrays is None:
        return
    self.main_arrays = main_arrays

    for var, (shp, dty) in main_arrays.items():
        # Attach to the existing named block and wrap it in an ndarray view.
        shm_tmp = shared_memory.SharedMemory(name=var)
        arr_tmp = np.ndarray(shape=shp, dtype=dty, buffer=shm_tmp.buf)

        try:
            # Keep both the handle and the view alive on the actor.
            setattr(self, var + '_shm', shm_tmp)
            setattr(self, var, arr_tmp)
        except Exception:
            print("acquisition of array memory failed")

The refresh_main_arrays method runs without error for a proper main_arrays dict. Before I terminate the actors, I try to release the shared memory resources:

def close_main_arrays(self):
    if self.main_arrays is None:
        return

    for var in self.main_arrays.keys():
        # Release this actor's handle to the shared block.
        shm_tmp = getattr(self, var + "_shm")
        try:
            shm_tmp.close()
        except Exception:
            print("failure during closure of memory handle " + var + "_shm")

close_main_arrays also runs without error. But when I terminate the actors:

ray.get([H.close_main_arrays.remote() for H in Hs])
ray.get(mnp.close_and_unlink_main_arrays.remote())

ray.kill(mnp)

for each in Hs:
    ray.kill(each)

this is immediately followed by a flush of warnings:

(pid=8316) /home/coded/anaconda3/envs/ray2py38/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 5 leaked shared_memory objects to clean up at shutdown
(pid=8316)   warnings.warn('resource_tracker: There appear to be %d '
(pid=8316) /home/coded/anaconda3/envs/ray2py38/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/mnp_yx_size': [Errno 2] No such file or directory: '/mnp_yx_size'
(pid=8316)   warnings.warn('resource_tracker: %r: %s' % (name, e))
(pid=8316) /home/coded/anaconda3/envs/ray2py38/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/mnp_size': [Errno 2] No such file or directory: '/mnp_size'
(pid=8316)   warnings.warn('resource_tracker: %r: %s' % (name, e))
(pid=8316) /home/coded/anaconda3/envs/ray2py38/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/mnp': [Errno 2] No such file or directory: '/mnp'
(pid=8316)   warnings.warn('resource_tracker: %r: %s' % (name, e))
(pid=8316) /home/coded/anaconda3/envs/ray2py38/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/mnp_yz_dist': [Errno 2] No such file or directory: '/mnp_yz_dist'
(pid=8316)   warnings.warn('resource_tracker: %r: %s' % (name, e))
(pid=8316) /home/coded/anaconda3/envs/ray2py38/lib/python3.8/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/mnp_state': [Errno 2] No such file or directory: '/mnp_state'
(pid=8316)   warnings.warn('resource_tracker: %r: %s' % (name, e))

And this repeats for every PID. It seems to me that resource_tracker fails to notice that I have already released the resources and tries to release them again, only to fail. Also, if I try to access the shared memory after close_and_unlink_main_arrays(), the actor dies with:

RayActorError: The actor died unexpectedly before finishing this task. Check python-core-worker-*.log files for more information.

So it seems that close_main_arrays does its job. What confuses me is that IDENTICAL warnings are raised even if I DON'T call close_main_arrays before killing the actors, and I cannot tell whether this is a ray issue or a multiprocessing issue. Is this the right place to discuss it, and is there any guidance on how to make ray and multiprocessing play nicely with each other?

Hi @sgalic, I'm not completely sure what the issue in your code is because I'm not deeply familiar with how multiprocessing handles shared memory. However, Ray offers a very convenient API that uses shared memory for the same purpose and may make this easier.

You can simply use ray.put to put objects into the shared-memory object store, or return objects from remote tasks. In both cases you get back a Ray ObjectRef, which you can pass to ray.get to retrieve the underlying object:

# Using ray.put.
object_ref = ray.put(np.ndarray(...))
my_array = ray.get(object_ref)

# Returning from a task.
@ray.remote
def f():
    return np.ndarray(...)

object_ref = f.remote()
my_array = ray.get(object_ref)

This object_ref can be passed between Ray tasks and actors, and Ray uses zero-copy reads for numpy arrays, so it should be very efficient:

@ray.remote
def downstream_task(my_array):
    return process_nd_array(my_array)

ray.get(downstream_task.remote(object_ref)) # Returns output of process_nd_array()

Thanks for your answer. I am aware of how ray enables fast sharing of ndarrays and I use it abundantly, except in this very particular case where the shared numpy array needs one writer and many "live" readers. It is my understanding that an object that has been put in the store is read-only and final, so it cannot serve this purpose.
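
A small sketch of what I mean, assuming the usual Ray behavior that numpy arrays retrieved from the object store are zero-copy, read-only views:

import numpy as np
import ray

ray.init()

ref = ray.put(np.zeros(4))
arr = ray.get(ref)

# The array is a zero-copy view backed by the object store and is read-only,
# so an in-place write fails instead of updating a shared, "live" buffer.
try:
    arr[0] = 1.0
except ValueError as err:
    print(err)  # assignment destination is read-only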

I see, that makes sense! Sorry I couldn't be of more help with the multiprocessing-specific issue.