Execute function on each worker process in the cluster

Hello,

I need to execute a function exactly once on each worker process in a Ray cluster. My situation is the following: some computation tasks are created, and while they are running I need to submit additional tasks that perform housekeeping on the worker processes. These housekeeping tasks should execute exactly once on each worker process, but I don't know how to schedule them so that they run on every worker (and not, e.g., twice on the same worker). I tried the scheduling strategy "SPREAD", but the documentation says it spreads tasks among nodes (not workers):

import ray
import psutil
import os

from random import random
from time import sleep

# This function does some computations
@ray.remote
def f1():
    sleep(random()*5)

# This function does some housekeeping and needs
# to be executed on each worker once
@ray.remote(scheduling_strategy='SPREAD')
def f2():
    return os.getpid()

nCPUs = psutil.cpu_count(logical=False)
ray.init(num_cpus=nCPUs)
# Computation tasks
[f1.remote() for _ in range(20)]
# Housekeeping tasks
futures = [f2.remote() for _ in range(nCPUs)]
# Should print nCPUs unique PIDs
print(ray.get(futures))

The output of this code is for example:

[72532, 8980, 72532, 8980]

So f2 ran four times (the number of CPUs) but unfortunately twice each on the same two worker processes.
Can anybody help me get Ray to execute f2 once on each worker? Do I need to create a placement group for this purpose? Or is there an even simpler approach (e.g., like parfevalOnAll in MATLAB: https://de.mathworks.com/help/parallel-computing/parfevalonall.html)?

Thanks for your help!
Dominik

Would this API help (see the link below)?

https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#customizing-worker-process-loggers

# driver.py
import logging
import warnings

import ray

def logging_setup_func():
    logger = logging.getLogger("ray")
    logger.setLevel(logging.DEBUG)
    warnings.simplefilter("always")

# The setup hook runs once in every worker process when it starts
ray.init(runtime_env={"worker_process_setup_hook": logging_setup_func})

# Apply the same setup to the driver process itself
logging_setup_func()

This will run the function on every worker process when it starts (before any tasks are scheduled on it).

Hi sangcho,

thanks for the reply! As far as I understand, this will run the function only when a worker is started (good to know that this feature exists via the runtime_env argument!). However, I need to execute the function on all workers after they have started and have already completed some tasks, exactly as in the example above: f2 needs to execute once on every worker. Any other ideas?

Thanks!

In this case, using actors is probably a better idea (there is currently no feature in Ray to run a task on every worker).
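
For example, here is a minimal sketch (the Worker actor and the round-robin distribution are just illustrative, not an official API): if you move the computations into actors, each actor is pinned to its own dedicated worker process, so calling a housekeeping method once per actor runs it exactly once per worker:

import os
import ray
from random import random
from time import sleep

@ray.remote
class Worker:
    def compute(self):
        # Stands in for the real computation (like f1 above)
        sleep(random() * 5)

    def housekeep(self):
        # Runs in this actor's dedicated worker process
        return os.getpid()

nCPUs = 4  # or psutil.cpu_count(logical=False)
ray.init(num_cpus=nCPUs)
workers = [Worker.remote() for _ in range(nCPUs)]

# Distribute the computation tasks round-robin over the actors
[workers[i % nCPUs].compute.remote() for i in range(20)]

# Housekeeping: exactly one call per actor, i.e. per worker process
futures = [w.housekeep.remote() for w in workers]
print(ray.get(futures))  # nCPUs unique PIDs

Since method calls on a (non-async) actor execute sequentially in submission order, each housekeep call only runs after the compute calls that were queued on that actor before it.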

OK, thanks for the info! It would be a nice extension to, e.g., have a scheduling strategy that distributes tasks evenly among all workers, or even a simple function to run something on all workers. But I understand that this feature does not exist for tasks right now.

Thanks again!