Reproducing the result in the Ray paper

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

In Ray’s paper, Ray is compared with OpenMPI and the results are shown in Figure 12(a). I tried to reproduce that result but failed. Here is my script.

import sys
import time

import numpy as np
import ray


@ray.remote(num_cpus=32)
def set_data(data):
    # Each task occupies a full 32-core node, receives a large array as input,
    # and puts another large array into that node's local object store.
    _ = data.shape  # the argument is only shipped to the node, not used further
    x = ray.put(np.ones(1024 * 1024 * 1024))
    time.sleep(15)
    return x


if __name__ == '__main__':
    # Connect to the running cluster.
    ray.init(address='auto')
    print(ray.cluster_resources())
    num_node = int(sys.argv[1])
    print('get time,wait time,remote time,num results')
    for _ in range(10):
        remote_time = time.time()
        refs = [set_data.remote(np.ones(1024 * 1024 * 1024)) for _ in range(num_node)]
        wait_time = time.time()
        # Wait for every task to finish without pulling its output to the driver.
        pending = refs
        while len(pending) != 0:
            done, pending = ray.wait(pending, fetch_local=False)
        start_get = time.time()
        # Each task returns an ObjectRef created by ray.put, so a second
        # ray.get is needed to fetch the actual arrays to the driver.
        arrays = ray.get(ray.get(refs))
        result = np.array(arrays).sum(axis=0)
        end_get = time.time()
        print(end_get - start_get, start_get - wait_time, wait_time - remote_time, len(arrays))

In my testbed, each node has 32 cores. Each task receives 1GB of data and puts another 1GB of data into its node's object store. After that, I gather all of the put data and sum it on one node.

Is this test included in the Ray project?

Lots of things have changed since the initial Ray paper. What result did you get? cc @Stephanie_Wang

Is the script I used correct? I am trying to implement allreduce with Ray. Is there any code I can reference?
Although many things have changed, I think the implementation of allreduce follows the same pseudocode.

There are many ways to implement allreduce on top of Ray. The version that you posted works but may not be very efficient for larger data since it gathers all results to a single worker (the driver). In addition, you need a way to broadcast the results back to the workers.

Here’s a version of allreduce that uses Ray actors that you can use as a reference. You could also take a look at the map-reduce example in the docs that uses Ray tasks.
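To make this concrete, here is a minimal sketch of that idea (the class and method names AllReduceWorker, reduce, and set_result, and the array size, are just placeholders for this example): each worker is an actor holding its local array, one actor performs the reduction, and the reduced result is passed back to every actor as the broadcast step.

import numpy as np
import ray


@ray.remote
class AllReduceWorker:
    """Holds this worker's local array and takes part in the allreduce."""

    def __init__(self, size):
        self.data = np.ones(size)
        self.result = None

    def get_data(self):
        # Hand out this worker's local contribution.
        return self.data

    def reduce(self, *chunks):
        # Sum every worker's contribution; runs on a single worker.
        self.result = np.sum(chunks, axis=0)
        return self.result

    def set_result(self, result):
        # Broadcast step: every worker stores the reduced array.
        self.result = result
        return self.result[0]


if __name__ == '__main__':
    ray.init(address='auto')
    num_workers = 4
    size = 1024 * 1024  # kept small so the sketch runs quickly
    workers = [AllReduceWorker.remote(size) for _ in range(num_workers)]

    # Reduce: gather every worker's data onto worker 0 and sum it there.
    chunks = [w.get_data.remote() for w in workers]
    reduced = workers[0].reduce.remote(*chunks)

    # Broadcast: pass the reduced ObjectRef back to every worker; Ray resolves
    # top-level ObjectRef arguments to their values on each actor.
    print(ray.get([w.set_result.remote(reduced) for w in workers]))

Because the reduced value is passed around as an ObjectRef, the broadcast goes through the object store on each node rather than routing all of the data through the driver.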

Note, however, that if you need high-performance collective communication, it is now recommended that you use a third-party library for the communication and Ray actors to wrap it. Also, as @jjyao said, I don’t think it will be possible to perfectly reproduce the results from the original Ray paper because the implementation of Ray core has changed significantly since then.
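As a rough sketch of that pattern (not the setup used in the paper), the example below wraps torch.distributed with the gloo backend in Ray actors; the class name CollectiveWorker, the rendezvous port 29500, and the tensor size are arbitrary choices for illustration.

import ray
import torch
import torch.distributed as dist


@ray.remote
class CollectiveWorker:
    """Wraps one member of a torch.distributed process group in a Ray actor."""

    def get_ip(self):
        # Report this actor's node IP so it can serve as the rendezvous address.
        return ray.util.get_node_ip_address()

    def setup(self, rank, world_size, master_addr, master_port):
        # Every actor joins the same process group; rank 0 hosts the store.
        dist.init_process_group(
            backend='gloo',
            init_method=f'tcp://{master_addr}:{master_port}',
            rank=rank,
            world_size=world_size,
        )

    def allreduce_ones(self, n):
        # Each rank contributes a tensor of ones; all_reduce sums them in place.
        t = torch.ones(n)
        dist.all_reduce(t, op=dist.ReduceOp.SUM)
        return float(t[0])  # should equal world_size


if __name__ == '__main__':
    ray.init(address='auto')
    world_size = 4
    workers = [CollectiveWorker.remote() for _ in range(world_size)]

    # Use rank 0's node as the rendezvous address, then set up all ranks.
    master_addr = ray.get(workers[0].get_ip.remote())
    ray.get([w.setup.remote(rank, world_size, master_addr, 29500)
             for rank, w in enumerate(workers)])

    print(ray.get([w.allreduce_ones.remote(1024) for w in workers]))

The same wrapping pattern applies to other backends (for example NCCL or an MPI-based library); only the initialization and allreduce calls inside the actor change.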

@Stephanie_Wang @jjyao Thanks to both of you. I will try it.
I have another question. When Ray starts, a head node has to be set up. Should it be started with --num-cpus=0 or --num-cpus=x? Will the head node start worker processes or not?

Worker processes can run on the head node as well. If you start it with --num-cpus=0, then tasks that require num_cpus > 0 will not be scheduled on it.
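For example, if the head node is started with `ray start --head --num-cpus=0` and the worker nodes are attached normally, a quick sanity check from a driver script looks like this (just a sketch):

import ray

ray.init(address='auto')

# With `ray start --head --num-cpus=0`, the head node advertises 0 CPUs, so
# any task or actor that requests CPUs is scheduled on the worker nodes only.
print(ray.cluster_resources().get('CPU', 0))    # total CPUs come from worker nodes
print(ray.available_resources().get('CPU', 0))  # CPUs currently free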