Thanks for your response, @bveeramani. I was using AWS EC2. After digging further into my code, I found that the issue is probably related to `sync_config`. When I set `upload_dir` to an S3 location, I cannot see any checkpoints in either the S3 bucket or the ray_results folder on the Ray cluster head node. If I don't set `upload_dir`, or if I comment out the `sync_config`, then all my checkpoints appear in the ray_results folder on the Ray cluster. Any suggestions on how to get the checkpoints into S3?
Here is my updated code; you can change the `sync_config` to test it:
import math
import argparse
import torch
from filelock import FileLock
from torch.nn import functional as F
from torchmetrics import Accuracy
import pytorch_lightning as pl
from pl_bolts.datamodules.mnist_datamodule import MNISTDataModule
import os
from ray.tune.integration.pytorch_lightning import TuneReportCallback
from ray import tune, air
from datetime import datetime
class LightningMNISTClassifier(pl.LightningModule):
    """Three-layer fully connected MNIST classifier driven by a config dict.

    The ``config`` mapping must provide ``"lr"``, ``"layer_1"``,
    ``"layer_2"`` and ``"batch_size"``. The network flattens each image,
    applies two ReLU-activated linear layers and a final 10-way linear
    layer, and returns log-probabilities.
    """

    def __init__(self, config, data_dir=None):
        """Build the layers and store hyperparameters from ``config``.

        Args:
            config: Mapping with keys ``"lr"``, ``"layer_1"``, ``"layer_2"``
                and ``"batch_size"``.
            data_dir: Directory for the dataset; falls back to the current
                working directory when falsy.
        """
        super().__init__()
        self.data_dir = data_dir or os.getcwd()
        # Learning rate consumed by configure_optimizers().
        self.lr = config["lr"]
        layer_1, layer_2 = config["layer_1"], config["layer_2"]
        self.batch_size = config["batch_size"]

        # mnist images are (1, 28, 28) (channels, width, height)
        self.layer_1 = torch.nn.Linear(28 * 28, layer_1)
        self.layer_2 = torch.nn.Linear(layer_1, layer_2)
        self.layer_3 = torch.nn.Linear(layer_2, 10)
        # self.accuracy = Accuracy(task="multiclass", num_classes=10)
        self.accuracy = Accuracy()

    def forward(self, x):
        """Return per-class log-probabilities for a 4-D batch of images."""
        # Unpacking requires a 4-D input; only the batch size is used below.
        batch_size, _channels, _width, _height = x.size()
        x = x.view(batch_size, -1)
        x = self.layer_1(x)
        x = torch.relu(x)
        x = self.layer_2(x)
        x = torch.relu(x)
        x = self.layer_3(x)
        x = torch.log_softmax(x, dim=1)
        return x

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters using ``self.lr``."""
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def training_step(self, train_batch, batch_idx):
        """Compute the NLL loss for one batch and log loss and accuracy."""
        x, y = train_batch
        logits = self.forward(x)
        # forward() already applies log_softmax, so nll_loss is the match.
        loss = F.nll_loss(logits, y)
        acc = self.accuracy(logits, y)
        self.log("ptl/train_loss", loss)
        self.log("ptl/train_accuracy", acc)
        return loss

    def validation_step(self, val_batch, batch_idx):
        """Return the loss and accuracy for one validation batch."""
        x, y = val_batch
        logits = self.forward(x)
        loss = F.nll_loss(logits, y)
        acc = self.accuracy(logits, y)
        return {"val_loss": loss, "val_accuracy": acc}

    def validation_epoch_end(self, outputs):
        """Average the per-batch validation metrics and log them."""
        avg_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        avg_acc = torch.stack([x["val_accuracy"] for x in outputs]).mean()
        self.log("ptl/val_loss", avg_loss)
        self.log("ptl/val_accuracy", avg_acc)
def train_lm_tune(config, num_epochs=10, num_gpus=0):
    """Train ``LightningMNISTClassifier`` for one Tune trial.

    Args:
        config: Hyperparameter mapping for this trial; ``"batch_size"`` is
            read here and the whole mapping is passed to the model.
        num_epochs: Number of training epochs.
        num_gpus: GPUs assigned to the trial (may be fractional).
    """
    data_dir = os.path.abspath("./data")
    model = LightningMNISTClassifier(config, data_dir)
    # Serialize data-module construction across trials via a lock file.
    with FileLock(os.path.expanduser("~/.data.lock")):
        dm = MNISTDataModule(
            data_dir=data_dir, num_workers=1, batch_size=config["batch_size"]
        )
    # Maps Tune metric names to the names logged by the LightningModule.
    metrics = {"loss": "ptl/val_loss", "acc": "ptl/val_accuracy"}
    # NOTE(review): max_epochs / gpus / enable_progress_bar were lost from
    # the pasted snippet and are reconstructed here -- confirm against the
    # author's original script.
    trainer = pl.Trainer(
        max_epochs=num_epochs,
        # If fractional GPUs passed in, convert to int.
        gpus=math.ceil(num_gpus),
        enable_progress_bar=False,
        callbacks=[TuneReportCallback(metrics, on="validation_end")],
    )
    trainer.fit(model, dm)
def tune_mnist(num_samples=10, num_epochs=10, gpus_per_trial=0):
    """Run a Tune experiment over ``train_lm_tune`` and print the best result.

    NOTE(review): the signature defaults, ``tune_config``, ``run_config``,
    and ``param_space`` arguments were lost from the pasted snippet and are
    reconstructed from the call sites and imports -- confirm against the
    author's original script.

    Args:
        num_samples: Number of trials to sample from the search space.
        num_epochs: Training epochs per trial.
        gpus_per_trial: GPUs reserved for each trial.
    """
    config = {
        "layer_1": tune.choice([32]),
        "layer_2": tune.choice([64]),
        "lr": tune.choice([1e-4]),
        "batch_size": tune.choice([32]),
    }

    trainable = tune.with_parameters(
        train_lm_tune, num_epochs=num_epochs, num_gpus=gpus_per_trial
    )

    # Timestamped name so each run gets its own results folder.
    folder_name = f"ray_{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"

    tuner = tune.Tuner(
        tune.with_resources(trainable, resources={"cpu": 1, "gpu": gpus_per_trial}),
        # get_best_result() below is called without arguments, so the metric
        # and mode have to be declared here.
        tune_config=tune.TuneConfig(
            metric="loss",
            mode="min",
            num_samples=num_samples,
        ),
        run_config=air.RunConfig(
            name=folder_name,
            # Uncomment and point at your bucket to sync results to S3.
            # sync_config=tune.SyncConfig(
            #     upload_dir="s3://<your-bucket>/<prefix>",
            # ),
        ),
        param_space=config,
    )
    results = tuner.fit()

    print("Best hyperparameters found were: ", results.get_best_result().config)
    print("Best model found were: ", results.get_best_result().log_dir)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: a quick CPU-only smoke test, or a GPU run
    # that optionally connects to a remote cluster through Ray Client.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing"
    )
    parser.add_argument(
        "--server-address",
        type=str,
        default=None,
        required=False,
        help="The address of server to connect to if using Ray Client.",
    )
    # parse_known_args tolerates extra flags injected by launchers.
    args, _ = parser.parse_known_args()

    if args.smoke_test:
        tune_mnist(num_samples=1, num_epochs=1, gpus_per_trial=0)
    else:
        if args.server_address:
            import ray

            # NOTE(review): the connection call was lost from the pasted
            # snippet; reconstructed as a Ray Client connection -- confirm.
            ray.init(f"ray://{args.server_address}")

        tune_mnist(num_samples=1, num_epochs=1, gpus_per_trial=1)
Here is the config.yaml:
# A unique identifier for the head node and workers of this cluster.
cluster_name: default
# The maximum number of workers nodes to launch in addition to the head
# node.
max_workers: 2
min_workers: 1
# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
# image: rayproject/ray:latest-cpu # use this one if you don't need ML dependencies, it's faster to pull
container_name: "ray_container"
# If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
# if no cached version is present.
pull_before_run: True
run_options: # Extra options to pass into "docker run"
- --ulimit nofile=65536:65536
# Example of running a GPU head with CPU workers
# head_image: "rayproject/ray-ml:latest-gpu"
# Allow Ray to automatically detect GPUs
# worker_image: "rayproject/ray-ml:latest-cpu"
# worker_run_options: []
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
type: aws
region: us-west-2
# Availability zone(s), comma-separated, that nodes may be launched in.
# Nodes will be launched in the first listed availability zone and will
# be tried in the subsequent availability zones if launching fails.
availability_zone: us-west-2a,us-west-2b, us-west-2c
# Whether to allow node reuse. If set to False, nodes will be terminated
# instead of stopped.
cache_stopped_nodes: True # If not present, the default is True.
GroupName: ray_client_security_group
- FromPort: 10001
ToPort: 10001
IpProtocol: TCP
# This will enable inbound access from ALL IPv4 addresses.
- CidrIp: 0.0.0.0/0
# How Ray will authenticate with newly launched nodes.
ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
# ssh_private_key: /path/to/your/key.pem
# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
# The node type's CPU and GPU resources are auto-detected based on AWS instance type.
# If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
# You can also set custom resources.
# For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
# resources: {"CPU": 1, "GPU": 1, "custom": 5}
resources: {}
# Provider-specific config for this node type, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
InstanceType: m5.large
# ImageId: ami-02ee8d5b35a4a74b2 # ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
# You can provision additional disk space with a conf as follows
- DeviceName: /dev/sda1
VolumeSize: 1000
# Additional options in the boto docs.
# The minimum number of worker nodes of this type to launch.
# This number should be >= 0.
min_workers: 1
# The maximum number of worker nodes of this type to launch.
# This takes precedence over min_workers.
max_workers: 2
# The node type's CPU and GPU resources are auto-detected based on AWS instance type.
# If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
# You can also set custom resources.
# For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
# resources: {"CPU": 1, "GPU": 1, "custom": 5}
resources: {}
# Provider-specific config for this node type, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
InstanceType: g5.xlarge # m5.large
# ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
# Run workers on spot by default. Comment this out to use on-demand.
# NOTE: If relying on spot instances, it is best to specify multiple different instance
# types to avoid interruption when one instance type is experiencing heightened demand.
# Demand information can be found at
MarketType: spot
# Additional options can be found in the boto docs, e.g.
# SpotOptions:
# Additional options in the boto docs.
# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
# "/path1/on/remote/machine": "/path1/on/local/machine",
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# Patterns for files to exclude when running rsync up or rsync down
- "**/.git"
- "**/.git/**"
# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
- ".gitignore"
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
# List of shell commands to run to set up nodes.
setup_commands: []
# Note: if you're developing Ray, you probably want to create a Docker image that
# has your Ray repo pre-cloned. Then, you can replace the pip installs
# below with a git checkout <your_sha> (and possibly a recompile).
# To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
# that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
# - pip install -U "ray[default] @"
# Custom commands that will be run on the head node after common setup.
head_setup_commands: []
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
- ray stop
- ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
- ray stop
- ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
Note: For the ARN in the YAML, I granted the AmazonS3FullAccess, AmazonEC2FullAccess, and AmazonEC2ContainerRegistryFullAccess policies.
The ray_results folder without upload_dir:
drwxr-xr-x 3 ray users 4.0K Jan 4 22:26 data
-rw-r--r-- 1 ray users 1.1K Jan 4 22:27 events.out.tfevents.1672900009.ip-172-31-31-103
drwxr-xr-x 3 ray users 4.0K Jan 4 22:27 lightning_logs
-rw-r--r-- 1 ray users 72 Jan 4 22:26 params.json
-rw-r--r-- 1 ray users 69 Jan 4 22:26 params.pkl
-rw-r--r-- 1 ray users 476 Jan 4 22:27 progress.csv
-rw-r--r-- 1 ray users 643 Jan 4 22:27 result.json
The ray_results folder with upload_dir:
-rw-r--r-- 1 ray users 1.1K Jan 4 22:22 events.out.tfevents.1672899750.ip-172-31-31-103
-rw-r--r-- 1 ray users 72 Jan 4 22:22 params.json
-rw-r--r-- 1 ray users 69 Jan 4 22:22 params.pkl
-rw-r--r-- 1 ray users 475 Jan 4 22:22 progress.csv
-rw-r--r-- 1 ray users 642 Jan 4 22:22 result.json