BaseHorovodWorker: OwnerDiedError

Hello everyone,

I'm trying to send a dataset and a few variables over with horovod.RayExecutor, and I keep getting this:

error ray::BaseHorovodWorker.execute() (pid=132, ip=10.0.3.38, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x7f5b555e1210>)
microservice_distributedtraining.1.lclmxgm94n89@template-maquina-learning-1 | At least one of the input arguments for this task could not be computed:
microservice_distributedtraining.1.lclmxgm94n89@template-maquina-learning-1 | ray.exceptions.OwnerDiedError: Failed to retrieve object 0084dccab86986a08bfb38ae278b2ccc11f06e8e0100000002000000. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during ray start and ray.init().
microservice_distributedtraining.1.lclmxgm94n89@template-maquina-learning-1 |
microservice_distributedtraining.1.lclmxgm94n89@template-maquina-learning-1 | The object's owner has exited. This is the Python worker that first created the ObjectRef via .remote() or ray.put(). Check cluster logs (/tmp/ray/session_latest/logs/*01000000ffffffffffffffffffffffffffffffffffffffffffffffff* at IP address 172.18.0.3) for more information about the Python worker failure.

I'm using a Docker Swarm environment with my own containers. The same code has worked a few times, but this error happens on, I'd say, 8 out of 10 runs. Is there any way to prevent this?
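
In case it matters, this is how I'm planning to enable the debug flag the message suggests, to trace where the ObjectRef was created (a sketch; the address is a placeholder):

# On the node starting the cluster:
#   RAY_record_ref_creation_sites=1 ray start --head

import os
os.environ["RAY_record_ref_creation_sites"] = "1"  # must be set before ray.init()

import ray
ray.init(address="auto")  # placeholder; connects the driver to the running cluster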

If necessary, I can provide more info on this. I'm available all day :upside_down_face:

Hey @Camazaqu, you already know what I was going to ask :slightly_smiling_face:. Could you provide a reproducible example? And is this error happening on every single run?

@amogkam I managed to track down what I was missing: the error was caused by a badly configured network. Now, though, I'm having a problem with serializing a function.

I have a client that makes a POST request to a REST API with a predefined model and dataset. I created a function to gather and process those arguments and train the model using RayExecutor. The function is defined as below:

def train(self, *args, **kwargs):
    # Imports happen inside the function so they are resolved on each
    # Horovod worker rather than only on the driver.
    import ray
    import tensorflow
    import horovod.tensorflow.keras as hvd
    import numpy
    hvd.init()

    class InstanceTreatment:
        # (body elided)
        pass

    class ExecutionBackground:
        # (constructor and other members elided)

        def compile(self):
            pass

        def train(self):
            pass

    exe = ExecutionBackground(**kwargs)
    exe.compile()
    return exe.train()

I also have an execution class, which runs in a separate thread:

class Execution:
    __DATASET_KEY_CHARACTER = "$"
    __REMOVE_KEY_CHARACTER = ""

    def __init__(self,
                 database_connector: Database,
                 executor_name: str,
                 executor_service_type: str,
                 parent_name: str,
                 parent_name_service_type: str,
                 metadata_creator: Metadata,
                 class_method: str,
                 parameters_handler: Parameters,
                 storage: ObjectStorage,
                 executor: RayExecutor,
                 compile_code: str = '',
                 ):
        # (attribute assignments elided)
        pass

    def __pipeline(self,
                   module_path: str,
                   method_parameters: dict,
                   description: str) -> None:
        try:
            importlib.import_module(module_path)
            print('Starting executor...', flush=True)
            self.distributed_executor.start()
            print('executor ready...', flush=True)
            model_instance = self.__storage.read(self.parent_name,
                                                 self.parent_name_service_type)
            model_definition = model_instance.to_json()
            treated_parameters = self.__parameters_handler.treat(method_parameters)

            method_result = self.distributed_executor.run(train, kwargs=dict({
                'model': model_definition,
                'model_name': self.parent_name,
                'training_parameters': treated_parameters,
                'compile_code': self.compile_code,
                'callbacks': callbacks,  # defined elsewhere (elided here)
            }))

            print('method_results', method_result, f'\n len: {len(method_result)}', flush=True)
            self.__execute_a_object_method(model_instance, 'set_weights', dict({'weights': method_result[0]}))
            print('saving results to model...', flush=True)
            self.__storage.save(method_result, self.executor_name,
                                self.executor_service_type)
            print('updating flag...', flush=True)
            self.__metadata_creator.update_finished_flag(self.executor_name,
                                                         flag=True)
        except Exception as exception:
            # (error handling elided)
            raise exception

    def __execute_a_object_method(self, class_instance: object, method: str,
                                  parameters: dict) -> object:
        # (body elided)
        pass
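
For context, the executor: RayExecutor passed into Execution above is created in another file, roughly like this (a sketch from memory; the settings values and worker counts are placeholders):

import ray
from horovod.ray import RayExecutor

ray.init(address="auto")  # connect to the existing cluster

settings = RayExecutor.create_settings(timeout_s=30)  # placeholder timeout
executor = RayExecutor(settings, num_workers=2, use_gpu=False)  # placeholder sizing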

Both functions are defined in a file named distributed_training.py, though when I try to use the inspect_serializability function I get the following:

==============================================================
Checking Serializability of <function train at 0x7fb3143dadd0>
==============================================================
error ray::BaseHorovodWorker.execute() (pid=163, ip=10.0.2.21, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x7fc6489afed0>)
At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: No module named 'distributed_training'
traceback: Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 332, in deserialize_objects
obj = self._deserialize_object(data, metadata, object_ref)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 235, in _deserialize_object
return self._deserialize_msgpack_data(data, metadata_fields)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 190, in _deserialize_msgpack_data
python_objects = self._deserialize_pickle5_data(pickle5_data)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 180, in _deserialize_pickle5_data
obj = pickle.loads(in_band)
ModuleNotFoundError: No module named 'distributed_training'
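
For reference, this is how I'm invoking the check (assuming the standard helper from ray.util):

from ray.util import inspect_serializability

from distributed_training import train

inspect_serializability(train, name="train")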

I'm running this code on the head node; the executor was created in another file (as sketched above) and passed as a parameter to the Execution class. Please let me know if there's anything else I can provide.
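
One thing I plan to try, based on the Ray runtime environments docs, is shipping the directory that contains distributed_training.py to the workers so they can resolve the module when unpickling the function (a sketch; the path is a placeholder):

import ray

# Upload the directory holding distributed_training.py as part of the
# runtime environment, so the module is importable on every worker.
ray.init(address="auto",
         runtime_env={"working_dir": "/app"})  # placeholder path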