I'm reading data from Cassandra. I added the @ray.remote decorator to my function, and I invoke it with the .remote() method.
Reproduction script
import time
import pandas as pd
import ray

# `session` is assumed to be a Cassandra session created outside this function.
@ray.remote
def read_cassandra(assetx, start_date, end_date):
    query = ("SELECT * from table_name where sensor_id=" + str(assetx)
             + " and sensor_time>='" + str(start_date)
             + "' and sensor_time<='" + str(end_date) + "' ALLOW FILTERING")
    rows = session.execute(query)
    df = pd.DataFrame(list(rows))
    print(df)

x = [read_cassandra.remote(asset, start_date, end_date) for asset in assets]
start_time = time.time()
ray.get(x)
print("time=" + str(time.time() - start_time))
I then created the session object inside read_cassandra, but I got a different error.
Reproduction script
import time
import pandas as pd
import ray
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

start_date = "2021-02-28"
end_date = "2021-03-26"
daily_date = "2021-03-25"

@ray.remote
def read_cassandra(assetx, start_date, end_date):
    # Each task opens its own connection, so no session object is shipped to the worker.
    auth_provider = PlainTextAuthProvider(username='x', password='x')
    cluster = Cluster(['x', 'x', 'x'], auth_provider=auth_provider)
    session = cluster.connect('keyspace')
    query = ("SELECT * from table_name where sensor_id=" + str(assetx)
             + " and sensor_time>='" + str(start_date)
             + "' and sensor_time<='" + str(end_date) + "' ALLOW FILTERING")
    rows = session.execute(query)
    df = pd.DataFrame(rows)
    print(df)

x = [read_cassandra.remote(asset, start_date, end_date) for asset in assets]
start_time = time.time()
ray.get(x)
I am getting this error:
RaySystemError: System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/site-packages/ray/exceptions.py", line 32, in from_bytes
    return pickle.loads(ray_exception.serialized_exception)
  File "cassandra/protocol.py", line 123, in cassandra.protocol.ErrorMessage.__init__
TypeError: __init__() takes exactly 4 positional arguments (1 given)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 287, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/opt/anaconda3/lib/python3.7/site-packages/ray/serialization.py", line 217, in _deserialize_object
    return RayError.from_bytes(obj)
  File "/opt/anaconda3/lib/python3.7/site-packages/ray/exceptions.py", line 35, in from_bytes
    raise RuntimeError(msg) from e
RuntimeError: Failed to unpickle serialized exception
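Reading the traceback, my guess is that the task itself raised a Cassandra driver error, and what I see here is only the failure to rebuild that exception with pickle.loads on the driver side, so the real message is hidden. Below is a minimal sketch of that suspicion using only the standard library. FakeDriverError is a hypothetical stand-in, not the real cassandra.protocol.ErrorMessage, and survives_pickle and run_query_safely are names I made up for illustration. The idea is to catch the error inside the task and re-raise it as a plain RuntimeError so the original message can get through.

```python
import pickle


class FakeDriverError(Exception):
    """Hypothetical stand-in for a driver exception with required init args."""

    def __init__(self, code, message, info):
        # Nothing is forwarded to Exception.__init__, so self.args ends up
        # empty and pickle later tries to rebuild the object with no arguments.
        super().__init__()
        self.code = code
        self.message = message
        self.info = info

    def __str__(self):
        return f"[{self.code}] {self.message}"


def survives_pickle(exc):
    """Return True if exc can be pickled and unpickled again."""
    try:
        pickle.loads(pickle.dumps(exc))
        return True
    except Exception:
        return False


def run_query_safely(run_query):
    """Call run_query(); convert any failure into a plain RuntimeError."""
    try:
        return run_query()
    except Exception as exc:
        # Only the type name and message are kept, both plain strings.
        raise RuntimeError(f"{type(exc).__name__}: {exc}") from None


def failing_query():
    # Simulates a query that the server rejects.
    raise FakeDriverError(8704, "Invalid query", {})


if __name__ == "__main__":
    print(survives_pickle(FakeDriverError(8704, "Invalid query", {})))
    try:
        run_query_safely(failing_query)
    except RuntimeError as err:
        print(survives_pickle(err), err)
```

In read_cassandra this would mean wrapping the session.execute(query) call the same way, for example run_query_safely(lambda: session.execute(query)). I have not confirmed that this is what is happening with the real driver, it is only what the TypeError in ErrorMessage.__init__ suggests.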