Serializing function error

To sum up my problem when i call inspect_serializability the function pass but when i send it via executor.run it fails.

I have a client that make a post request to an rest API with a pre defined model and dataset. I created a function to gather and treat those args and train it using rayExecutor. The function is defined as below:

def train(self, *args, **kwargs):
    import ray
    import tensorflow
    import horovod.tensorflow.keras as hvd
    import numpy
    hvd.init()

    class InstanceTreatment:
        pass

    class ExecutionBackground:
        pass

        def compile(self):
            pass

        def train(self):
            pass

    exe = ExecutionBackground(**kwargs)
    exe.compile()
    return exe.train()

I have also a execution class which runs in separated thread

class Execution:
    __DATASET_KEY_CHARACTER = "$"
    __REMOVE_KEY_CHARACTER = ""

    def __init__(self,
                 database_connector: Database,
                 executor_name: str,
                 executor_service_type: str,
                 parent_name: str,
                 parent_name_service_type: str,
                 metadata_creator: Metadata,
                 class_method: str,
                 parameters_handler: Parameters,
                 storage: ObjectStorage,
                 executor: RayExecutor,
                 compile_code: str = '',
                 ):
        pass

    def __pipeline(self,
                   module_path: str,
                   method_parameters: dict,
                   description: str) -> None:
        try:
            importlib.import_module(module_path)
            print('Starting executor...', flush=True)
            self.distributed_executor.start()
            print('executor ready...', flush=True)
            model_instance = self.__storage.read(self.parent_name,
                                                 self.parent_name_service_type)
            model_definition = model_instance.to_json()
            treated_parameters = self.__parameters_handler.treat(method_parameters)

            method_result = self.distributed_executor.run(train, kwargs=dict({
                'model': model_definition,
                'model_name': self.parent_name,
                'training_parameters': treated_parameters,
                'compile_code': self.compile_code,
                'callbacks': callbacks,
            }))

            print('method_results', method_result, f'\n len: {len(method_result)}', flush=True)
            self.__execute_a_object_method(model_instance, 'set_weights', dict({'weights': method_result[0]}))
            print('saving results to model...', flush=True)
            self.__storage.save(method_result, self.executor_name,
                                self.executor_service_type)
            print('updating flag...', flush=True)
            self.__metadata_creator.update_finished_flag(self.executor_name,
                                                         flag=True)

    def __execute_a_object_method(self, class_instance: object, method: str,
                                  parameters: dict) -> object:
        pass

both functions are defined on a file named distributed_training.py, tho when it try to use the inspect_serializability function i get the following:

==============================================================
Checking Serializability of <function train at 0x7fb3143dadd0>

error ray::BaseHorovodWorker.execute() (pid=163, ip=10.0.2.21, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x7fc6489afed0>)
At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: No module named ‘distributed_training’
traceback: Traceback (most recent call last):
File “/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py”, line 332, in deserialize_objects
obj = self._deserialize_object(data, metadata, object_ref)
File “/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py”, line 235, in _deserialize_object
return self._deserialize_msgpack_data(data, metadata_fields)
File “/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py”, line 190, in _deserialize_msgpack_data
python_objects = self._deserialize_pickle5_data(pickle5_data)
File “/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py”, line 180, in _deserialize_pickle5_data
obj = pickle.loads(in_band)
ModuleNotFoundError: No module named ‘distributed_training’

Im running this code on a headnode and the executor was previously created in another file and passed as parameter to the Execution class. Please let me know anything else i can provide

I feel it’s a environmental issue. Could you use runtime env and add working dir which contain distributed_training?