@kai
Regarding the PlacementGroup key error above, this example fully replicates it. The idea: a detached registrar actor hands out a single shared resource-manager actor, and a custom trial executor re-resolves that handle after unpickling:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from typing import Any, Dict, Optional, Type

import ray
from ray.air import ResourceRequest, AcquiredResources
from ray.air.execution import (
    FixedResourceManager,
    PlacementGroupResourceManager,
    ResourceManager,
)
from ray.tune.execution.ray_trial_executor import RayTrialExecutor
GLOBAL_NS = 'global_vars'
RM_REG = 'global_resource_manager_registrar'
def freeze(o):
    """Recursively convert dicts/sets/lists/tuples into hashable equivalents."""
    if isinstance(o, dict):
        return frozenset({k: freeze(v) for k, v in o.items()}.items())
    if isinstance(o, (set, tuple, list)):
        return tuple(freeze(v) for v in o)
    return o

def make_hash(o):
    """Make a hash of anything composed only of lists, dicts, and hashable
    types (including strings and numbers)."""
    return hash(freeze(o))
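# For example, the registry key built below is order-insensitive in the kwargs
# dict, since `freeze` turns dicts into frozensets of items:
#   make_hash(('pg', {'a': 1, 'b': 2})) == make_hash(('pg', {'b': 2, 'a': 1}))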
RM_ACTOR_CLASSES = {
    'fixed': FixedResourceManager,
    'pg': PlacementGroupResourceManager,
}
def _get_resource_manager_cls(rm_type: str) -> Type[ResourceManager]:
    try:
        return RM_ACTOR_CLASSES[rm_type]
    except KeyError:
        raise NotImplementedError(f"resource_manager type {rm_type} not yet supported!")
@ray.remote(num_cpus=0, num_gpus=0)
class GlobalResourceManagerRegistrar:
    def __init__(self, *args, **kwargs):
        self._rms = {}
        logging.basicConfig(level=logging.INFO)

    def get_if_exists(self, rm_type: str, rm_init_kwgs: Dict[str, Any]) -> ray.actor.ActorHandle:
        key = make_hash((rm_type, rm_init_kwgs))
        if key in self._rms:
            return self._rms[key]
        remote_rm_cls = ray.remote(_get_resource_manager_cls(rm_type))
        # We want the manager as an actor handle so every caller shares it.
        actor = remote_rm_cls.remote(**rm_init_kwgs)
        # debug
        rtc = ray.get_runtime_context()
        logging.debug(f'mode = {rtc.worker.mode}')
        self._rms[key] = actor
        logging.info(f"no resource manager of type = {rm_type} && init_kwgs = {rm_init_kwgs} yet; "
                     f"created and registered one.")
        return actor
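# Get-or-create for the detached registrar: look up the named actor first and,
# on ValueError, create it detached in a fixed namespace, so that every driver
# and worker process resolves to the same registrar.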
def _get_global_rm_actor(rm_type: str, rm_init_kwgs: Dict[str, Any]) -> ray.actor.ActorHandle:
    try:
        rm_reg = ray.get_actor(name=RM_REG, namespace=GLOBAL_NS)
    except ValueError:
        rm_reg = GlobalResourceManagerRegistrar.options(
            namespace=GLOBAL_NS, name=RM_REG,
            lifetime='detached',
            # get_if_exists=True,  # THIS DOESN'T WORK!!! WTF???
        ).remote()
    return ray.get(rm_reg.get_if_exists.remote(rm_type, rm_init_kwgs))
class GlobalResourceManager(ResourceManager):
    def __init__(self,
                 rm_type: Optional[str] = None,
                 rm_init_kwgs: Optional[Dict[str, Any]] = None,
                 ):
        if rm_type is None:
            rm_type = 'fixed'
        self.rm_type = rm_type
        self.rm_init_kwgs = rm_init_kwgs or {}
        self._actual_rm = _get_global_rm_actor(self.rm_type, self.rm_init_kwgs)

    def __repr__(self):
        # A way of sanity checking that we're actually using the same actor handle.
        return repr(self._actual_rm)

    def __str__(self):
        return self.__repr__()

    def __reduce__(self):
        # Reconstruct from (rm_type, rm_init_kwgs) so unpickling re-resolves
        # the shared actor handle instead of serializing it.
        return self.__class__, (self.rm_type, self.rm_init_kwgs)

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['_actual_rm']
        return state

    def request_resources(self, resource_request: ResourceRequest):
        ray.get(self._actual_rm.request_resources.remote(resource_request))

    def cancel_resource_request(self, resource_request: ResourceRequest):
        ray.get(self._actual_rm.cancel_resource_request.remote(resource_request))

    def has_resources_ready(self, resource_request: ResourceRequest) -> bool:
        return ray.get(self._actual_rm.has_resources_ready.remote(resource_request))

    def acquire_resources(self, resource_request: ResourceRequest) -> Optional[AcquiredResources]:
        return ray.get(self._actual_rm.acquire_resources.remote(resource_request))

    def free_resources(self, acquired_resource: AcquiredResources):
        ray.get(self._actual_rm.free_resources.remote(acquired_resource))

    def clear(self):
        ray.get(self._actual_rm.clear.remote())
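# Sanity check (a sketch): two separately constructed proxies should repr() to
# the same underlying actor handle, confirming they share one manager:
#   rm1 = GlobalResourceManager('pg')
#   rm2 = GlobalResourceManager('pg')
#   assert repr(rm1) == repr(rm2)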
class ResourceMgrBasedTrialExecutor(RayTrialExecutor):
    # Ref: https://discuss.ray.io/t/shared-resource-manager/9638/2
    def __init__(self,
                 global_resource_manager: GlobalResourceManager,
                 *,
                 reuse_actors: bool = False,
                 result_buffer_length: Optional[int] = None,
                 refresh_period: Optional[float] = None,
                 chdir_to_trial_dir: bool = False,
                 ):
        super().__init__(
            resource_manager=None,  # we don't create it in __init__
            reuse_actors=reuse_actors,
            result_buffer_length=result_buffer_length,
            refresh_period=refresh_period,
            chdir_to_trial_dir=chdir_to_trial_dir,
        )
        self.resource_manager_type = global_resource_manager.rm_type
        self.resource_manager_init_kwgs = global_resource_manager.rm_init_kwgs
        self._global_resource_manager: Optional[GlobalResourceManager] = None

    def setup(self, *args, **kwargs):
        super().setup(*args, **kwargs)
        self._resource_manager = GlobalResourceManager(
            rm_type=self.resource_manager_type,
            rm_init_kwgs=self.resource_manager_init_kwgs,
        )

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("_resource_manager", None)  # ensure we don't pickle `_resource_manager`
        return state
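# Note: the executor never pickles `_resource_manager` (see __getstate__);
# setup() re-creates the GlobalResourceManager proxy, which re-resolves the
# shared actor from the registrar by (rm_type, rm_init_kwgs).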
if __name__ == '__main__':
    ray.init('auto')

    def objective(x, a, b):
        return a * (x ** 0.5) + b

    from ray import tune
    from ray.air import session
    from ray import air

    def trainable(config: dict):
        intermediate_score = 0
        for x in range(20):
            intermediate_score = objective(x, config["a"], config["b"])
            session.report({"score": intermediate_score})  # This sends the score to Tune.

    tuner = tune.Tuner(
        tune.with_resources(trainable, tune.PlacementGroupFactory([
            {"CPU": 1},
        ])),
        param_space={"a": 2, "b": 4},
        run_config=air.RunConfig(name="my_trainable"),
        _tuner_kwargs={'trial_executor': ResourceMgrBasedTrialExecutor(
            global_resource_manager=GlobalResourceManager('pg'))},
    )
    results = tuner.fit()
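To reproduce: start a cluster first (e.g. ray start --head), since ray.init('auto') attaches to an existing cluster, then run the script as the driver. Note that _tuner_kwargs is private API; it is only used here to inject the custom trial executor.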