Hello,
I am using ray library for parallelization. my code is as below.
import time
import ray
from elasticsearch import Elasticsearch
import pandas as pd
import copy
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from xgboost_ray import RayXGBClassifier, RayParams
ray.init()
@ray.remote
class anoDetecPro:
def __init__(self):
self.dict = dict()
self.mainObj = list()
self.allTime = time.time()
def connectionES(self):
es = Elasticsearch(hosts=["*********"])
query = {
"query": {
"bool": {
"must": [
{"match": {"query.field.keyword": self.field}},
{"match": {"query.key.keyword": self.key}}
]
}}}
self.log = es.search(index=self.ind, body=query, size=10000)
self.dataPro()
def dataPro(self):
print("datapro")
for i in self.log['hits']['hits']:
data = i['_source']['query']['records']
for j in data:
self.dict['value'] = j['value']
self.dict['class'] = j['class']
self.mainObj.append(copy.copy(self.dict))
self.df = pd.DataFrame(self.mainObj)
self.testTrainData()
def testTrainData(self):
x = self.df["value"].values.reshape(-1, 1)
y = self.df["class"]
seed = 123
X_train, X_test, y_train, y_test = train_test_split(x, y, train_size=0.30, random_state=123)
clf = RayXGBClassifier(
n_jobs=1,
random_state=seed
)
clf.fit(X_train, y_train, ray_params=RayParams(num_actors=2))
self.result = clf.predict(self.data, ray_params=RayParams(num_actors=1))
self.y_test_pred = clf.predict(X_test, ray_params=RayParams(num_actors=1))
def run(self, field, key, ind, data, api_url, name):
self.field = field
self.key = key
self.ind = ind
self.data = data
self.apiurl = api_url
self.name = name
self.connectionES()
return self.result
I import this file in a different project and run it as
predict = ray.get(module.run.remote(fieldName, key, ind, data,
self.apiurl, self.name))
But I am getting this error
TypeError: init() missing 2 required positional arguments: ‘meta’ and ‘body’
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File “/usr/local/lib/python3.8/dist-packages/ray/serialization.py”, line 332, in deserialize_objects
obj = self._deserialize_object(data, metadata, object_ref)
File “/usr/local/lib/python3.8/dist-packages/ray/serialization.py”, line 258, in _deserialize_object
return RayError.from_bytes(obj)
File “/usr/local/lib/python3.8/dist-packages/ray/exceptions.py”, line 32, in from_bytes
return RayError.from_ray_exception(ray_exception)
File “/usr/local/lib/python3.8/dist-packages/ray/exceptions.py”, line 41, in from_ray_exception
raise RuntimeError(msg) from e
RuntimeError: Failed to unpickle serialized exception
Do you know what should i do?