Tune.run() on cluster failing with "'Worker' object has no attribute 'core_worker'"

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am working on a hyperparameter tuning problem and have been able to use ray.tune from my local machine with the default ray.init(). However, when I try to connect to a Ray cluster we have set up, it fails with the stack trace below. I am able to use the cluster for a simple Ray demo that does not use ray.tune. The problem only appears when I call tune.run() while connected to the cluster. In case it’s relevant, I’m running this from a Jupyter notebook. Any ideas?

Here is a code example that fails, with the code somewhat modified to obscure internal details:

import sys, ray
from ray import tune
# Print the local Python and Ray versions next to the versions this project expects.
print(f'Python version (want 3.8.5): {sys.version}')
print(f'Ray version (want 1.11): {ray.__version__}')


def trainable(config):
    """Build a model from ``config`` and return its objective value.

    Both helpers are defined outside this snippet: the model comes from
    ``generate_model_from_config`` and is scored by ``calculate_objective``.
    """
    return calculate_objective(generate_model_from_config(config))

num_samples = 500
analysis[num_samples] = tune.run(


Here is the full output:

Python version (want 3.8.5): 3.8.5 (default, Apr 26 2022, 13:49:59) 
[GCC 11.2.0]
Ray version (want 1.11): 1.11.0
AttributeError                            Traceback (most recent call last)
<timed exec> in <module>

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/tune/tune.py in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, search_alg, scheduler, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, reuse_actors, trial_executor, raise_on_failed_trial, callbacks, max_concurrent_trials, queue_trials, loggers, _remote)
    358                 execute_item = get_next_queue_item()
--> 360         remote_future = remote_run.remote(_remote=False, **remote_run_kwargs)
    362         # ray.wait(...)[1] returns futures that are not ready, yet

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/remote_function.py in remote(self, *args, **kwargs)
    199         class FuncWrapper:
    200             def remote(self, *args, **kwargs):
--> 201                 return func_cls._remote(
    202                     args=args,
    203                     kwargs=kwargs,

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py in _invocation_remote_span(self, args, kwargs, *_args, **_kwargs)
    293             if kwargs is not None:
    294                 assert "_ray_trace_ctx" not in kwargs
--> 295             return method(self, args, kwargs, *_args, **_kwargs)
    297         assert "_ray_trace_ctx" not in kwargs

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, retry_exceptions, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, runtime_env, name, scheduling_strategy)
    244         if client_mode_should_convert(auto_init=True):
--> 245             return client_mode_convert_function(
    246                 self,
    247                 args,

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/_private/client_mode_hook.py in client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs)
    171         setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
    172     client_func = ray._get_converted(key)
--> 173     return client_func._remote(in_args, in_kwargs, **kwargs)

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/util/client/common.py in _remote(self, args, kwargs, **option_args)
    128         if kwargs is None:
    129             kwargs = {}
--> 130         return self.options(**option_args).remote(*args, **kwargs)
    132     def __repr__(self):

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/util/client/common.py in remote(self, *args, **kwargs)
    378     def remote(self, *args, **kwargs):
    379         self._remote_stub._signature.bind(*args, **kwargs)
--> 380         return return_refs(ray.call_remote(self, *args, **kwargs))
    382     def __getattr__(self, key):

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/util/client/api.py in call_remote(self, instance, *args, **kwargs)
    104             kwargs: opaque keyword arguments
    105         """
--> 106         return self.worker.call_remote(instance, *args, **kwargs)
    108     def call_release(self, id: bytes) -> None:

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/util/client/worker.py in call_remote(self, instance, *args, **kwargs)
    452             task.args.append(pb_arg)
    453         for k, v in kwargs.items():
--> 454             task.kwargs[k].CopyFrom(convert_to_arg(v, self._client_id))
    455         return self._call_schedule_for_task(task, instance._num_returns())

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/util/client/client_pickler.py in convert_to_arg(val, client_id)
    177     out = ray_client_pb2.Arg()
    178     out.local = ray_client_pb2.Arg.Locality.INTERNED
--> 179     out.data = dumps_from_client(val, client_id)
    180     return out

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/util/client/client_pickler.py in dumps_from_client(obj, client_id, protocol)
    157     with io.BytesIO() as file:
    158         cp = ClientPickler(client_id, file, protocol=protocol)
--> 159         cp.dump(obj)
    160         return file.getvalue()

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    618     def dump(self, obj):
    619         try:
--> 620             return Pickler.dump(self, obj)
    621         except RuntimeError as e:
    622             if "recursion" in e.args[0]:

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/serialization.py in object_ref_reducer(obj)
     94         def object_ref_reducer(obj):
---> 95             self.add_contained_object_ref(obj)
     96             worker = ray.worker.global_worker
     97             worker.check_connected()

~/proj/hp_tune/venv3.8.5/lib/python3.8/site-packages/ray/serialization.py in add_contained_object_ref(self, object_ref)
    153             # then pin the object for the lifetime of this worker by adding
    154             # a local reference that won't ever be removed.
--> 155             ray.worker.global_worker.core_worker.add_object_ref_reference(
    156                 object_ref)

AttributeError: 'Worker' object has no attribute 'core_worker'

Hey @jesse,

The first thing I can think of is that the cluster might be running a different Ray version. Could you try checking that?

You should be able to check this with something like:

@ray.remote
def get_version():
    """Return the Ray version string seen by the worker that runs this task."""
    return ray.__version__

# The function must be declared with @ray.remote for `.remote()` to exist;
# ray.get() then blocks until the task's result is available.
print(f'Cluster Ray version: {ray.get(get_version.remote())}')

Thanks for the response! I tried that, but the cluster version is the same:

Python version (want 3.8.5): 3.8.5 (default, Apr 26 2022, 13:49:59) 
[GCC 11.2.0]
Ray version (want 1.11): 1.11.0
Cluster Ray version: 1.11.0

Oh, could you try removing the @ray.remote annotation from the trainable definition?

I get the same error without @ray.remote on the trainable.

Quick update: I think this is an issue with my specific trainable, and not with ray. I am able to use the cluster with a fake trainable coded like this:

def fake_trainable(config):
    """Return a random objective value, ignoring ``config``.

    The result is a one-entry dict keyed by ``objective_key`` (defined
    outside this snippet) whose value comes from ``np.random.rand()``.
    """
    random_score = np.random.rand()
    return {objective_key: random_score}

My guess is that I have some missing library dependency that makes the worker crash when it tries to unpickle my trainable.

Ah, nice find. Yeah, that’s definitely possible; it’s unfortunate that the error message is pretty obscure… In general, I’d recommend adding in more of the real code until it breaks, to isolate the root cause. Let me know if there’s anything I can do to help debug!