I'm using Ray version 1.7.0.
I am trying to use Ray from a Jupyter notebook. With a local Ray cluster, the following code works:
ray.init()
data = np.zeros(1 * 1024 * 1024)
data_ref = ray.put(data)
@ray.remote
def f():
return len(ray.get(data_ref))
result_ref = f.remote()
result = ray.get(result_ref)
print(result)
ray.shutdown()
Then I set up a Ray cluster with the Kubernetes operator and connected to it from the Jupyter notebook via Ray Client:
ray.init("ray://ray-2-ray-head.ray.svc.cluster.local:10001")
data = np.zeros(1 * 1024 * 1024)
data_ref = ray.put(data)
@ray.remote
def f():
return len(ray.get(data_ref))
result_ref = f.remote()
result = ray.get(result_ref)
print(result)
ray.shutdown()
This gives me a very strange error, and the error message is not helpful:
Put failed:
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
/tmp/ipykernel_1341/232507574.py in <module>
10
11
---> 12 result_ref = f.remote()
13 result = ray.get(result_ref)
14 print(result)
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/remote_function.py in _remote_proxy(*args, **kwargs)
125 @wraps(function)
126 def _remote_proxy(*args, **kwargs):
--> 127 return self._remote(args=args, kwargs=kwargs)
128
129 self.remote = _remote_proxy
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py in _invocation_remote_span(self, args, kwargs, *_args, **_kwargs)
291 # Tracing doesn't work for cross lang yet.
292 if not is_tracing_enabled() or self._is_cross_language:
--> 293 return method(self, args, kwargs, *_args, **_kwargs)
294
295 assert "_ray_trace_ctx" not in kwargs
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, retry_exceptions, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, runtime_env, override_environment_variables, name)
239 runtime_env=runtime_env,
240 override_environment_variables=override_environment_variables,
--> 241 name=name)
242
243 worker = ray.worker.global_worker
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs)
143 setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
144 client_func = ray._get_converted(key)
--> 145 return client_func._remote(in_args, in_kwargs, **kwargs)
146
147
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/common.py in _remote(self, args, kwargs, **option_args)
127 if kwargs is None:
128 kwargs = {}
--> 129 return self.options(**option_args).remote(*args, **kwargs)
130
131 def __repr__(self):
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/common.py in remote(self, *args, **kwargs)
377 def remote(self, *args, **kwargs):
378 self._remote_stub._signature.bind(*args, **kwargs)
--> 379 return return_refs(ray.call_remote(self, *args, **kwargs))
380
381 def __getattr__(self, key):
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/api.py in call_remote(self, instance, *args, **kwargs)
100 kwargs: opaque keyword arguments
101 """
--> 102 return self.worker.call_remote(instance, *args, **kwargs)
103
104 def call_release(self, id: bytes) -> None:
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/worker.py in call_remote(self, instance, *args, **kwargs)
452
453 def call_remote(self, instance, *args, **kwargs) -> List[bytes]:
--> 454 task = instance._prepare_client_task()
455 for arg in args:
456 pb_arg = convert_to_arg(arg, self._client_id)
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/common.py in _prepare_client_task(self)
383
384 def _prepare_client_task(self):
--> 385 task = self._remote_stub._prepare_client_task()
386 set_task_options(task, self._options)
387 return task
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/common.py in _prepare_client_task(self)
153
154 def _prepare_client_task(self) -> ray_client_pb2.ClientTask:
--> 155 self._ensure_ref()
156 task = ray_client_pb2.ClientTask()
157 task.type = ray_client_pb2.ClientTask.FUNCTION
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/common.py in _ensure_ref(self)
150 None)
151 self._ref = ray.worker._put_pickled(
--> 152 data, client_ref_id=self._client_side_ref.id)
153
154 def _prepare_client_task(self) -> ray_client_pb2.ClientTask:
/opt/conda/envs/mycode/lib/python3.7/site-packages/ray/util/client/worker.py in _put_pickled(self, data, client_ref_id)
410 if not resp.valid:
411 try:
--> 412 raise cloudpickle.loads(resp.error)
413 except (pickle.UnpicklingError, TypeError):
414 logger.exception("Failed to deserialize {}".format(resp.error))
AssertionError:
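The traceback shows that the failure happens while the client pickles and uploads the definition of `f` itself (`_ensure_ref` → `_put_pickled`), before the task ever runs. Note that `f` references the global `data_ref` inside its body. However, if I pass the data to the function as an argument instead, it works: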
ray.init("ray://ray-2-ray-head.ray.svc.cluster.local:10001")
data = np.zeros(1 * 1024 * 1024)
# data_ref = ray.put(data)
@ray.remote
def f(d):
return len(d)
result_ref = f.remote(data)
result = ray.get(result_ref)
print(result)
ray.shutdown()
# this gives me the correct answer.
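Has anybody used Ray Client from a Jupyter notebook with object refs, in particular an object ref that a remote function references as a global instead of receiving it as an argument?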