I don’t know why it would be slower, but as I understand it, NumPy will only use one CPU core for this calculation. You would need a distributed computing library such as Modin to turn the NumPy array into a distributed compute object.
I wonder if it’s because ray.get() is a blocking call, which takes time to initialize and to release memory after the calculation. As grechaw said, I think you could compare the time usage of Python’s built-in multiprocessing and the Ray actor framework.
Check this piece of code:
from multiprocessing import Pool
import numpy as np
import ray
import time

def multiply(mat):
    return np.dot(mat, mat)

@ray.remote
class Actor:
    def __init__(self):
        pass

    def run(self, mat):
        return np.dot(mat, mat)

def make_generator(obj_ids):
    # Yield results in completion order instead of blocking on the whole batch.
    while obj_ids:
        done, obj_ids = ray.wait(obj_ids)
        yield ray.get(done[0])

if __name__ == '__main__':
    ray.init()
    worker = 4
    testing_inputs = [np.random.random((5000, 5000)) for _ in range(16)]

    start1 = time.time()
    with Pool(worker) as p:
        output1 = p.map(multiply, testing_inputs)
    end1 = time.time()
    print("multiprocessing takes {}".format(end1 - start1))

    start2 = time.time()
    workers = [Actor.remote() for _ in range(worker)]
    obj_ids = [workers[i % worker].run.remote(x) for i, x in enumerate(testing_inputs)]
    output2 = list(make_generator(obj_ids))
    end2 = time.time()
    print("ray takes {}".format(end2 - start2))
Ray saves about 25 percent of the time in my runs with 4 workers.
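The make_generator helper streams each result back as soon as its task finishes rather than blocking until all tasks are done. The same pattern exists in the standard library as concurrent.futures.as_completed; here is a minimal sketch (the square function and the inputs are just placeholders standing in for the np.dot workload):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):  # placeholder task standing in for np.dot
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(square, x) for x in range(16)]
    # as_completed yields futures in completion order,
    # just like the ray.wait loop yields finished object refs.
    results = [f.result() for f in as_completed(futures)]

print(sorted(results))
```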
I think it’s actually because Ray automatically sets OMP_NUM_THREADS when it initializes its worker processes. Could you try setting it to something higher and then importing numpy?
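The order matters here: the environment variable has to be set before numpy is imported, because the BLAS backend reads it once at library load time. A minimal sketch (the value 4 is just an example, not a recommendation):

```python
import os

# Must be set before numpy is imported: the BLAS library reads
# OMP_NUM_THREADS once when it is loaded.
os.environ["OMP_NUM_THREADS"] = "4"  # example value; choose per your core budget

import numpy as np

a = np.random.random((500, 500))
b = a @ a  # this matmul may now use up to 4 BLAS threads
print(b.shape)
```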