@amogkam I managed to track down what I was missing about this error: it was a badly configured network. However, I am now having a problem with serializing a function.
I have a client that makes a POST request to a REST API with a predefined model and dataset. I created a function to gather and treat those arguments and run the training with RayExecutor. The function is defined below:
def train(self, *args, **kwargs):
    import ray
    import tensorflow
    import horovod.tensorflow.keras as hvd
    import numpy

    hvd.init()

    class InstanceTreatment:
        pass  # body elided

    class ExecutionBackground:
        # constructor and other members elided

        def compile(self):
            pass

        def train(self):
            pass

    exe = ExecutionBackground(**kwargs)
    exe.compile()
    return exe.train()
I also have an Execution class, which runs in a separate thread:
import importlib  # (other project imports elided)

class Execution:
    __DATASET_KEY_CHARACTER = "$"
    __REMOVE_KEY_CHARACTER = ""

    def __init__(self,
                 database_connector: Database,
                 executor_name: str,
                 executor_service_type: str,
                 parent_name: str,
                 parent_name_service_type: str,
                 metadata_creator: Metadata,
                 class_method: str,
                 parameters_handler: Parameters,
                 storage: ObjectStorage,
                 executor: RayExecutor,
                 compile_code: str = '',
                 ):
        pass
    def __pipeline(self,
                   module_path: str,
                   method_parameters: dict,
                   description: str) -> None:
        try:
            importlib.import_module(module_path)
            print('Starting executor...', flush=True)
            self.distributed_executor.start()
            print('executor ready...', flush=True)
            model_instance = self.__storage.read(self.parent_name,
                                                 self.parent_name_service_type)
            model_definition = model_instance.to_json()
            treated_parameters = self.__parameters_handler.treat(method_parameters)
            method_result = self.distributed_executor.run(train, kwargs=dict({
                'model': model_definition,
                'model_name': self.parent_name,
                'training_parameters': treated_parameters,
                'compile_code': self.compile_code,
                'callbacks': callbacks,
            }))
            print('method_results', method_result, f'\n len: {len(method_result)}', flush=True)
            self.__execute_a_object_method(model_instance, 'set_weights', dict({'weights': method_result[0]}))
            print('saving results to model...', flush=True)
            self.__storage.save(method_result, self.executor_name,
                                self.executor_service_type)
            print('updating flag...', flush=True)
            self.__metadata_creator.update_finished_flag(self.executor_name,
                                                         flag=True)
        except Exception:
            # exception handling elided in this snippet
            raise

    def __execute_a_object_method(self, class_instance: object, method: str,
                                  parameters: dict) -> object:
        pass
Both functions are defined in a file named distributed_training.py, but when I try to use the inspect_serializability function I get the following:
==============================================================
Checking Serializability of <function train at 0x7fb3143dadd0>
==============================================================
error ray::BaseHorovodWorker.execute() (pid=163, ip=10.0.2.21, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x7fc6489afed0>)
At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: No module named 'distributed_training'
traceback: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 332, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 235, in _deserialize_object
    return self._deserialize_msgpack_data(data, metadata_fields)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 190, in _deserialize_msgpack_data
    python_objects = self._deserialize_pickle5_data(pickle5_data)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 180, in _deserialize_pickle5_data
    obj = pickle.loads(in_band)
ModuleNotFoundError: No module named 'distributed_training'
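For context, I invoke the serializability check roughly like this (a simplified sketch of my call site; the name argument is just a label):

from ray.util import inspect_serializability

from distributed_training import train

# reproduce the check whose output is shown above
inspect_serializability(train, name="train")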
I'm running this code on the head node, and the executor was previously created in another file and passed as a parameter to the Execution class. Please let me know if there's anything else I can provide.
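One thing I'm wondering: since the error happens while pickle.loads runs on the workers, would shipping distributed_training.py to them via a runtime_env be the right fix? Something like this sketch (the path is a placeholder, and I'm assuming a Ray version with runtime_env support):

import ray

# sketch: make distributed_training.py importable on every worker by
# shipping the project directory with the job; '/path/to/project' is a
# placeholder for my actual project root
ray.init(address='auto',
         runtime_env={'working_dir': '/path/to/project'})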