The following snippet throws an error when I try to run it in client mode. It works fine on a local cluster.
import time
import numpy as np
import ray
import ray.util
import pandas
# Connect to the Ray client server; '<Host>:<Port>' is a placeholder for the server's address.
ray.util.connect('<Host>:<Port>')
@ray.remote
def no_work(a):
    """Remote no-op task: accept one argument, ignore it, and return 7."""
    return 7
# Time the whole put / submit / get round trip.
start = time.time()
# Put a small DataFrame into Ray; this is the call the traceback points at in client mode.
a_id = ray.put(pandas.DataFrame([1, 2])) # Works when ray is initialized locally with ray.init
#a_id = ray.put([1, 2]) # Works both locally and remote
# Submit ten no_work tasks that all receive the same object reference.
result_ids = [no_work.remote(a_id) for x in range(10)]
# Fetch the return values of all ten tasks.
results = ray.get(result_ids)
print("duration =", time.time() - start)
# Bare expression: displays the results when run as the last line of an IPython cell.
results
Here is the error:
Got Error from data channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
debug_error_string = "{"created":"@1612316497.146166241","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>
Exception in thread Thread-8:
Traceback (most recent call last):
File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 87, in _data_main
raise e
File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 62, in _data_main
for response in resp_stream:
File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
return self._next()
File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
debug_error_string = "{"created":"@1612316497.146166241","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>
---------------------------------------------------------------------------
ConnectionError Traceback (most recent call last)
<ipython-input-1-82ff813aceed> in <module>
11
12 start = time.time()
---> 13 a_id = ray.put(pandas.DataFrame([1, 2])) # Works when ray is initialized locally with ray.init
14 #a_id = ray.put([1, 2]) # Works with remote cluster
15 result_ids = [no_work.remote(a_id) for x in range(10)]
~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
44 global _client_hook_enabled
45 if client_mode_enabled and _client_hook_enabled:
---> 46 return getattr(ray, func.__name__)(*args, **kwargs)
47 return func(*args, **kwargs)
48
~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/api.py in put(self, *args, **kwargs)
41 kwargs: opaque keyword arguments
42 """
---> 43 return self.worker.put(*args, **kwargs)
44
45 def wait(self, *args, **kwargs):
~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in put(self, vals)
191 to_put.append(vals)
192
--> 193 out = [self._put(x) for x in to_put]
194 if single:
195 out = out[0]
~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in <listcomp>(.0)
191 to_put.append(vals)
192
--> 193 out = [self._put(x) for x in to_put]
194 if single:
195 out = out[0]
~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in _put(self, val)
206 data = dumps_from_client(val, self._client_id)
207 req = ray_client_pb2.PutRequest(data=data)
--> 208 resp = self.data_client.PutObject(req)
209 return ClientObjectRef(resp.id)
210
~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in PutObject(self, request, context)
125 context=None) -> ray_client_pb2.PutResponse:
126 datareq = ray_client_pb2.DataRequest(put=request, )
--> 127 resp = self._blocking_send(datareq)
128 return resp.put
129
~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in _blocking_send(self, req)
104 if self._in_shutdown:
105 raise ConnectionError(
--> 106 f"cannot send request {req}: data channel shutting down")
107 data = self.ready_data[req_id]
108 del self.ready_data[req_id]
ConnectionError: cannot send request req_id: 2
put {
data: "\200\005\2259\002\000\000\000\000\000\000\214\021pandas.core.frame\224\214\tDataFrame\224\223\224)\201\224}\224(\214\004_mgr\224\214\036pandas.core.internals.managers\224\214\014BlockManager\224\223\224)\201\224(]\224(\214\030pandas.core.indexes.base\224\214\n_new_Index\224\223\224\214\031pandas.core.indexes.range\224\214\nRangeIndex\224\223\224}\224(\214\004name\224N\214\005start\224K\000\214\004stop\224K\001\214\004step\224K\001u\206\224R\224h\rh\020}\224(h\022Nh\023K\000h\024K\002h\025K\001u\206\224R\224e]\224\214\022numpy.core.numeric\224\214\013_frombuffer\224\223\224(\226\020\000\000\000\000\000\000\000\001\000\000\000\000\000\000\000\002\000\000\000\000\000\000\000\224\214\005numpy\224\214\005dtype\224\223\224\214\002i8\224\211\210\207\224R\224(K\003\214\001<\224NNNJ\377\377\377\377J\377\377\377\377K\000t\224bK\001K\002\206\224\214\001C\224t\224R\224a]\224h\rh\020}\224(h\022Nh\023K\000h\024K\001h\025K\001u\206\224R\224a}\224\214\0060.14.1\224}\224(\214\004axes\224h\n\214\006blocks\224]\224}\224(\214\006values\224h+\214\010mgr_locs\224\214\010builtins\224\214\005slice\224\223\224K\000K\001K\001\207\224R\224uaust\224b\214\004_typ\224\214\tdataframe\224\214\t_metadata\224]\224\214\005attrs\224}\224ub."
}
: data channel shutting down
I started the remote server with the following command:
python -m ray.util.client.server --host 0.0.0.0 --port 51005 --redis-address 172.31.31.30:6379 --redis-password 5241590000000000
From the error message, it seems to me that the pandas DataFrame is not being serialized properly.
Please suggest how to fix this.