RPC error in Ray client mode when calling `ray.put` on a pandas DataFrame

The following snippet throws an error when I run it in Ray client mode (connected via `ray.util.connect`). It works fine on a local cluster started with `ray.init`.

import time 
import numpy as np 
import ray 
import ray.util
import pandas
# Connect to the remote cluster in Ray client mode. Replace the placeholder
# with the host and port of the running `ray.util.client.server` (port 51005
# in this report).
ray.util.connect('<Host>:<Port>')

@ray.remote
def no_work(a):
    """Ignore ``a`` and hand back the constant 7.

    The argument is accepted only so callers can pass an object ref into
    the task; its value is never inspected.
    """
    constant = 7
    return constant

# Time how long it takes to put a DataFrame into the object store and fan out
# ten tasks that each receive the resulting object ref. In client mode the
# failure reported below surfaces at the ray.put call.
start = time.perf_counter()  # monotonic clock, intended for elapsed-time measurement
a_id = ray.put(pandas.DataFrame([1, 2])) # Works when ray is initialized locally with ray.init
#a_id = ray.put([1, 2])                  # Works both locally and remote
result_ids = [no_work.remote(a_id) for _ in range(10)]  # loop index is unused
results = ray.get(result_ids)
print("duration =", time.perf_counter() - start)
print(results)  # explicit print so the output also appears outside a notebook

Here is the error:

Got Error from data channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
	debug_error_string = "{"created":"@1612316497.146166241","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>
Exception in thread Thread-8:
Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 87, in _data_main
    raise e
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 62, in _data_main
    for response in resp_stream:
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
	debug_error_string = "{"created":"@1612316497.146166241","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>

---------------------------------------------------------------------------
ConnectionError                           Traceback (most recent call last)
<ipython-input-1-82ff813aceed> in <module>
     11 
     12 start = time.time()
---> 13 a_id = ray.put(pandas.DataFrame([1, 2])) # Works when ray is initialized locally with ray.init
     14 #a_id = ray.put([1, 2]) # Works with remote cluster
     15 result_ids = [no_work.remote(a_id) for x in range(10)]

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
     44         global _client_hook_enabled
     45         if client_mode_enabled and _client_hook_enabled:
---> 46             return getattr(ray, func.__name__)(*args, **kwargs)
     47         return func(*args, **kwargs)
     48 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/api.py in put(self, *args, **kwargs)
     41             kwargs: opaque keyword arguments
     42         """
---> 43         return self.worker.put(*args, **kwargs)
     44 
     45     def wait(self, *args, **kwargs):

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in put(self, vals)
    191             to_put.append(vals)
    192 
--> 193         out = [self._put(x) for x in to_put]
    194         if single:
    195             out = out[0]

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in <listcomp>(.0)
    191             to_put.append(vals)
    192 
--> 193         out = [self._put(x) for x in to_put]
    194         if single:
    195             out = out[0]

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in _put(self, val)
    206         data = dumps_from_client(val, self._client_id)
    207         req = ray_client_pb2.PutRequest(data=data)
--> 208         resp = self.data_client.PutObject(req)
    209         return ClientObjectRef(resp.id)
    210 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in PutObject(self, request, context)
    125                   context=None) -> ray_client_pb2.PutResponse:
    126         datareq = ray_client_pb2.DataRequest(put=request, )
--> 127         resp = self._blocking_send(datareq)
    128         return resp.put
    129 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in _blocking_send(self, req)
    104             if self._in_shutdown:
    105                 raise ConnectionError(
--> 106                     f"cannot send request {req}: data channel shutting down")
    107             data = self.ready_data[req_id]
    108             del self.ready_data[req_id]

ConnectionError: cannot send request req_id: 2
put {
  data: "\200\005\2259\002\000\000\000\000\000\000\214\021pandas.core.frame\224\214\tDataFrame\224\223\224)\201\224}\224(\214\004_mgr\224\214\036pandas.core.internals.managers\224\214\014BlockManager\224\223\224)\201\224(]\224(\214\030pandas.core.indexes.base\224\214\n_new_Index\224\223\224\214\031pandas.core.indexes.range\224\214\nRangeIndex\224\223\224}\224(\214\004name\224N\214\005start\224K\000\214\004stop\224K\001\214\004step\224K\001u\206\224R\224h\rh\020}\224(h\022Nh\023K\000h\024K\002h\025K\001u\206\224R\224e]\224\214\022numpy.core.numeric\224\214\013_frombuffer\224\223\224(\226\020\000\000\000\000\000\000\000\001\000\000\000\000\000\000\000\002\000\000\000\000\000\000\000\224\214\005numpy\224\214\005dtype\224\223\224\214\002i8\224\211\210\207\224R\224(K\003\214\001<\224NNNJ\377\377\377\377J\377\377\377\377K\000t\224bK\001K\002\206\224\214\001C\224t\224R\224a]\224h\rh\020}\224(h\022Nh\023K\000h\024K\001h\025K\001u\206\224R\224a}\224\214\0060.14.1\224}\224(\214\004axes\224h\n\214\006blocks\224]\224}\224(\214\006values\224h+\214\010mgr_locs\224\214\010builtins\224\214\005slice\224\223\224K\000K\001K\001\207\224R\224uaust\224b\214\004_typ\224\214\tdataframe\224\214\t_metadata\224]\224\214\005attrs\224}\224ub."
}
: data channel shutting down

I started the remote server with the following command:

python -m ray.util.client.server --host 0.0.0.0 --port 51005 --redis-address 172.31.31.30:6379 --redis-password 5241590000000000

From the error message, it seems to me that the pandas DataFrame is not being serialized (or deserialized on the server side) properly.

Any suggestions would be appreciated.

cc @barakmich — can you take a look at this issue?

I’ll bet you dollars to donuts this is the same error as [Serialization error on Ray 2.0rc with pandas DataFrames (ray-project/ray#13882)](https://github.com/ray-project/ray/issues/13882), which I happily triaged yesterday :slight_smile:

If it’s not, do let me know.

The error went away after switching to pandas==1.1.5. Thanks @barakmich and @devin-petersohn.