TL;DR: I am trying to set up a cluster with preemptible workers on GCP, but I am having issues with resuming.
I am trying to understand the resume behavior by randomly shutting down one of the workers with ray kill-random-node -y gcp_our.yaml --hard
and I have seen that, if I do not set resume (to "LOCAL" or anything else), the experiment is restarted from the first iteration when a worker dies. When resume is set to "LOCAL", I get: Called resume when no checkpoint exists in local directory.
My desired behavior would be that, when a worker dies, the experiment restarts from the current iteration and not from the beginning. How could I do that?
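To make the desired behavior concrete, here is a minimal sketch in plain Python, with no Ray involved. ToyTrainable and run_with_failure are hypothetical names used only for illustration, and the sketch assumes the checkpoint directory survives the death of the worker. With a checkpoint every 3 iterations and a failure at iteration 8, I would expect the run to continue from the last saved checkpoint (iteration 6) rather than from iteration 0.

```python
import json
import os
import tempfile


class ToyTrainable:
    """Hypothetical stand-in for a trainable; it only counts iterations."""

    def __init__(self):
        self.iteration = 0

    def step(self):
        self.iteration += 1
        return {"training_iteration": self.iteration}

    def save_checkpoint(self, checkpoint_dir):
        # Persist the only piece of state this toy example has.
        path = os.path.join(checkpoint_dir, "state.json")
        with open(path, "w") as f:
            json.dump({"iteration": self.iteration}, f)
        return path

    def load_checkpoint(self, checkpoint_path):
        with open(checkpoint_path) as f:
            self.iteration = json.load(f)["iteration"]


def run_with_failure(total_iters, checkpoint_freq, fail_at, checkpoint_dir):
    """Run until `fail_at`, drop the worker, then resume from the last checkpoint."""
    trainable = ToyTrainable()
    last_checkpoint = None
    while trainable.iteration < fail_at:
        trainable.step()
        if trainable.iteration % checkpoint_freq == 0:
            last_checkpoint = trainable.save_checkpoint(checkpoint_dir)

    # Simulated worker death: the in-memory state is lost.
    # Assumption: the files in checkpoint_dir are still reachable afterwards.
    trainable = ToyTrainable()
    if last_checkpoint is not None:
        trainable.load_checkpoint(last_checkpoint)
    resumed_from = trainable.iteration

    while trainable.iteration < total_iters:
        trainable.step()
    return resumed_from, trainable.iteration


with tempfile.TemporaryDirectory() as tmp:
    resumed_from, final_iter = run_with_failure(
        total_iters=20, checkpoint_freq=3, fail_at=8, checkpoint_dir=tmp)
    print(resumed_from, final_iter)  # 6 20
```

Note that in this sketch the two iterations after the last checkpoint (7 and 8) are repeated, so "current iteration" really means "last checkpointed iteration".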
Here is the code that I am using:
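from __future__ import print_function
import os

import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest.ax import AxClient, AxSearch
from ray.tune.suggest import ConcurrencyLimiter
from ray.tune.utils import validate_save_restore
import torch
import torch.optim as optim
from configs import METRIC, MINIMIZE
# NOTE: get_data_loaders, ConvNet, train and test are assumed to be defined or imported elsewhere.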
## TRAINABLE
class TrainKMNIST(tune.Trainable):
    def setup(self, config):
        use_cuda = torch.cuda.is_available()
        self.device = torch.device("cuda" if use_cuda else "cpu")
        self.train_loader, self.test_loader = get_data_loaders()
        self.model = ConvNet().to(self.device)
        self.optimizer = optim.SGD(
            self.model.parameters(),
            lr=config.get("lr", 0.01),
            momentum=config.get("momentum", 0.9))

    def step(self):
        train(
            self.model, self.optimizer, self.train_loader, device=self.device)
        acc = test(self.model, self.test_loader, self.device)
        return {"mean_accuracy": acc}

    def save_checkpoint(self, checkpoint_dir):
        checkpoint_path = os.path.join(checkpoint_dir, "model.pth")
        torch.save(self.model.state_dict(), checkpoint_path)
        return checkpoint_path

    def load_checkpoint(self, checkpoint_path):
        self.model.load_state_dict(torch.load(checkpoint_path))

if __name__ == "__main__":
    try:
        ray.init(address="auto")
    except Exception:
        ray.init()
    # FOR EARLY STOPPING
    # see https://docs.ray.io/en/master/tune/api_docs/schedulers.html to choose a scheduler
    sched = AsyncHyperBandScheduler(
        metric=METRIC,  # criterion used to compare experiments
        mode="min" if MINIMIZE else "max",  # ...according to metric
        reduction_factor=2,  # survival rate for concurrent experiments
        grace_period=8,  # each experiment runs at least grace_period iterations before the scheduler can stop it
    )
    ## NEW EXPERIMENTS PROPOSAL
    # see https://docs.ray.io/en/master/tune/api_docs/suggestion.html to choose a search algorithm
    # see https://ax.dev/tutorials/tune_cnn.html for parameters
    parameters = [
        {
            "name": "lr",
            "type": "range",
            "bounds": [1e-6, 0.4],
            "value_type": "float",
            "log_scale": True,
        },
        {
            "name": "momentum",
            "type": "range",
            "value_type": "float",
            "bounds": [0.0, 1.0],
        },
    ]
    # see https://ax.dev/versions/latest/tutorials/gpei_hartmann_service.html for experiment setup
    client = AxClient(enforce_sequential_optimization=False, verbose_logging=False)
    client.create_experiment(
        name="nome",
        parameters=parameters,
        objective_name=METRIC,
        minimize=MINIMIZE,
    )
    searc = AxSearch(ax_client=client)
    # , metric=METRIC, mode="min" if MINIMIZE else "max")
    searc = ConcurrencyLimiter(searc, max_concurrent=12)
    # both of these should return
    validate_save_restore(TrainKMNIST)
    validate_save_restore(TrainKMNIST, use_object_store=True)
    analysis = tune.run(
        TrainKMNIST,
        scheduler=sched,
        search_alg=searc,
        stop={"mean_accuracy": 1.1, "training_iteration": 500},
        resources_per_trial={
            "cpu": 2,
            "gpu": int(ray.cluster_resources()["GPU"] / ray.cluster_resources()["GPU"]) / 2,
        },  # EDIT
        num_samples=20,
        # metric="mean_accuracy",
        # mode="max",
        # name="ray_results",
        # local_dir=HOME,
        max_failures=10,
        checkpoint_freq=3,
        checkpoint_at_end=True,
        resume="LOCAL",
        # config=parameters,
    )
    print(
        "Best config is:",
        analysis.get_best_config(metric=METRIC, mode="min" if MINIMIZE else "max"),
    )
The cluster .yaml is:
# A unique identifier for the head node and workers of this cluster.
cluster_name: test
# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 2
# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 2
# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker: {}
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
    type: gcp
    region: europe-west1
    availability_zone: europe-west1-b
    project_id: humanitas-rad-ai-20-00
# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below. This requires that you have added the key into the
# project wide meta-data.
#    ssh_private_key: /path/to/your/key.pem
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as subnets and ssh-keys.
# For more documentation on available fields, see:
# https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
head_node:
    machineType: n1-standard-4
    disks:
      - boot: true
        autoDelete: true
        type: PERSISTENT
        initializeParams:
          diskSizeGb: 50
          # See https://cloud.google.com/compute/docs/images for more images
          sourceImage: projects/deeplearning-platform-release/global/images/family/pytorch-latest-cu101-debian-10
    # Additional options can be found in the compute docs at
    # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
    # If the network interface is specified as below in both head and worker
    # nodes, the manual network config is used. Otherwise an existing subnet is
    # used. To use a shared subnet, ask the subnet owner to grant permission
    # for 'compute.subnetworks.use' to the ray autoscaler account...
    # networkInterfaces:
    #   - kind: compute#networkInterface
    #     subnetwork: path/to/subnet
    #     aliasIpRanges: []
worker_nodes:
    machineType: n1-standard-4
    disks:
      - boot: true
        autoDelete: true
        type: PERSISTENT
        initializeParams:
          diskSizeGb: 50
          # See https://cloud.google.com/compute/docs/images for more images
          sourceImage: projects/deeplearning-platform-release/global/images/family/pytorch-latest-cu101-debian-10
    guestAccelerators:
      - acceleratorType: projects/humanitas-rad-ai-20-00/zones/europe-west1-b/acceleratorTypes/nvidia-tesla-k80
        acceleratorCount: 1
    metadata:
      items:
        - key: install-nvidia-driver
          value: "True"
    scheduling:
      - onHostMaintenance: TERMINATE
      - preemptible: true
    # Additional options can be found in the compute docs at
    # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
    "./main.py": "./main.py",
    "./configs.py": "./configs.py",
#    "/path1/on/remote/machine": "/path1/on/local/machine",
}
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude: []
# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter: []
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []
# List of shell commands to run to set up nodes.
setup_commands:
    # Note: if you're developing Ray, you probably want to create an AMI that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc
    # Install MiniConda.
    - >-
      wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/anaconda3.sh
      || true
      && bash ~/anaconda3.sh -b -p ~/anaconda3 || true
      && rm ~/anaconda3.sh
      && echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.profile
    # Install ray
    - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
    - pip install torch==1.7.1+cu101 torchvision==0.8.2+cu101 torchaudio==0.7.2 -f https://download.pytorch.org/whl/torch_stable.html
    - pip install pandas ray[tune] ax-platform sqlalchemy
# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - pip install google-api-python-client==1.7.8
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - >-
      ulimit -n 65536;
      ray start
      --head
      --port=6379
      --object-manager-port=8076
      --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - >-
      ulimit -n 65536;
      ray start
      --address=$RAY_HEAD_IP:6379
      --object-manager-port=8076
Thank you in advance,
Riccardo