ConnectionError: Error 10054 while writing to socket. An existing connection was forcibly closed by the remote host

Hi,

I don’t understand where this error is coming from and how to debug it:

ConnectionError: Error 10054 while writing to the socket. An existing connection was forcibly closed by the remote host.

I have a W10 machine. Below is my code. What am I missing?

class RayTune:
    def __init__(self, path):
        self.data = Wrangler(path)
        self.default = {
            'train_dataset': self.data.train_dataset,
            'test_dataset': self.data.test_dataset,
            'test_dataframe': self.data.test_dataframe,
            'user_pool': len(self.data.user_pool),
            'item_pool': len(self.data.item_pool),
            'latent_dim': 8,
            'lr': 3e-4,
            'wd': 1e-7,
            'bs': 256,
            'epochs': 35,
            'cuda': True,
            'comment': '_ray_tune'
        }

    def train_gmf_tune(self, config, checkpoint_dir=None):
        self.update_default(config)
        engine = Experiment(self.default)
        engine.fit(ray_tune=True)

    def update_default(self, config):
        for key, val in config.items():
            self.default[key] = val

    def main(self, num_samples=10, max_epochs=10, gpus=1):
        config = {
            'lr': tune.loguniform(1e-5, 1e-1),
            'wd': tune.loguniform(1.5e-7, 1e-7),
            'bs': tune.choice([32, 64, 128, 256, 512]),
            'latent_dim': tune.choice([4, 6, 8, 10, 12, 14, 16, 18, 20])
        }
        reporter = CLIReporter(metric_columns=['loss', 'hr'])
        scheduler = ASHAScheduler(max_t=max_epochs, grace_period=1, reduction_factor=2)
        result = tune.run(tune.with_parameters(self.train_gmf_tune),
                          resources_per_trial={"cpu": 2, "gpu": gpus},
                          config=config,
                          metric="hr",
                          mode="max",
                          num_samples=num_samples,
                          scheduler=scheduler,
                          progress_reporter=reporter)
        return result.get_best_trial('hr', 'max', 'last').config

The full error message:

ConnectionResetError Traceback (most recent call last)
d:\anaconda3\envs\api4\lib\site-packages\redis\connection.py in send_packed_command(self, command, check_health)
699 for item in command:
–> 700 sendall(self._sock, item)
701 except socket.timeout:

d:\anaconda3\envs\api4\lib\site-packages\redis_compat.py in sendall(sock, *args, **kwargs)
7 def sendall(sock, *args, **kwargs):
----> 8 return sock.sendall(*args, **kwargs)
9

ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

ConnectionError Traceback (most recent call last)
in
1 import ray
2
----> 3 tuner.main()

D:\workspace\imo-api4\latest\engine.py in main(self, num_samples, max_epochs, gpus)
139 ‘bs’: tune.choice([32, 64, 128, 256, 512]),
140 ‘latent_dim’: tune.choice([4, 6, 8, 10, 12, 14, 16, 18, 20])
–> 141 }
142 reporter = CLIReporter(metric_columns=[‘loss’, ‘hr’])
143 scheduler = ASHAScheduler(max_t=max_epochs, grace_period=1, reduction_factor=2)

d:\anaconda3\envs\api4\lib\site-packages\ray\tune\tune.py in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, search_alg, scheduler, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, verbose, progress_reporter, loggers, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, queue_trials, reuse_actors, trial_executor, raise_on_failed_trial, callbacks, ray_auto_init, run_errored_only, global_checkpoint_period, with_server, upload_dir, sync_to_cloud, sync_to_driver, sync_on_checkpoint)
297 for i, exp in enumerate(experiments):
298 if not isinstance(exp, Experiment):
–> 299 experiments[i] = Experiment(
300 name=name,
301 run=exp,

d:\anaconda3\envs\api4\lib\site-packages\ray\tune\experiment.py in init (self, name, run, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, upload_dir, trial_name_creator, trial_dirname_creator, loggers, log_to_file, sync_to_driver, checkpoint_freq, checkpoint_at_end, sync_on_checkpoint, keep_checkpoints_num, checkpoint_score_attr, export_formats, max_failures, restore)
136 "checkpointable function. You can specify checkpoints "
137 “within your trainable function.”)
–> 138 self._run_identifier = Experiment.register_if_needed(run)
139 self.name = name or self._run_identifier
140

d:\anaconda3\envs\api4\lib\site-packages\ray\tune\experiment.py in register_if_needed(cls, run_object)
274 “No name detected on trainable. Using {}.”.format(name))
275 try:
–> 276 register_trainable(name, run_object)
277 except (TypeError, PicklingError) as e:
278 msg = (

d:\anaconda3\envs\api4\lib\site-packages\ray\tune\registry.py in register_trainable(name, trainable, warn)
69 raise TypeError(“Second argument must be convertable to Trainable”,
70 trainable)
—> 71 _global_registry.register(TRAINABLE_CLASS, name, trainable)
72
73

d:\anaconda3\envs\api4\lib\site-packages\ray\tune\registry.py in register(self, category, key, value)
122 self._to_flush[(category, key)] = pickle.dumps(value)
123 if _internal_kv_initialized():
–> 124 self.flush_values()
125
126 def contains(self, category, key):

d:\anaconda3\envs\api4\lib\site-packages\ray\tune\registry.py in flush_values(self)
144 def flush_values(self):
145 for (category, key), value in self._to_flush.items():
–> 146 _internal_kv_put(_make_key(category, key), value, overwrite=True)
147 self._to_flush.clear()
148

d:\anaconda3\envs\api4\lib\site-packages\ray\experimental\internal_kv.py in _internal_kv_put(key, value, overwrite)
25
26 if overwrite:
—> 27 updated = worker.redis_client.hset(key, “value”, value)
28 else:
29 updated = worker.redis_client.hsetnx(key, “value”, value)

d:\anaconda3\envs\api4\lib\site-packages\redis\client.py in hset(self, name, key, value)
3002 Returns 1 if HSET created a new field, otherwise 0
3003 “”"
-> 3004 return self.execute_command(‘HSET’, name, key, value)
3005
3006 def hsetnx(self, name, key, value):

d:\anaconda3\envs\api4\lib\site-packages\redis\client.py in execute_command(self, *args, **options)
875 conn = self.connection or pool.get_connection(command_name, **options)
876 try:
–> 877 conn.send_command(*args)
878 return self.parse_response(conn, command_name, **options)
879 except (ConnectionError, TimeoutError) as e:

d:\anaconda3\envs\api4\lib\site-packages\redis\connection.py in send_command(self, *args, **kwargs)
718 def send_command(self, *args, **kwargs):
719 “Pack and send a command to the Redis server”
–> 720 self.send_packed_command(self.pack_command(*args),
721 check_health=kwargs.get(‘check_health’, True))
722

d:\anaconda3\envs\api4\lib\site-packages\redis\connection.py in send_packed_command(self, command, check_health)
710 errno = e.args[0]
711 errmsg = e.args[1]
–> 712 raise ConnectionError(“Error %s while writing to socket. %s.” %
713 (errno, errmsg))
714 except: # noqa: E722

ConnectionError: Error 10054 while writing to socket. An existing connection was forcibly closed by the remote host.

Thanks a bunch for posting here, @leoentersthevoid!

The main issue is that self is captured in the scope of Ray Tune, which references a large dataset. Ray Tune will serialize the scope of this function to ship it to different processes, and a scope that is too big in size can cause Ray to fail.

Instead, you can consider making train_gmf_tune a class method, and use config to pass in the default parameters.

@classmethod
def train_gmf_tune(cls, config, checkpoint_dir=None):
    defaults = config["defaults"]
    for key, val in config.items():
        defaults[key] = val
    engine = ...
    engine.fit(ray_tune=True)

Notice how self is not captured/referenced in the scope of the function.

1 Like

The @classmethod trick worked smoothly, thank you!

An off-topic curiosity:

If my default_num_epochs > max_t and considering that my actual training is happening inside engine.fit, does the training get adjusted based on the max_t param?

Yep, training may stop at max_t or before it.

I think I am deviating from the beaten track with my approach. I managed to make it run with your suggestion, but I still get a lot of errors. The main reported error in the logdir ruy_results by the traceback was this

AttributeError: Can’t get attribute ‘uiDataset’ on <module ‘data’ (namespace)>

This is the text from the file

2020-12-04 00:02:02,576	INFO services.py:1090 -- View the Ray dashboard at http://127.0.0.1:8265
2020-12-04 00:49:14,599	ERROR syncer.py:63 -- Log sync requires rsync to be installed.
2020-12-04 00:49:16,769	WARNING util.py:137 -- The `start_trial` operation took 1312.3147583007812 seconds to complete, which may be a performance bottleneck.
2020-12-04 00:49:21,442	WARNING worker.py:1091 -- Failed to unpickle actor class 'ImplicitFunc' for actor ID df5a1a8201000000. Traceback:
Traceback (most recent call last):
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\function_manager.py", line 493, in _load_actor_class_from_gcs
    actor_class = pickle.loads(pickled_class)
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\__init__.py", line 2, in <module>
    from ray.tune.tune import run_experiments, run
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\tune.py", line 7, in <module>
    from ray.tune.analysis import ExperimentAnalysis
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\analysis\__init__.py", line 1, in <module>
    from ray.tune.analysis.experiment_analysis import ExperimentAnalysis, Analysis
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\analysis\experiment_analysis.py", line 19, in <module>
    from ray.tune.trial import Trial
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\trial.py", line 15, in <module>
    from ray.tune.durable_trainable import DurableTrainable
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\durable_trainable.py", line 4, in <module>
    from ray.tune.trainable import Trainable, TrainableUtil
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\trainable.py", line 23, in <module>
    from ray.tune.logger import UnifiedLogger
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\logger.py", line 15, in <module>
    from ray.tune.syncer import get_node_syncer
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\syncer.py", line 81, in <module>
    class SyncConfig:
  File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 958, in dataclass
    return wrap(_cls)
  File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 950, in wrap
    return _process_class(cls, init, repr, eq, order, unsafe_hash, frozen)
  File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 800, in _process_class
    cls_fields = [_get_field(cls, name, type)
  File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 800, in <listcomp>
    cls_fields = [_get_field(cls, name, type)
  File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 659, in _get_field
    if (_is_classvar(a_type, typing)
  File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 550, in _is_classvar
    return type(a_type) is typing._ClassVar
AttributeError: module 'typing' has no attribute '_ClassVar'

(pid=14068) 2020-12-04 00:49:21,053	ERROR function_manager.py:495 -- Failed to load actor class ImplicitFunc.
(pid=14068) Traceback (most recent call last):
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\function_manager.py", line 493, in _load_actor_class_from_gcs
(pid=14068)     actor_class = pickle.loads(pickled_class)
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\__init__.py", line 2, in <module>
(pid=14068)     from ray.tune.tune import run_experiments, run
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\tune.py", line 7, in <module>
(pid=14068)     from ray.tune.analysis import ExperimentAnalysis
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\analysis\__init__.py", line 1, in <module>
(pid=14068)     from ray.tune.analysis.experiment_analysis import ExperimentAnalysis, Analysis
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\analysis\experiment_analysis.py", line 19, in <module>
(pid=14068)     from ray.tune.trial import Trial
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\trial.py", line 15, in <module>
(pid=14068)     from ray.tune.durable_trainable import DurableTrainable
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\durable_trainable.py", line 4, in <module>
(pid=14068)     from ray.tune.trainable import Trainable, TrainableUtil
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\trainable.py", line 23, in <module>
(pid=14068)     from ray.tune.logger import UnifiedLogger
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\logger.py", line 15, in <module>
(pid=14068)     from ray.tune.syncer import get_node_syncer
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\ray\tune\syncer.py", line 81, in <module>
(pid=14068)     class SyncConfig:
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 958, in dataclass
(pid=14068)     return wrap(_cls)
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 950, in wrap
(pid=14068)     return _process_class(cls, init, repr, eq, order, unsafe_hash, frozen)
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 800, in _process_class
(pid=14068)     cls_fields = [_get_field(cls, name, type)
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 800, in <listcomp>
(pid=14068)     cls_fields = [_get_field(cls, name, type)
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 659, in _get_field
(pid=14068)     if (_is_classvar(a_type, typing)
(pid=14068)   File "d:\anaconda3\envs\api4\lib\site-packages\dataclasses.py", line 550, in _is_classvar
(pid=14068)     return type(a_type) is typing._ClassVar
(pid=14068) AttributeError: module 'typing' has no attribute '_ClassVar'
2020-12-04 00:49:26,922	ERROR worker.py:1037 -- Possible unhandled error from worker: ray::ImplicitFunc.__init__() (pid=14068, ip=192.168.1.101)
  File "python\ray\_raylet.pyx", line 446, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 463, in ray._raylet.execute_task
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\worker.py", line 291, in deserialize_objects
    return context.deserialize_objects(data_metadata_pairs, object_refs)
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 308, in deserialize_objects
    self._deserialize_object(data, metadata, object_ref))
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 247, in _deserialize_object
    return self._deserialize_msgpack_data(data, metadata)
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 226, in _deserialize_msgpack_data
    python_objects = self._deserialize_pickle5_data(pickle5_data)
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 214, in _deserialize_pickle5_data
    obj = pickle.loads(in_band, buffers=buffers)
AttributeError: Can't get attribute 'uiDataset' on <module 'data' (namespace)>
2020-12-04 00:49:27,009	ERROR worker.py:1037 -- Possible unhandled error from worker: ray::ImplicitFunc.train() (pid=14068, ip=192.168.1.101)
  File "python\ray\_raylet.pyx", line 443, in ray._raylet.execute_task
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\worker.py", line 186, in reraise_actor_init_error
    raise self.actor_init_error
  File "python\ray\_raylet.pyx", line 446, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 463, in ray._raylet.execute_task
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\worker.py", line 291, in deserialize_objects
    return context.deserialize_objects(data_metadata_pairs, object_refs)
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 308, in deserialize_objects
    self._deserialize_object(data, metadata, object_ref))
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 247, in _deserialize_object
    return self._deserialize_msgpack_data(data, metadata)
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 226, in _deserialize_msgpack_data
    python_objects = self._deserialize_pickle5_data(pickle5_data)
  File "d:\anaconda3\envs\api4\lib\site-packages\ray\serialization.py", line 214, in _deserialize_pickle5_data
    obj = pickle.loads(in_band, buffers=buffers)
AttributeError: Can't get attribute 'uiDataset' on <module 'data' (namespace)>
2020-12-04 00:49:37,332	WARNING util.py:137 -- The `experiment_checkpoint` operation took 20.563968420028687 seconds to complete, which may be a performance bottleneck.
== Status ==
Memory usage on this node: 6.4/15.9 GiB
Using AsyncHyperBand: num_stopped=0
Bracket: Iter 8.000: None | Iter 4.000: None | Iter 2.000: None | Iter 1.000: None
Resources requested: 2/12 CPUs, 1/1 GPUs, 0.0/4.3 GiB heap, 0.0/1.46 GiB objects
Result logdir: C:\Users\cleov\ray_results\inner_2020-12-04_00-02-12
Number of trials: 1/10 (1 RUNNING)
+-------------------+----------+-------+------+--------------+------------+-------------+
| Trial name        | status   | loc   |   bs |   latent_dim |         lr |          wd |
|-------------------+----------+-------+------+--------------+------------+-------------|
| inner_32347_00000 | RUNNING  |       |  512 |           14 | 0.00282398 | 1.25767e-07 |
+-------------------+----------+-------+------+--------------+------------+-------------+

I consider rewriting the whole script to mimic the approach suggested by your official tutorial.

Do you have some sort of typing enabled?

Can you try pip uninstall dataclasses?