Best solution to have multiprocessing working in an actor?

I have an existing application (which uses the Python multiprocessing lib extensively) and am trying to move it into an actor. I've also read Ray's multiprocessing pool API here: Distributed multiprocessing.Pool — Ray 3.0.0.dev0

A simplified version of my application looks like this:

import time
import multiprocessing as mp

import ray

def func(var):
    print("func:", var)

def test_multi_process():
    ctx = mp.get_context("spawn")
    pserver_list = []
    for i in range(3):
        p = ctx.Process(target=func, args=("GGGG",))
        p.start()
        time.sleep(1)
        pserver_list.append(p)

    for p in pserver_list:
        p.join()

@ray.remote
class RayServer(object):
    def serve(self):
        test_multi_process()
        return "SSSSSS return"

if __name__ == "__main__":
    ray.init("auto")
    svr = RayServer.options(name="RayServer", lifetime="detached").remote()
    print(ray.get(svr.serve.remote()))

And I am getting the following error:

  File "/home/centos/anaconda3/envs/dev/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/home/centos/anaconda3/envs/dev/lib/python3.7/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function func at 0x7fc4d2759200>: attribute lookup func on __main__ failed

I assume that is because an actor implementation requires everything to be pickleable, but the multiprocessing lib is getting in the way. What is my best option now? Suggestions? Is replacing it with Ray's multiprocessing lib the only way to solve this? (That will probably require a lot of refactoring work, plus I don't know whether that lib has any side effects.)

Let me see if I understand the question first. So you just replaced the multiprocessing lib with Ray's, and you are seeing the pickle error? Am I correct?

Thanks!

I haven't started replacing it yet. The error comes from running the original application with the native Python multiprocessing lib. When I got the errors, I hadn't yet heard about Ray's mp lib.

I'm not sure if replacing it is the best option. If there is an easy way to make my application's mp usage work as is, that would be perfect.

Ray's multiprocessing lib is meant to be a drop-in replacement for the mp library, but it doesn't support the full API, so there is definitely a possibility that your application won't be ported seamlessly (there is a quick sketch of the drop-in usage at the end of this post). For your issue, it is highly likely you have an implicit capture of an un-pickleable object in your actor definition. This is an example of implicit capture:

a = object()  # Imagine this is a lock object

@ray.remote
class A:
    def __init__(self):
        # Here `a` is captured from the reference above; it must be pickleable.
        self.a = a

To help troubleshoot this sort of serialization issue, we support an inspection tool: Serialization — Ray v2.0.0.dev0
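For example (a quick sketch; the lock and bad_func here are just made-up examples of an implicit capture):

import threading

from ray.util import inspect_serializability

lock = threading.Lock()  # a classic un-pickleable object

def bad_func():
    # Implicitly captures `lock` from the enclosing module.
    return lock.acquire()

# Prints a breakdown of which captured object fails to serialize,
# instead of a bare PicklingError.
inspect_serializability(bad_func, name="bad_func")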

One other thing you can try is to import the multiprocessing library within the actor, like this:

@ray.remote
class RayServer(object):
    def serve(self):
        import multiprocessing as mp  # import the multiprocessing library inside the actor
        test_multi_process()
        return "SSSSSS return"
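And if you do try the drop-in replacement, the basic usage looks roughly like this (a minimal sketch; as mentioned, not every corner of the multiprocessing API is supported):

import ray
from ray.util.multiprocessing import Pool  # Ray's drop-in replacement for multiprocessing.Pool

def simple_func(var):
    return var * 2

if __name__ == "__main__":
    ray.init()
    pool = Pool(4)  # workers are Ray actors, not OS processes you manage yourself
    print(pool.map(simple_func, [1, 2, 3]))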

Thanks sangcho. I played with that util tool; nothing abnormal was reported for my function :frowning: but let me try to dig a bit more.

One high-level question (let me know if it would be better to start a dedicated thread for this): is using multiprocessing inside an actor a good practice in the first place? Instead of multiprocessing I also see ActorPool, but I am not sure if it's worth the effort to try that one out.

A bit more context on what my application is doing: it starts a multiprocessing pool, each worker with its own initialization. This pool is then used for lots of on-demand operations, computation and IO combined. Each operation takes roughly 100 ms to finish.

Is using multiprocessing inside an actor a good practice in the first place?

This is not a good practice if you use fork under the hood. But if you just use the multiprocessing pool, it is probably fine (although using other actors is probably a better idea, because that offloads all the resource management to Ray, which will simplify your architecture).
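To make that concrete, here is a minimal sketch using ray.util.ActorPool; the Worker class and run method are just placeholders for your per-worker initialization and the ~100 ms operation:

import ray
from ray.util import ActorPool

@ray.remote
class Worker:
    def __init__(self):
        # Per-worker initialization happens once here, like a pool initializer.
        self.prefix = "worker"

    def run(self, var):
        # Stand-in for the on-demand computation + IO operation.
        return f"{self.prefix}: {var}"

if __name__ == "__main__":
    ray.init()
    pool = ActorPool([Worker.remote() for _ in range(4)])
    # map() takes an (actor, value) -> ObjectRef function and streams results back in order.
    print(list(pool.map(lambda actor, v: actor.run.remote(v), ["a", "b", "c"])))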

Ah, thanks!

"not a good practice if you use fork under the hood"

That probably explains the issue I ran into (original posting).

I was using multiprocessing.get_context("spawn").Process(xxx), which creates a new process. That is probably essentially an os.fork(). My guess, from reading the Python docs…

Ah, yeah, then it makes sense. Ray does not work with fork because it has some in-memory state that shouldn't be duplicated. As you can imagine, Ray workers need to be registered with Ray, and the code around that can be broken by fork calls, which duplicate that state without any actual RPC communication with the Ray runtime.

I tried everything that sangcho suggested. Still the same unpicklable error.

I cleaned the code down to the following self-contained minimal snippet, with both the Process() and Pool() approaches. Can someone point out where the problem could be?

Much appreciated.

import ray

def simple_func(var):
    print(var)

@ray.remote
class RayServer(object):
    def serve(self):
        import multiprocessing as mp
        ctx = mp.get_context("spawn")
        pserver_list = []
        for i in range(3):
            p = ctx.Process(target=simple_func, args=("GGGG",))
            p.start()
            pserver_list.append(p)
        return "SSSSSS return from RayServer"

@ray.remote
class RayPoolServer(object):
    def serve(self):
        import multiprocessing as mp

        ctx = mp.get_context("spawn")
        pool = ctx.Pool(4)
        pool.map(simple_func, [1, 2, 3])

        return "SSSSSS return from RayPoolServer"

if __name__ == "__main__":
    from ray.util import inspect_serializability
    inspect_serializability(RayPoolServer, name="RayPoolServer")
    inspect_serializability(simple_func, name="simple_func")

    ray.init("auto")
    svr1 = RayServer.remote()
    print(ray.get(svr1.serve.remote()))

    svr2 = RayPoolServer.remote()
    print(ray.get(svr2.serve.remote()))

Let me see if I can reproduce the issue and find the cause. If I find it, I will share how I did it.

It looks like this is an issue with the multiprocessing library.

Pool needs to pickle (serialize) everything it sends to its worker processes (IPC). Pickling actually only saves the name of a function, and unpickling requires re-importing the function by name. For that to work, the function needs to be defined at the top level; nested functions won't be importable by the child, and already trying to pickle them raises an exception (more).

I think Ray handles this pickling issue by using our custom cloudpickle implementation (cc @suquark to confirm), but multiprocessing doesn't.

So I think the issue is that simple_func is defined at the top level of your entry Python program, but it is not importable within the Ray actor process (so it is not pickleable).

As an example, try this:

import ray

@ray.remote
class RayPoolServer(object):
    def serve(self):
        import multiprocessing as mp
        import ray

        ctx = mp.get_context("spawn")
        pool = ctx.Pool(4)
        pool.map(ray.nodes, [])

        return "SSSSSS return from RayPoolServer"

if __name__ == "__main__":
    from ray.util import inspect_serializability
    inspect_serializability(RayPoolServer, name="RayPoolServer")

    ray.init()
    # svr1 = RayServer.remote()
    # print(ray.get(svr1.serve.remote()))
    
    svr2 = RayPoolServer.remote()
    print(ray.get(svr2.serve.remote()))

As you can see, ray.nodes is pickleable because you imported ray within RayPoolServer, which means the ray.nodes function is now importable/accessible from the actor process (so it is pickleable).

Aha, interesting findings. Thank you so much! I think that is the cause, but I am not sure about the best way to solve it.

The reason I am asking is that the following code works:

def simple_func(var):
    print(var)

@ray.remote
class RayPoolServer(object):
    def serve(self):
        simple_func("KKKK")
        return "SSSSSS return from RayPoolServer"

This means the upper-level function is visible inside serve(). Somehow the mp lib gets in the way, and we need to further pass simple_func's definition into the sub-scope of the process we are creating.

To report back my findings:

As sangcho suggested, I created another module and put simple_func in there. When referencing that function inside the actor, the mp lib then works.

Anywhere else in the same main file, it won't work.

The root cause is still unknown; it is probably much deeper than it seems.

Thanks a lot folks!

This must be related to some weird pickle and Python module import issues… For now, the approach you used (moving simple_func to a different module) is the only known working solution. cc @suquark please let us know if you know of any other workaround!

@HuangLED @sangcho It is simply because Python multiprocessing uses the original pickle library, which cannot access functions defined in the entrypoint script from a remote process like a Ray actor (unless the process is launched by multiprocessing itself). You can define func in another Python script and then import it into the entrypoint script. This helps the multiprocessing library locate it. Actually, I see you already solved it by defining the function in another Python module.
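Concretely, the working layout looks something like this (funcs.py is just an example module name; it needs to be importable from the worker processes as well):

# funcs.py
def simple_func(var):
    print(var)

# main.py
import multiprocessing as mp

import ray
from funcs import simple_func  # importable by name from the spawned child processes

@ray.remote
class RayPoolServer(object):
    def serve(self):
        ctx = mp.get_context("spawn")
        pool = ctx.Pool(4)
        pool.map(simple_func, [1, 2, 3])
        return "SSSSSS return from RayPoolServer"

if __name__ == "__main__":
    ray.init()
    svr = RayPoolServer.remote()
    print(ray.get(svr.serve.remote()))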

@suquark so for Ray, it works because we are using cloudpickle, which has a workaround for this mechanism?

Yes, cloudpickle has a workaround that integrates with Ray to run nested remote functions correctly. But that is the reason Ray does not fail, not the reason that explains the failure case of multiprocessing.

The direct cause is that when simple_func is passed to ctx.Process, it is not exactly the original simple_func, because it no longer lives in the memory of the main process and its context changes. For example, originally you could do from __main__ import simple_func; now you certainly cannot. This breaks the pickle mechanism used by multiprocessing.
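A quick way to see this from inside the actor (a rough illustration; the exact worker entrypoint path depends on your Ray version):

import __main__  # run this inside the actor's serve() method

# Inside a Ray actor, the __main__ module is Ray's worker entrypoint
# (e.g. .../ray/workers/default_worker.py), not your script, so the
# "attribute lookup simple_func on __main__" that plain pickle performs
# has nothing to find.
print(__main__.__file__)
print(hasattr(__main__, "simple_func"))  # False inside the actor process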

The root cause is that multiprocessing is mostly not designed to have processes spawned by anything other than the main process. For example:

import multiprocessing as mp

def g(x):
    return x + 1


def f(x):
    ctx = mp.get_context("spawn")
    pool = ctx.Pool(4)
    return pool.map(g, range(x))

if __name__ == '__main__':
    ctx = mp.get_context("spawn")
    pool = ctx.Pool(4)
    pool.map(f, [1, 2])

Multiprocessing will simply raise AssertionError: daemonic processes are not allowed to have children.
