Hi, I'm creating a local Ray instance using ray.init(). After this, I spawn a new process via multiprocessing and try to call ray.remote from that new process. Ray does not start any tasks and no exception is raised; the code just blocks there.
I suspected the cause was that Ray was initialized in a different process, so I printed ray.is_initialized() in the new process, and it prints True.
Should I connect to Ray before calling ray.remote()?
I also get an exception when I try to create a new local Ray instance in the new process. Any help would be appreciated.
@BalajiSelvaraj10
Thanks for the question. There might be an issue with combining multiprocessing and Ray.
Do you have a simple repro script so that I can help better?
import numpy as np
import time
from multiprocessing import Process
import multiprocessing
np.random.seed(42)
starttime = time.time()

#@ray.remote
def matmulfunc():
    print("Inn")
    A = np.random.randint(0, 10, size=(140,800,500))
    B = np.random.randint(0, 10, size=(140,500,800))
    C = np.matmul(A,B)
    return C

def rayrun():
    remote_foo = ray.remote(matmulfunc)
    print("Before calling ray -- ", flush=True)
    obj_ref = [remote_foo.remote() for _ in range(3)]
    for obj in obj_ref:
        ray.get(obj)
    print("Ray multitasking completed")
    endtime = time.time()
    print(time.strftime("%H:%M:%S", time.gmtime(endtime-starttime)))

multiprocessing.set_start_method('fork')
import ray
ray.init()
process = Process(name="test", target=rayrun)
process.start()
ray, version 2.2.0
python : 3.9.14
You can reproduce the issue with this script @rickyyx @ericl
Use case: We have around 10 serving programs, each of which starts serving when a new message is produced into a queue. Each one uses Ray for parallelism internally and runs in its own process via Python multiprocessing. So I call ray.init() before these serving processes are spawned, and then try to use the already-started Ray instance inside the spawned processes.
Hey @BalajiSelvaraj10 thanks for the repro script - this is really helpful!
So I think the code is somehow stuck in a gRPC channel call:
(dev-2) ➜ logs sudo env "PATH=$PATH" py-spy dump --pid 201518
Process 201518: python /data/home/rickyx/repro-multiprocess.py
Python v3.9.16 (/data/home/rickyx/anaconda3/envs/dev-2/bin/python3.9)

Thread 201518 (idle): "MainThread"
    _blocking (grpc/_channel.py:933)
    __call__ (grpc/_channel.py:944)
    internal_kv_exists (ray/_private/gcs_utils.py:372)
    wrapper (ray/_private/gcs_utils.py:198)
    export (ray/_private/function_manager.py:213)
    _remote (ray/remote_function.py:282)
    _invocation_remote_span (ray/util/tracing/tracing_helper.py:307)
    _remote_proxy (ray/remote_function.py:129)
    <listcomp> (repro-multiprocess.py:23)
    rayrun (repro-multiprocess.py:23)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:315)
    _launch (multiprocessing/popen_fork.py:71)
    __init__ (multiprocessing/popen_fork.py:19)
    _Popen (multiprocessing/context.py:277)
    _Popen (multiprocessing/context.py:224)
    start (multiprocessing/process.py:121)
    <module> (repro-multiprocess.py:36)
I believe this is a gRPC issue with forking a process that already holds a client/server connection.
With your example above, it works if you do the following:
- Use spawn rather than fork.
- Move ray.init() into the process where you actually use Ray, or run ray start --head outside the script. (This is probably a Ray issue: if you call ray.init() before multiprocessing starts the new process, it seems to init Ray twice.)
import numpy as np
import time
from multiprocessing import Process
import multiprocessing
np.random.seed(42)
import ray
starttime = time.time()

@ray.remote
def matmulfunc():
    print("Inn")
    A = np.random.randint(0, 10, size=(140,800,500))
    B = np.random.randint(0, 10, size=(140,500,800))
    C = np.matmul(A,B)
    return C

def rayrun():
    print("Before calling ray -- ", flush=True)
    ray.init()
    obj_ref = [matmulfunc.remote() for _ in range(3)]
    for obj in obj_ref:
        ray.get(obj)
    print("Ray multitasking completed")
    endtime = time.time()
    print(time.strftime("%H:%M:%S", time.gmtime(endtime-starttime)))

if __name__ == "__main__":
    multiprocessing.set_start_method('spawn')
    process = Process(name="test", target=rayrun)
    process.start()
Also, I'm not sure what the timing is meant to measure, but the current start-to-end time includes everything: creating the new process, starting a Ray cluster, and so on.
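If the goal is to time only the parallel work, one option is to start the clock right before the calls you care about rather than at module import. Here is a minimal sketch using only the standard library; `timed`, `format_elapsed`, and `fake_workload` are hypothetical names for illustration, and `fake_workload` simply stands in for the block of `ray.get(...)` calls.

```python
import time


def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed_seconds) for that call only."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def format_elapsed(seconds):
    """Format a duration in seconds as HH:MM:SS, like the script above."""
    return time.strftime("%H:%M:%S", time.gmtime(seconds))


def fake_workload():
    # Hypothetical stand-in for submitting tasks and calling ray.get on them.
    return sum(i * i for i in range(100_000))


result, elapsed = timed(fake_workload)
print(format_elapsed(elapsed))
```

Wrapping only the task submission and `ray.get` loop this way would keep process creation and `ray.init()` out of the measured interval.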
For serving, have you considered using Ray Serve directly? See Ray Serve: Scalable and Programmable Serving — Ray 2.3.0
Alternatively, you could use a Ray actor as the process that runs your serving code. See Actors — Ray 2.3.0