How can I run a Python function in each worker's interpreter as it joins?

I’ve looked at the discussion here, but it assumes all your nodes have joined the cluster when you run the tasks. I might be wrong, but it looks like it wouldn’t work for autoscaling or pre-emptible nodes that come and go.

What I’d really like is a hook for running a Python function in the interpreter of each worker process as it joins. This would prevent hacks like passing every remote function through a wrapper that (re-)initializes logging. My code also takes a rather long time just to import (thanks, Numba precompilation…) so I’d like to initialize logging and give the workers a head start on importing the stuff they need.
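For reference, the wrapper workaround mentioned above might look roughly like the following. This is only a sketch in plain Python: `_ensure_logging` and `with_worker_init` are hypothetical names, and in real use the wrapped function would additionally be passed to `ray.remote`.

```python
import functools
import logging


def _ensure_logging():
    # Hypothetical helper: configure the root logger only if this
    # process has not configured it yet.
    root = logging.getLogger()
    if not root.handlers:
        logging.basicConfig(level=logging.INFO)


def with_worker_init(func):
    # Wrap a function so that logging is (re-)initialized in whichever
    # process ends up executing it, before the function body runs.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        _ensure_logging()
        return func(*args, **kwargs)
    return wrapper


@with_worker_init
def add(a, b):
    return a + b
```

Every task has to go through the wrapper, and the check runs on every call, which is why a per-worker startup hook would be cleaner.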

Is there any way to accomplish this with Ray currently, or on the roadmap? Basically, I’m looking for something analogous to worker plugins in Dask, but if there’s another way to accomplish this I’m all ears.

Hey @joseph-long, thanks for posting the question!

We have an issue in the backlog tracking adding support for this to runtime_env. The details are light there, but the idea is that you would be able to provide a “setup_hook” in runtime_env that runs when the worker starts up. For example:

ray.init(runtime_env={"setup_hook": "my_module.do_imports_before_ray"})

Would this satisfy your use case? We should be able to prioritize it in the next few weeks (to get in by the 1.10 release).

Actually, I think we can do something even more lightweight and just have a well-known environment variable for this (that can be set via runtime_env):

ray.init(runtime_env={"env_vars": {"RAY_WORKER_SETUP_HOOK": "my_module.do_imports_before_ray"}})

This would have the advantage that you could also set it globally at the cluster level as well as per-worker.

Nice! So if I’m understanding this code correctly, the existing published v1.8.0 has this feature, it’s just not yet documented?

Yup, that’s right. I don’t think it’s intended to be a public API, but we can stabilize it. Please feel free to try this out (the code sample above should work) and let me know if it works for your use case.

It looks like the environment variable name in the above example should be RAY_USER_SETUP_FUNCTION and not RAY_WORKER_SETUP_HOOK, but otherwise this appears to be what I need!

import ray

# `url` is the address of the cluster to connect to (defined elsewhere).
ray_init_kwargs = {'runtime_env': {'env_vars': {
    'RAY_USER_SETUP_FUNCTION': 'my_module.init_worker_function'
}}}
ray.init(url, **ray_init_kwargs)
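For anyone finding this later, here is a minimal sketch of what `my_module.init_worker_function` could contain. It assumes the hook is imported by its dotted path and called with no arguments in each worker process, and that `my_module` is importable on every node. `HEAVY_MODULES` is a hypothetical placeholder; the stdlib names in it only stand in for the slow imports (for example, the Numba-compiled code) you want to warm up.

```python
import importlib
import logging

# Hypothetical list of slow-to-import modules to load ahead of time.
# Replace these stand-ins with the modules your tasks actually need.
HEAVY_MODULES = ("json", "decimal")


def init_worker_function():
    # Configure logging once for this worker process.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(process)d %(levelname)s %(name)s: %(message)s",
    )
    log = logging.getLogger("worker_init")

    # Import heavy modules now so the first task does not pay the cost.
    for name in HEAVY_MODULES:
        try:
            importlib.import_module(name)
        except ImportError:
            # A missing module should not stop the worker from starting.
            log.warning("could not pre-import %s", name)
```

Catching `ImportError` here is a design choice: a failed warm-up import is logged rather than allowed to abort worker startup, and the real error will still surface when a task imports the module itself.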