Same code gets different output

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

I ran the code below twice but got different outputs. Is there a bug somewhere? I ran it on a single machine.

import ray
import numpy as np

@ray.remote(num_cpus=2)
def A():
    print("A")
    y_id = C.remote(B.remote())
    y = ray.get(y_id)

@ray.remote(num_cpus=2)
def B():
    print("B")
    data=np.random.normal(size=(1000,1000))
    return data

@ray.remote(num_cpus=2)
def C(data):
    print("C")
    dataa=data
    return 'x'

if __name__ == '__main__':
    ray.init(num_cpus=2)

    print('a')
    ray.get(A.remote())
    print("b")

    ray.shutdown()

The outputs are shown below:

a
(A pid=38746) A
(B pid=38745) B
b

Another output:

a
(A pid=39333) A
(B pid=39334) B
(C pid=39334) C
b

I think the second output is correct. I built Ray from source and got the same problem.

When I change ray.get(A.remote()) to A.remote(), it only outputs a and b. Does that mean Ray only runs remote functions when I trigger them?

When you call func.remote(), the task is submitted and scheduled, but it has not necessarily run yet. (This could simply be due to scheduling latency, or the task may be waiting for its dependencies to be ready.) You can think of this as the equivalent of Thread.start() in Python’s threading module.
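To make the Thread.start() analogy concrete, here is a minimal sketch that uses only the standard library, not Ray. The names work and events are made up for illustration. start() returns right away, so the main thread reaches its next line before the worker has done anything visible; join() is what makes the caller wait, which is roughly the role ray.get plays for a remote task.

```python
import threading
import time

events = []  # hypothetical log of what happened, in order


def work():
    time.sleep(0.2)  # stand-in for scheduling latency or real work
    events.append("worker done")


t = threading.Thread(target=work)
t.start()                         # returns immediately; work() runs in the background
events.append("main continues")   # recorded before the worker finishes
t.join()                          # block until work() has finished
events.append("after join")
print(events)
```

Without the join(), nothing guarantees that the worker has finished by the time the main thread moves on, which is the same situation as calling A.remote() without ray.get.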

Why does the program look non-deterministic, then?

The reason you seem to be getting non-deterministic results is that ray.shutdown() gets called before the output from the workers has been redirected to the driver. If you remove the ray.shutdown() call, or simply add a short sleep before it, you should see “deterministic” output.

And if you add some tracking code like the one below, you can see an approximate execution sequence from the perf-counter timestamps; the order in which lines are printed to stdout does not represent it well.

In [3]: import ray
   ...: import numpy as np
   ...: import time
   ...: 
   ...: @ray.remote(num_cpus=2)
   ...: def A():
   ...:     print(f"A: {time.perf_counter()}")
   ...:     y_id = C.remote(B.remote())
   ...:     y = ray.get(y_id)
   ...:     return y
   ...: 
   ...: @ray.remote(num_cpus=2)
   ...: def B():
   ...:     print(f"B: {time.perf_counter()}")
   ...:     data=np.random.normal(size=(1000,1000))
   ...:     return data
   ...: 
   ...: @ray.remote(num_cpus=2)
   ...: def C(data):
   ...:     print(f"C: {time.perf_counter()}")
   ...:     dataa=data
   ...:     return dataa
   ...: 
   ...: if __name__ == '__main__':
   ...:     ray.shutdown()
   ...:     ray.init(num_cpus=2)
   ...: 
   ...:     print(f'a:{time.perf_counter()}')
   ...:     ray.get(A.remote())
   ...:     print(f"b:{time.perf_counter()}")
   ...: 
Started a local Ray instance. View the dashboard at http://127.0.0.1:8265.
a:634776.807467346
(A pid=501632) A: 634776.817099057
(B pid=501633) B: 634776.866263363
b:634776.971131826

(C pid=501633) C: 634776.95947828

This is actually a good “gotcha” IMO. The Ray team should definitely improve the usability here or provide a more straightforward and intuitive experience.

Thanks for your reply!

In Python’s threading, join() is used to block the main thread. What is the equivalent function in Ray? With Python threads and processes, communication goes through something like a queue or a pipe. In Ray, do we use Plasma or something else?
As my program shows, A waits for B and C to complete. Since B depends on nothing, when does B run and print its output?

Do tasks only start running when I ask for the function’s return value?

Hi @xyzyx,

You either need this:

y_id = C.remote(ray.get(B.remote()))

Or in C you need

ray.get(data)

But these will cause an issue because of num_cpus. Ray should be smart enough to prevent the deadlock, but it is better to just avoid it yourself.

ray.get and ray.wait allow you to synchronize different tasks/actors in a chain by synchronizing on their return values. For example, your C will only execute after B has finished.

For communication and synchronization, you could also consider message passing with Ray.

B will run when resources are available, which in this toy example is “fairly quickly” after you call remote(), or more precisely when the scheduler assigns it to a worker. This happens asynchronously while the driver (your Python script) keeps running.

They are only synchronized when you call ray.get or ray.wait.
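The same idea can be sketched with concurrent.futures from the standard library. This is an analogy, not Ray code: Future.result() stands in for ray.get, concurrent.futures.wait stands in for ray.wait, and the functions b and c are hypothetical stand-ins for the tasks B and C.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def b():
    time.sleep(0.1)  # pretend to produce some data
    return [1, 2, 3]


def c(data):
    return sum(data)


with ThreadPoolExecutor(max_workers=2) as pool:
    b_future = pool.submit(b)            # submitted; runs in the background
    data = b_future.result()             # blocks until b() has finished
    c_future = pool.submit(c, data)      # c() can only start after b() is done
    done, not_done = wait([c_future])    # blocks until c() has finished
    total = c_future.result()

print(total)
```

Everything between submit() and result()/wait() runs asynchronously with respect to the calling script; the blocking calls are the only points where the two sides are guaranteed to line up.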

Does this make sense?

Thanks, @rickyyx! Your responses are very useful. :smiley: