When you call func.remote(), the task is submitted and scheduled, but it has not necessarily started running yet (this could simply be due to scheduling latency, or because the task is waiting for its dependencies to be ready). You can think of this as the equivalent of Thread.start() in Python’s threading module.
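To make the Thread.start() analogy concrete, here is a minimal sketch using only Python's standard threading module (no Ray involved). The function name start_then_join and the gate event are made up for illustration; the event just holds the worker back so the ordering is reproducible. Roughly, start() plays the role of .remote() and join() plays the role of a blocking ray.get.

```python
import threading


def start_then_join():
    """Record the order in which start(), the worker body, and join() complete."""
    events = []
    gate = threading.Event()  # hypothetical helper: keeps the worker from running ahead

    def worker():
        gate.wait()                  # worker is started but cannot make progress yet
        events.append("worker ran")

    t = threading.Thread(target=worker)
    t.start()                        # returns right away; the work has not happened yet
    events.append("start returned")
    gate.set()                       # now let the worker proceed
    t.join()                         # blocks the caller until the worker has finished
    events.append("join returned")
    return events


if __name__ == "__main__":
    print(start_then_join())
```

The point is only that "submitted" and "finished" are two separate moments, and that you need an explicit blocking call to observe the second one.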
Why does the program look non-deterministic, then?
The reason you seem to be getting non-deterministic results from your program is that ray.shutdown() gets called before the output from the workers has been redirected to the driver. If you remove your ray.shutdown() or simply add some more sleep before it, you should see “deterministic” output.
And if you add some tracking code like the snippet below, you can see an approximate execution sequence from the logged perf counters. The order in which the lines appear on stdout is not a reliable representation of that sequence.
In [3]: import ray
   ...: import numpy as np
   ...: import time
   ...:
   ...: @ray.remote(num_cpus=2)
   ...: def A():
   ...:     print(f"A: {time.perf_counter()}")
   ...:     y_id = C.remote(B.remote())
   ...:     y = ray.get(y_id)
   ...:     return y
   ...:
   ...: @ray.remote(num_cpus=2)
   ...: def B():
   ...:     print(f"B: {time.perf_counter()}")
   ...:     data = np.random.normal(size=(1000, 1000))
   ...:     return data
   ...:
   ...: @ray.remote(num_cpus=2)
   ...: def C(data):
   ...:     print(f"C: {time.perf_counter()}")
   ...:     dataa = data
   ...:     return dataa
   ...:
   ...: if __name__ == '__main__':
   ...:     ray.shutdown()
   ...:     ray.init(num_cpus=2)
   ...:
   ...:     print(f'a:{time.perf_counter()}')
   ...:     ray.get(A.remote())
   ...:     print(f"b:{time.perf_counter()}")
   ...:
Started a local Ray instance. View the dashboard at http://127.0.0.1:8265.
a:634776.807467346
(A pid=501632) A: 634776.817099057
(B pid=501633) B: 634776.866263363
b:634776.971131826
(C pid=501633) C: 634776.95947828
This is actually a good “gotcha” IMO; the Ray team should definitely improve the usability here or provide a more straightforward and intuitive experience.
In Python’s threading, join() is used to block the main thread. Which function plays that role in Ray? In Python, communication between threads or processes goes through something like a queue or a pipe. In Ray, do we use Plasma or something else?
As my program shows, A waits for C and B to complete. When will B run and print something, given that B depends on nothing?
Do they start to run only when I need to get the function's return value?
ray.get and ray.wait allow you to synchronize different tasks/actors in a chain by synchronizing on their return values. For example, your C will only execute after B has executed.
B will run when resources are available, which in this toy example is “fairly quickly” after you call remote(), or more precisely whenever the scheduler assigns it to a worker. This happens asynchronously while the driver (your Python script) keeps running.
They are only synchronized when you call ray.get or ray.wait.