RuntimeError: The actor with name TrainTrainable failed to import on the worker

Hello. I'm trying to add a custom training backend, similar to how Ray integrates PyTorch. The issue is that when I define a network model (which differs from PyTorch's nn.Module) in train_loop_per_worker(), I get the following error:

  File "python/ray/_raylet.pyx", line 655, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 696, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 662, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 666, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 613, in ray._raylet.execute_task.function_executor
  File "/home/wjeon/ray/python/ray/_private/function_manager.py", line 674, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/home/wjeon/ray/python/ray/_private/function_manager.py", line 586, in temporary_actor_method
    raise RuntimeError(
RuntimeError: The actor with name TrainTrainable failed to import on the worker. This may be because needed library dependencies are not installed in the worker environment:

Traceback (most recent call last):
  File "/home/wjeon/ray/python/ray/_private/function_manager.py", line 625, in _load_actor_class_from_gcs
    actor_class = pickle.loads(pickled_class)
  File "/home/wjeon/ray/python/ray/cloudpickle/cloudpickle.py", line 872, in _make_skeleton_class
    skeleton_class = types.new_class(
  File "/usr/lib/python3.8/types.py", line 72, in new_class
    meta, ns, kwds = prepare_class(name, resolved_bases, kwds)
  File "/usr/lib/python3.8/types.py", line 125, in prepare_class
    meta = _calculate_meta(meta, bases)
  File "/usr/lib/python3.8/types.py", line 143, in _calculate_meta
    raise TypeError("metaclass conflict: "
TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases

Could you guide me on how to tackle this issue? If my custom network object is implemented in C++ and exposed to Python via pybind11, do I need to take care of native libraries accordingly, beyond just importing the module that provides the corresponding Python objects?
Thanks.

Hi,
I don't really have any experience with this. But my hunch is that for this to work, you basically need to make sure that the function can be pickled, shipped to a remote actor, and unpickled successfully.
To test that directly and debug, you can try ray.cloudpickle.dumps(f) and then ray.cloudpickle.loads() on your function and see if it works.
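A minimal round-trip check might look like the sketch below. It uses the stdlib pickle module as a stand-in (with Ray you would call ray.cloudpickle.dumps / ray.cloudpickle.loads the same way); note that stdlib pickle serializes top-level functions by reference while cloudpickle serializes them by value, so this only approximates what Ray does. The train_fn name is a made-up placeholder for your train_loop_per_worker.

```python
import pickle

def train_fn():
    # Stand-in for your train_loop_per_worker; it would normally
    # build the model and run the training loop.
    return "ok"

# Serialize the function, then rebuild it the way a worker would.
blob = pickle.dumps(train_fn)
rebuilt = pickle.loads(blob)

assert rebuilt() == "ok"
print("pickle round-trip succeeded")
```

If the loads() call raises (for example with the metaclass conflict above), the failure is in serialization itself rather than in Ray's scheduling.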

cc @sangcho

@xwjiang2010 Thanks for your response. Here is the detailed error message from the beginning:

Exception has occurred: TrainingFailedError
Failure # 1 (occurred at 2022-11-08_11-58-53)
Traceback (most recent call last):
  File "/home/wjeon/ray/python/ray/tune/execution/ray_trial_executor.py", line 996, in get_next_executor_event
    future_result = ray.get(ready_future)
  File "/home/wjeon/ray/python/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/wjeon/ray/python/ray/_private/worker.py", line 2282, in get
    raise value
ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
    class_name: BaseTrainer.as_trainable.<locals>.TrainTrainable
    actor_id: 89986ce05f4d7d344c68da4c01000000
    pid: 19275
    namespace: 41c04ed9-1bdb-464c-93bf-e12649db5c7f
    ip:
The actor is dead because its worker process has died. Worker exit type: SYSTEM_ERROR Worker exit detail: Worker exits unexpectedly. Worker exits with an exit code None.
Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 655, in ray._raylet.execute_task
    with core_worker.profile_event(b"task:execute"):
  File "python/ray/_raylet.pyx", line 696, in ray._raylet.execute_task
    raise e
  File "python/ray/_raylet.pyx", line 662, in ray._raylet.execute_task
    with ray._private.worker._changeproctitle(title, next_title):
  File "python/ray/_raylet.pyx", line 666, in ray._raylet.execute_task
    outputs = function_executor(*args, **kwargs)
  File "python/ray/_raylet.pyx", line 613, in ray._raylet.execute_task.function_executor
    return function(actor, *arguments, **kwarguments)
  File "/home/wjeon/ray/python/ray/_private/function_manager.py", line 674, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/home/wjeon/ray/python/ray/_private/function_manager.py", line 586, in temporary_actor_method
    raise RuntimeError(
RuntimeError: The actor with name TrainTrainable failed to import on the worker. This may be because needed library dependencies are not installed in the worker environment:

Is there any way to figure out why the actor with name TrainTrainable failed to import on the worker? The only difference from PyTorch's training example is that PyTorch's network model inherits from the torch.nn.Module class (whose metaclass is the plain 'type'), whereas my custom test code uses a network class inherited from a base class whose metaclass is 'pybind11_builtins.pybind11_type'. Thank you.
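For what it's worth, the TypeError at the bottom of the traceback can be reproduced in pure Python whenever a class ends up with two bases whose metaclasses are unrelated. It is plausibly what happens when cloudpickle rebuilds the class skeleton assuming metaclass 'type' while the base carries 'pybind11_type' (that pybind11 detail is my assumption; MetaA/MetaB below are hypothetical stand-ins):

```python
# Two unrelated metaclasses, standing in for `type` and
# pybind11_builtins.pybind11_type (hypothetical stand-ins).
class MetaA(type):
    pass

class MetaB(type):
    pass

class BaseA(metaclass=MetaA):
    pass

class BaseB(metaclass=MetaB):
    pass

try:
    # Python cannot pick a single metaclass for the derived class,
    # because neither MetaA nor MetaB is a subclass of the other.
    class Derived(BaseA, BaseB):
        pass
except TypeError as exc:
    # Prints: metaclass conflict: the metaclass of a derived class
    # must be a (non-strict) subclass of the metaclasses of all its bases
    print(exc)
```

So the error is not about a missing library on the worker; it is raised while cloudpickle reconstructs the class on the remote side.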

Yeah, I think this is saying that pickling is not working as intended. You probably have to dig into how to make it work with a C++ implementation plus Python bindings.
From a quick web search, I found this. As stated, Pygloo provides Python bindings for gloo, a collective communication library implemented in C++. And as you can see in their example, pygloo.xxx calls can be put inside a remote function (decorated with ray.remote) without a problem, which means pickling works in Pygloo's case. I would suggest you take a look there and see how that is done.
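For context, the standard pure-Python workaround for a metaclass conflict is to define a combined metaclass that inherits from both sides. Whether cloudpickle can be taught to do this for a pybind11-backed base is an open question, so treat the sketch below (with made-up MetaA/MetaB names) purely as an illustration of the mechanism:

```python
# Two unrelated metaclasses (hypothetical stand-ins for `type`
# and a pybind11-style metaclass).
class MetaA(type):
    pass

class MetaB(type):
    pass

class BaseA(metaclass=MetaA):
    pass

class BaseB(metaclass=MetaB):
    pass

# A metaclass that subclasses both resolves the conflict, because it
# is a (non-strict) subclass of the metaclasses of all the bases.
class CombinedMeta(MetaA, MetaB):
    pass

class Derived(BaseA, BaseB, metaclass=CombinedMeta):
    pass

print(type(Derived).__name__)  # CombinedMeta
```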

Is there a minimal code example that will reproduce the failure? It's best to file a bug on the Ray GitHub repo so the team can address it accordingly.

Dear @ClarenceNg , thanks for your response.

Here’s my bare minimum test code. I’m testing with Ray 2.0.1.

from typing import TYPE_CHECKING, Callable, Dict, Optional, Union
from ray.air.checkpoint import Checkpoint
from ray.air.config import DatasetConfig, RunConfig, ScalingConfig
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.train.trainer import GenDataset
from ray.util import PublicAPI

if TYPE_CHECKING:
    from ray.data.preprocessor import Preprocessor

import ray
from ray.air.config import ScalingConfig

import io
from ray.train.backend import BackendConfig, Backend, EncodedData
from ray.train._internal.worker_group import WorkerGroup

import mindspore

from dataclasses import dataclass

@dataclass
class MindSporeConfig(BackendConfig):
    backend: Optional[str] = None
    init_method: str = "env"
    timeout_s: int = 1800

    @property
    def backend_cls(self):
        return _MindSporeBackend


def _shutdown_mindspore(destroy_process_group=False):
    print("_shutdown_mindspore")


class _MindSporeBackend(Backend):
    share_cuda_visible_devices: bool = True

    def on_start(self, worker_group: WorkerGroup, backend_config: MindSporeConfig):
        print("_MindSporeBackend::on_start")

    def on_shutdown(self, worker_group: WorkerGroup, backend_config: MindSporeConfig):
        worker_group.execute(
            _shutdown_mindspore, destroy_process_group=len(worker_group) > 1
        )

    @staticmethod
    def encode_data(data_dict: Dict) -> EncodedData:
        for k, v in data_dict.items():
            data_dict[k] = v.module

        _buffer = io.BytesIO()
        #mindspore.save_checkpoint(data_dict, _buffer)
        return _buffer.getvalue()

    @staticmethod
    def decode_data(encoded_data: EncodedData) -> Dict:
        _buffer = io.BytesIO(encoded_data)
        checkpoint_dict = mindspore.load(_buffer, map_location="cpu")
        return checkpoint_dict


class MindSporeTrainer(DataParallelTrainer):
    def __init__(
        self,
        train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]],
        *,
        train_loop_config: Optional[Dict] = None,
        mindspore_config: Optional[MindSporeConfig] = None,
        scaling_config: Optional[ScalingConfig] = None,
        dataset_config: Optional[Dict[str, DatasetConfig]] = None,
        run_config: Optional[RunConfig] = None,
        datasets: Optional[Dict[str, GenDataset]] = None,
        preprocessor: Optional["Preprocessor"] = None,
        resume_from_checkpoint: Optional[Checkpoint] = None,
    ):
        if not mindspore_config:
            mindspore_config = MindSporeConfig()

        super(MindSporeTrainer, self).__init__(
            train_loop_per_worker=train_loop_per_worker,
            train_loop_config=train_loop_config,
            backend_config=mindspore_config,
            scaling_config=scaling_config,
            dataset_config=dataset_config,
            run_config=run_config,
            datasets=datasets,
            preprocessor=preprocessor,
            resume_from_checkpoint=resume_from_checkpoint,
        )

import mindspore.nn as nn

class NeuralNetwork(nn.Cell):
    def __init__(self):
        super().__init__()
    def construct(self, x):
        return self

scaling_config = ScalingConfig(num_workers=1)

def fn():
    model = NeuralNetwork()

# test pickling for nn.Cell
bin = ray.cloudpickle.dumps(fn)
ray.cloudpickle.loads(bin)

trainer = MindSporeTrainer(
    train_loop_per_worker=fn,
    scaling_config=scaling_config,
)
result = trainer.fit()

MindSpore was obtained from:

pip install https://ms-release.obs.cn-north-4.myhuaweicloud.com/1.9.0/MindSpore/gpu/x86_64/cuda-11.1/mindspore_gpu-1.9.0-cp38-cp38-linux_x86_64.whl --trusted-host ms-release.obs.cn-north-4.myhuaweicloud.com -i https://pypi.tuna.tsinghua.edu.cn/simple

Dear @xwjiang2010, thanks for your suggestion. I will take a look at how pygloo is integrated with Ray. Also, I've tested pickling the function with 'bin = ray.cloudpickle.dumps(fn)' and 'ray.cloudpickle.loads(bin)', and the code does not give any error. Please see the test code above.

@ClarenceNg Is there any update or progress from your side regarding this issue? Thanks.