Hello,
I need to execute a function exactly once on each worker process in a Ray cluster. My situation is the following: some computation tasks are created, and while they are running I need to submit additional tasks that perform housekeeping on the worker processes. The housekeeping tasks should execute exactly once on each worker process. I don't know how to schedule these tasks so that they run on every worker (and not, e.g., twice on the same worker). I tried the scheduling strategy "SPREAD", but the documentation says it spreads tasks among nodes, not among workers:
import ray
import psutil
import os
from random import random
from time import sleep

# This function does some computations
@ray.remote
def f1():
    sleep(random() * 5)

# This function does some housekeeping and needs
# to be executed on each worker once
@ray.remote(scheduling_strategy='SPREAD')
def f2():
    return os.getpid()

nCPUs = psutil.cpu_count(logical=False)
ray.init(num_cpus=nCPUs)

# Computation tasks
[f1.remote() for _ in range(20)]

# Housekeeping tasks
futures = [f2.remote() for _ in range(nCPUs)]

# Should print nCPUs unique PIDs
print(ray.get(futures))
The output of this code is, for example:
[72532, 8980, 72532, 8980]
So f2 ran four times (once per CPU) but unfortunately only on two distinct worker processes, twice each.
Can anybody help me figure out how to get Ray to execute f2 once on each worker? Do I need to create a placement group for this purpose? Or is there an even simpler approach (e.g., something like parfevalOnAll in MATLAB: https://de.mathworks.com/help/parallel-computing/parfevalonall.html)?
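For reference, here is roughly what I had in mind with a placement group, continuing from the snippet above (just a sketch, and I'm not sure whether bundles actually correspond to worker processes rather than reserved resources):

from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Reserve one 1-CPU bundle per CPU and wait until all bundles are placed
pg = placement_group([{"CPU": 1} for _ in range(nCPUs)])
ray.get(pg.ready())

# Pin each housekeeping task to its own bundle; .options() overrides
# the 'SPREAD' strategy from the decorator
futures = [
    f2.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=i,
        )
    ).remote()
    for i in range(nCPUs)
]
print(ray.get(futures))

But since this only reserves resources, I'm not convinced it guarantees exactly one run per worker process.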
Thanks for your help!
Dominik