Shared Resource Manager

Hi @kai - I think this deserves a post, and it might be relevant to others as well, so I’ll post it here instead of bothering you on Slack.

As we discussed on Slack, my goal is to delegate all resource management logic to the latest resource_manager design, which, to be used properly, needs to be shared as a Ray (detached?) Actor.

IMO this has a very practical use case, b/c it’s 100% bottom-up instead of the old top-down fashion. It also gives users the ability to unify all parallelizable jobs under one centralized resource manager.

What would you say is the best practice for this?

So far, I’ve tried:

  • Registering the resource manager as a named & detached Actor
  • Wrapping it in a thin SharedResourceManagerWrapper so that the API stays the same for downstream code

Yet I ran into these issues:

  1. [Ray Core] `PlacementGroupResourceManager.__del__` fails when registered as an Actor · Issue #33048 · ray-project/ray · GitHub
  2. [Ray Tune] `resource_manager` pickling support · Issue #33040 · ray-project/ray · GitHub
  3. The following error when passing PlacementGroupResourceManager as an ActorHandle to the shared wrapper:
RayTaskError(KeyError): ray::PlacementGroupResourceManager.free_resources() (pid=270823, ip=10.19.199.11, repr=<ray.air.execution.resources.placement_group.PlacementGroupResourceManager object at 0x7fd87613ea90>)
  File "XXXXXX/envs/4a2dc35611fbe1c7ebfa224ac7f50c47643f4c04/lib/python3.8/site-packages/ray/air/execution/resources/placement_group.py", line 188, in free_resources
    self._acquired_pgs.remove(pg)
KeyError: <ray.util.placement_group.PlacementGroup object at 0x7fd840655af0>

The PlacementGroupResourceManager seems particularly flaky…

What are your thoughts or comments? Thank you

Hi @Zhang_David,

It’s great that you’re trying out our new features!

For a shared resource manager, the right way in my opinion is to have a thin wrapper, e.g. RemoteResourceManager, that implements the base resource manager interface. This wrapper will then just communicate between the actual resource manager (which can be scheduled as an actor) and the application.

The RemoteResourceManager can then also be pickled and thus shipped around (this will require overriding the blocked __reduce__ function).
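
To make the wrapper idea concrete, here is a minimal sketch that runs without Ray. Everything in it is hypothetical and only illustrates the pattern: ToyBackend stands in for the actual resource manager actor, the _BACKENDS dict stands in for looking up a named actor, and copy.deepcopy stands in for pickling (it goes through the same __reduce__ hook).

```python
import copy
from typing import Dict

# Hypothetical stand-in for a named-actor lookup: name -> shared backend.
_BACKENDS: Dict[str, "ToyBackend"] = {}


class ToyBackend:
    """Holds the single source of truth for the resource budget."""

    def __init__(self, total_cpus: int):
        self.free_cpus = total_cpus

    def acquire(self, cpus: int) -> bool:
        if cpus > self.free_cpus:
            return False
        self.free_cpus -= cpus
        return True


class ToyBaseManager:
    def __reduce__(self):
        # Serialization is blocked by default, like the safeguard on the base class.
        raise ValueError(f"{type(self).__name__} cannot be serialized")


class ToyRemoteManager(ToyBaseManager):
    def __init__(self, backend_name: str):
        self.backend_name = backend_name
        # Reconnect by name instead of carrying any resource state around.
        self._backend = _BACKENDS[backend_name]

    def __reduce__(self):
        # Only the name is serialized; a copy re-runs __init__ and reconnects.
        return type(self), (self.backend_name,)

    def acquire(self, cpus: int) -> bool:
        return self._backend.acquire(cpus)


_BACKENDS["shared"] = ToyBackend(total_cpus=2)
wrapper = ToyRemoteManager("shared")
clone = copy.deepcopy(wrapper)  # simulates shipping the wrapper elsewhere

first = wrapper.acquire(2)   # takes the whole budget
second = clone.acquire(2)    # the copy sees the same backend, so this is refused
```

Because the wrapper holds no resource state of its own, copying it is safe: every copy talks to the same backend.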

We generally want to prevent pickling of the resource managers as a safeguard. This is because there is no global source of truth that the copies can refer to.

For instance, if you have a FixedResourceManager and ship it to two remote tasks, and the first remote task then acquires resources from it, the second remote task will not know about it. The FixedResourceManager thinks it still has resources available and will try to allocate them.
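
A small sketch of this pitfall, assuming a toy fixed-budget manager. ToyFixedManager is a hypothetical stand-in, not Ray's FixedResourceManager, and copy.deepcopy simulates the independent copy each remote task would receive.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ToyFixedManager:
    """Hypothetical fixed-budget resource manager (not Ray's class)."""

    total: Dict[str, float]
    used: Dict[str, float] = field(default_factory=dict)

    def acquire(self, request: Dict[str, float]) -> bool:
        # Grant the request only if every resource still fits in the budget.
        for name, amount in request.items():
            if self.used.get(name, 0.0) + amount > self.total.get(name, 0.0):
                return False
        for name, amount in request.items():
            self.used[name] = self.used.get(name, 0.0) + amount
        return True


manager = ToyFixedManager(total={"CPU": 2})

# Shipping the manager to two tasks gives each task its own copy.
copy_for_task_a = copy.deepcopy(manager)
copy_for_task_b = copy.deepcopy(manager)

granted_a = copy_for_task_a.acquire({"CPU": 2})
granted_b = copy_for_task_b.acquire({"CPU": 2})  # unaware of task A's acquisition

# Both copies granted the full budget, so 4 CPUs were handed out from a budget of 2.
total_granted = 2 * granted_a + 2 * granted_b
```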

The same applies to placement groups: if you request a placement group from one resource manager, then ship the manager to a remote task, and then return the placement group in the first task, the second task will think it’s still allocated and in use.

To avoid these pitfalls, it’s better to not allow any pickling and thus usage in remote tasks.

It looks like the KeyError you’re experiencing might be due to such use - do you have a reproducible script for me to look into?

As for the non-pickling preventing the use with Tuner - this is a shortcoming in the current API and internal behavior. The API does not expose setting a resource manager/trial executor, and tries to pickle all Tuner arguments to a Tuner.pkl file.

As a workaround, you can either override the RayTrialExecutor like this:

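from ray.air.execution import FixedResourceManager
from ray.tune.execution.ray_trial_executor import RayTrialExecutor


class CustomExecutor(RayTrialExecutor):
    def setup(self, *args, **kwargs):
        super().setup(*args, **kwargs)
        # Create the resource manager here rather than in __init__,
        # so it never has to be pickled with the Tuner arguments.
        self._resource_manager = FixedResourceManager()

    def __getstate__(self):
        # Drop the resource manager from the pickled state.
        state = self.__dict__.copy()
        state.pop("_resource_manager", None)
        return state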

and then pass an instance of it to the Tuner via its kwargs. This will avoid pickling the resource manager.

Alternatively, you can use the legacy tune.run API to start your tuning job.
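
The __getstate__ trick in the first workaround can be tried in isolation. The sketch below is a toy: ToyExecutor is hypothetical, a threading.Lock plays the role of an object that refuses to be serialized, and copy.deepcopy stands in for pickling since it uses the same __getstate__ hook.

```python
import copy
import threading


class ToyExecutor:
    """Hypothetical executor that owns a non-serializable member."""

    def __init__(self):
        self.reuse_actors = False
        self._resource_manager = None

    def setup(self):
        # Stand-in for the resource manager: locks cannot be pickled or deep-copied.
        self._resource_manager = threading.Lock()

    def __getstate__(self):
        # Serialize everything except the resource manager.
        state = self.__dict__.copy()
        state.pop("_resource_manager", None)
        return state


executor = ToyExecutor()
executor.setup()

# Succeeds only because __getstate__ leaves the lock out of the copied state.
restored = copy.deepcopy(executor)

# The copy has no manager until setup() is called on it again.
has_manager_before = hasattr(restored, "_resource_manager")
restored.setup()
has_manager_after = hasattr(restored, "_resource_manager")
```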

To fix this going forward, we should expose a better API to specify resource managers, e.g. via a resource_manager_cls argument. I’m working on refactorings right now, and I’ll make sure to include options for this going forward.

Hope this helps!

Hi @kai - one thing I noticed is that when I launch a shared resource manager that wraps a fixed resource manager, I have to explicitly pass in the total resources.

The reason is that when I run ray job submit XXX, the condition on this specific line evaluates to False: ray/fixed.py at 83c99361c373b3cad0b497648e479ba1abf12bd0 · ray-project/ray · GitHub, b/c ray.get_runtime_context().worker.mode is WORKER_MODE.

Any reason for only accepting these 3 values?

if rtc.worker.mode in {None, SCRIPT_MODE, LOCAL_MODE}: pass

@kai
Regarding the PlacementGroup KeyError above, this example fully reproduces it:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from enum import auto, Enum
import ray
from ray.air.execution import PlacementGroupResourceManager, FixedResourceManager, ResourceManager
from ray.actor import ActorClass
from ray.air import ResourceRequest, AcquiredResources
from typing import Dict, Any, Union, List, Optional, Type
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
import logging

GLOBAL_NS = 'global_vars'
RM_REG = 'global_resource_manager_registrar'


def freeze(o):
    if isinstance(o, dict):
        return frozenset({k:freeze(v) for k,v in o.items()}.items())
    if isinstance(o, (set, tuple, list)):
        return tuple([freeze(v) for v in o])
    return o


def make_hash(o):
    """
    makes a hash out of anything that contains only list,dict and hashable types including string and numeric types
    """
    return hash(freeze(o))


RM_ACTOR_CLASSES = {
    'fixed': FixedResourceManager,
    'pg': PlacementGroupResourceManager,
}


def _get_resource_manager_cls(rm_type: str) -> Type[ResourceManager]:
    # Look up the plain (not yet remote) resource manager class for this type.
    if rm_type not in RM_ACTOR_CLASSES:
        raise NotImplementedError(f"resource_manager type {rm_type} not yet supported!")
    return RM_ACTOR_CLASSES[rm_type]


@ray.remote(num_cpus=0, num_gpus=0)
class GlobalResourceManagerRegistrar:

    def __init__(self, *args, **kwargs):
        self._rms = {}
        logging.basicConfig(level=logging.INFO)

    def get_if_exists(self, rm_type: str, rm_init_kwgs: Dict[str, Any]) -> ray.actor.ActorHandle:
        key = make_hash((rm_type, rm_init_kwgs))
        if key in self._rms:
            return self._rms[key]
        remote_rm_cls = ray.remote(_get_resource_manager_cls(rm_type))
        actor = remote_rm_cls.remote(**rm_init_kwgs)  # we want rm to be actor handle
        # debug
        rtc = ray.get_runtime_context()
        logging.debug(f'mode = {rtc.worker.mode}')
        self._rms[key] = actor
        logging.info(msg=f"missing resource manager of type = {rm_type} && init_kwgs = {rm_init_kwgs}. "
                         f"created and registered.")
        return actor


def _get_global_rm_actor(rm_type: str, rm_init_kwgs: Dict[str, Any]) -> ray.actor.ActorHandle:
    try:
        rm_reg = ray.get_actor(name=RM_REG, namespace=GLOBAL_NS)
    except ValueError:
        rm_reg = GlobalResourceManagerRegistrar.options(
            namespace=GLOBAL_NS, name=RM_REG,
            lifetime='detached',
            # get_if_exists=True,               # this did not work for me
        ).remote()
    return ray.get(rm_reg.get_if_exists.remote(rm_type, rm_init_kwgs))


class GlobalResourceManager(ResourceManager):

    def __init__(self,
                 rm_type: str = None,
                 rm_init_kwgs: Dict[str, Any] = None,
                 ):
        if rm_type is None:
            rm_type = 'fixed'
        self.rm_type = rm_type
        self.rm_init_kwgs = rm_init_kwgs or {}

        self._actual_rm = _get_global_rm_actor(self.rm_type, self.rm_init_kwgs)

    def __repr__(self):
        return repr(self._actual_rm)  # a way of sanity checking that we're actually using the same actor handle

    def __str__(self):
        return self.__repr__()

    def __reduce__(self):
        return (self.__class__, (self.rm_type, self.rm_init_kwgs))

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['_actual_rm']
        return state

    def request_resources(self, resource_request: ResourceRequest):
        ray.get(self._actual_rm.request_resources.remote(resource_request))

    def cancel_resource_request(self, resource_request: ResourceRequest):
        ray.get(self._actual_rm.cancel_resource_request.remote(resource_request))

    def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
        return ray.get(self._actual_rm.has_resources_ready.remote(resource_request))

    def acquire_resources(self, resource_request: ResourceRequest) -> Optional[AcquiredResources]:
        return ray.get(self._actual_rm.acquire_resources.remote(resource_request))

    def free_resources(self, acquired_resource: AcquiredResources):
        ray.get(self._actual_rm.free_resources.remote(acquired_resource))

    def clear(self):
        ray.get(self._actual_rm.clear.remote())


class ResourceMgrBasedTrialExecutor(RayTrialExecutor):
    # Ref: https://discuss.ray.io/t/shared-resource-manager/9638/2

    def __init__(self,
                 global_resource_manager: GlobalResourceManager,
                 *,
                 reuse_actors: bool = False,
                 result_buffer_length: Optional[int] = None,
                 refresh_period: Optional[float] = None,
                 chdir_to_trial_dir: bool = False,
                 ):
        super().__init__(
            resource_manager=None,  # we don't create it under __init__
            reuse_actors=reuse_actors,
            result_buffer_length=result_buffer_length,
            refresh_period=refresh_period,
            chdir_to_trial_dir=chdir_to_trial_dir,
        )
        self.resource_manager_type = global_resource_manager.rm_type
        self.resource_manager_init_kwgs = global_resource_manager.rm_init_kwgs

        self._global_resource_manager: Optional[GlobalResourceManager] = None

    def setup(self, *args, **kwargs):
        super().setup(*args, **kwargs)
        self._resource_manager = GlobalResourceManager(rm_type=self.resource_manager_type,
                                                       rm_init_kwgs=self.resource_manager_init_kwgs)

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("_resource_manager", None)  # ensure not pickling `resource_manager`
        return state


if __name__ == '__main__':
    ray.init('auto')

    def objective(x, a, b):
        return a * (x ** 0.5) + b

    from ray import tune
    from ray.air import session
    from ray import air


    def trainable(config: dict):
        intermediate_score = 0
        for x in range(20):
            intermediate_score = objective(x, config["a"], config["b"])
            session.report({"score": intermediate_score})  # This sends the score to Tune.


    tuner = tune.Tuner(
        tune.with_resources(trainable, tune.PlacementGroupFactory([
            {"CPU": 1},
        ])),
        param_space={"a": 2, "b": 4},
        run_config=air.RunConfig(name="my_trainable"),
        _tuner_kwargs={'trial_executor': ResourceMgrBasedTrialExecutor(global_resource_manager=GlobalResourceManager('pg'))},
    )

    results = tuner.fit()


I assume you can’t assume you have access to the entire cluster if you are in other modes.
Multiple Ray jobs may be sharing the same cluster.

I am curious, are you creating this global resource manager for your own application, or are you trying to change how Ray Tune behaves by default?

Thanks for the thoughtful questions.

I assume you can’t assume you have access to the entire cluster if you are in other modes

I actually don’t know how the different modes work. Do you know if there’s any documentation about that behavior?

I am curious, are you creating this global resource manager for your own application, or are you trying to change how Ray Tune behaves by default?

In general, I’m trying to delegate the resource allocation logic of all parallelization to the resource manager. A reasonable example is running k-fold CV per trial. Yes, you could prespecify resource requirements via placement groups, but that only covers the simplest use case. Ultimately this should be solved bottom-up rather than top-down, which the resource manager can achieve and the current design cannot.

For the KeyError you are encountering: This is due to a bug in Ray core. When sending a placement group via the object store (e.g. in a remote call), it gets dereferenced into different objects. Thus, the actor can’t find the placement group in the set, even though both placement groups have the same ID and refer to the same underlying placement group.

I’ve filed a fix here: [core] Compare placement group objects by ID by krfricke · Pull Request #33329 · ray-project/ray · GitHub
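
To illustrate the mechanism with plain Python (no Ray involved): the two handle classes below are hypothetical, and copy.deepcopy simulates a handle coming back as a different object after a trip through serialization. Without ID-based equality, removing the returned handle from a set raises a KeyError; comparing and hashing by ID avoids it.

```python
import copy


class HandleByIdentity:
    """Hypothetical handle using default equality (object identity)."""

    def __init__(self, group_id: str):
        self.group_id = group_id


class HandleById(HandleByIdentity):
    """Same handle, but equality and hashing are based on the ID."""

    def __eq__(self, other):
        return isinstance(other, HandleById) and self.group_id == other.group_id

    def __hash__(self):
        return hash(self.group_id)


def remove_after_round_trip(handle_cls) -> bool:
    original = handle_cls("pg-1")
    acquired = {original}
    # The handle that comes back is an equal-looking but distinct object.
    received = copy.deepcopy(original)
    try:
        acquired.remove(received)
    except KeyError:
        return False
    return True


identity_ok = remove_after_round_trip(HandleByIdentity)  # KeyError is raised inside
by_id_ok = remove_after_round_trip(HandleById)           # removal succeeds
```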

Ideally we wait until the fix has landed, and you can then iterate on the latest master version.

Got it - this makes sense. Really appreciate the help!

The main use case here is to use the FixedResourceManager in a nested task. For instance, on the outer layer you’ll use Placement groups. In the inner layer, each fixed resource manager should then manage the resources available to itself, i.e. the resources assigned to it in the placement group.

This is why we get the available_resources() in Script/local mode (i.e. when running on the driver) and the assigned_resources() otherwise.

It is a bit unfortunate that jobs trigger WORKER_MODE, but the workaround (passing the resources as an argument) should be easy enough I hope!
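
As a rough sketch of the selection logic described above (not the actual code in fixed.py): the mode labels, the function name, and its parameters are all illustrative assumptions, and the two dicts stand in for what available_resources() and assigned_resources() would report.

```python
from typing import Dict, Optional

# Illustrative labels only; these are not Ray's real constants.
SCRIPT_MODE, WORKER_MODE, LOCAL_MODE = "script", "worker", "local"


def default_total_resources(
    mode: Optional[str],
    cluster_available: Dict[str, float],
    assigned_to_worker: Dict[str, float],
    explicit_total: Optional[Dict[str, float]] = None,
) -> Dict[str, float]:
    # An explicitly passed budget always wins (the workaround for submitted jobs).
    if explicit_total is not None:
        return dict(explicit_total)
    # On the driver, the manager may use whatever the cluster has available.
    if mode in {None, SCRIPT_MODE, LOCAL_MODE}:
        return dict(cluster_available)
    # Inside a task or actor, only the resources assigned to it are managed.
    return dict(assigned_to_worker)


cluster = {"CPU": 8.0}
on_driver = default_total_resources(SCRIPT_MODE, cluster, {})
nested = default_total_resources(WORKER_MODE, cluster, {"CPU": 2.0})
submitted_job = default_total_resources(WORKER_MODE, cluster, {})
with_workaround = default_total_resources(WORKER_MODE, cluster, {}, {"CPU": 8.0})
```

In this sketch the submitted job ends up with an empty budget unless the total is passed explicitly, which matches the behavior reported earlier in the thread.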

Thanks! This makes a lot of sense.

@Zhang_David You sorted now? May I mark this as resolved?

Go for it. Sorry for my late response.