Thanks for your response @bveeramani. I was using AWS EC2. After digging further into my code, I found that the issue is probably related to sync_config. When I set upload_dir to an S3 location, I cannot see any checkpoints in either the S3 bucket or the ray_results folder on the Ray cluster head node. However, if I don't set upload_dir (or comment out sync_config), all my checkpoints are in the ray_results folder on the Ray cluster. Any suggestions on how to get the checkpoints into S3?
Here is my updated code and you can change the sync_config to test it:
import math
import argparse
import torch
from filelock import FileLock
from torch.nn import functional as F
from torchmetrics import Accuracy
import pytorch_lightning as pl
from pl_bolts.datamodules.mnist_datamodule import MNISTDataModule
import os
from ray.tune.integration.pytorch_lightning import TuneReportCallback
from ray import tune, air
from datetime import datetime
class LightningMNISTClassifier(pl.LightningModule):
    """Three-layer fully connected classifier configured from a hyperparameter dict.

    ``config`` must provide the keys ``"lr"``, ``"layer_1"``, ``"layer_2"`` and
    ``"batch_size"``. ``data_dir`` falls back to the current working directory
    when it is not given (or is falsy).
    """

    def __init__(self, config, data_dir=None):
        super().__init__()
        self.data_dir = data_dir or os.getcwd()
        self.lr = config["lr"]
        hidden_1 = config["layer_1"]
        hidden_2 = config["layer_2"]
        self.batch_size = config["batch_size"]

        # Inputs are flattened to 28 * 28 features; the last layer has 10 outputs.
        self.layer_1 = torch.nn.Linear(28 * 28, hidden_1)
        self.layer_2 = torch.nn.Linear(hidden_1, hidden_2)
        self.layer_3 = torch.nn.Linear(hidden_2, 10)

        # Alternative constructor call, kept for reference:
        # self.accuracy = Accuracy(task="multiclass", num_classes=10)
        self.accuracy = Accuracy()

    def forward(self, x):
        """Return log-softmax scores (over dim 1) for a 4-D input batch."""
        # The unpacking requires exactly four dimensions; only the first is used.
        n_samples, _channels, _width, _height = x.size()
        hidden = torch.relu(self.layer_1(x.view(n_samples, -1)))
        hidden = torch.relu(self.layer_2(hidden))
        return torch.log_softmax(self.layer_3(hidden), dim=1)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters using ``self.lr``."""
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def training_step(self, train_batch, batch_idx):
        """Compute and log NLL loss and accuracy for one batch; return the loss."""
        inputs, targets = train_batch
        log_probs = self.forward(inputs)
        loss = F.nll_loss(log_probs, targets)
        batch_acc = self.accuracy(log_probs, targets)
        self.log("ptl/train_loss", loss)
        self.log("ptl/train_accuracy", batch_acc)
        return loss

    def validation_step(self, val_batch, batch_idx):
        """Return the NLL loss and accuracy of one validation batch as a dict."""
        inputs, targets = val_batch
        log_probs = self.forward(inputs)
        return {
            "val_loss": F.nll_loss(log_probs, targets),
            "val_accuracy": self.accuracy(log_probs, targets),
        }

    def validation_epoch_end(self, outputs):
        """Log the mean of the per-batch validation loss and accuracy."""
        mean_loss = torch.stack([step["val_loss"] for step in outputs]).mean()
        mean_acc = torch.stack([step["val_accuracy"] for step in outputs]).mean()
        self.log("ptl/val_loss", mean_loss)
        self.log("ptl/val_accuracy", mean_acc)
def train_lm_tune(config, num_epochs=10, num_gpus=0):
    """Fit one ``LightningMNISTClassifier`` and report validation metrics to Tune.

    Args:
        config: Hyperparameter dict passed to the model; ``config["batch_size"]``
            is also used for the data module.
        num_epochs: Value passed to the trainer as ``max_epochs``.
        num_gpus: GPU count for the trainer; rounded up to an integer.
    """
    data_dir = os.path.abspath("./data")
    model = LightningMNISTClassifier(config, data_dir)

    # The data module is constructed while holding a file lock.
    lock_path = os.path.expanduser("~/.data.lock")
    with FileLock(lock_path):
        dm = MNISTDataModule(
            data_dir=data_dir,
            num_workers=1,
            batch_size=config["batch_size"],
        )

    # Maps the names reported to Tune onto the metric names logged by the model.
    reported_metrics = {"loss": "ptl/val_loss", "acc": "ptl/val_accuracy"}
    report_callback = TuneReportCallback(reported_metrics, on="validation_end")

    trainer = pl.Trainer(
        max_epochs=num_epochs,
        # A fractional GPU request is rounded up to a whole number.
        gpus=math.ceil(num_gpus),
        enable_progress_bar=False,
        callbacks=[report_callback],
    )
    trainer.fit(model, dm)
def tune_mnist(
    num_samples=10,
    num_epochs=10,
    gpus_per_trial=0,
):
    """Run a Tune experiment with ``train_lm_tune`` and print the best result.

    Args:
        num_samples: Number of trials, passed to ``tune.TuneConfig``.
        num_epochs: Forwarded to ``train_lm_tune``.
        gpus_per_trial: GPU resources requested per trial; also forwarded to
            ``train_lm_tune`` as ``num_gpus``.
    """
    # Every hyperparameter currently has a single candidate value.
    search_space = {
        "layer_1": tune.choice([32]),
        "layer_2": tune.choice([64]),
        "lr": tune.choice([1e-4]),
        "batch_size": tune.choice([32]),
    }

    bound_trainable = tune.with_parameters(
        train_lm_tune, num_epochs=num_epochs, num_gpus=gpus_per_trial
    )
    trial_resources = {"cpu": 1, "gpu": gpus_per_trial}

    # The experiment name is based on the current time, to the second.
    timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    folder_name = f"ray_{timestamp}"

    search_settings = tune.TuneConfig(
        metric="loss",
        mode="min",
        num_samples=num_samples,
    )
    run_settings = air.RunConfig(
        name=folder_name,
        # sync_config=tune.SyncConfig(
        #     upload_dir="s3://PUTYOURS3ADDRESSHERE" # <--- UPDATE WITH YOUR S3
        # ),
    )

    tuner = tune.Tuner(
        tune.with_resources(bound_trainable, resources=trial_resources),
        tune_config=search_settings,
        run_config=run_settings,
        param_space=search_space,
    )
    results = tuner.fit()

    best = results.get_best_result()
    print("Best hyperparameters found were: ", best.config)
    print("Best model found were: ", best.log_dir)
if __name__ == "__main__":
    # Script entry point: parse the two optional flags, then run one short trial.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--smoke-test",
        action="store_true",
        help="Finish quickly for testing",
    )
    cli.add_argument(
        "--server-address",
        type=str,
        default=None,
        required=False,
        help="The address of server to connect to if using Ray Client.",
    )
    # Unrecognized arguments are tolerated instead of causing an error.
    args, _unknown = cli.parse_known_args()

    if not args.smoke_test:
        # The server address is only used outside smoke-test mode.
        if args.server_address:
            import ray

            ray.init(f"ray://{args.server_address}")
        tune_mnist(num_samples=1, num_epochs=1, gpus_per_trial=1)
    else:
        tune_mnist(num_samples=1, num_epochs=1, gpus_per_trial=0)
Here is the config.yaml:
# A unique identifier for the head node and workers of this cluster.
cluster_name: default
# The maximum number of worker nodes to launch in addition to the head
# node.
max_workers: 2
min_workers: 1
# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
    image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    # image: rayproject/ray:latest-cpu   # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options:   # Extra options to pass into "docker run"
        - --ulimit nofile=65536:65536
    # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"
    # Allow Ray to automatically detect GPUs
    # worker_image: "rayproject/ray-ml:latest-cpu"
    # worker_run_options: []
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    # Availability zone(s), comma-separated, that nodes may be launched in.
    # Nodes will be launched in the first listed availability zone and will
    # be tried in the subsequent availability zones if launching fails.
    availability_zone: us-west-2a,us-west-2b, us-west-2c
    # Whether to allow node reuse. If set to False, nodes will be terminated
    # instead of stopped.
    cache_stopped_nodes: True # If not present, the default is True.
    security_group:
        GroupName: ray_client_security_group
        IpPermissions:
              - FromPort: 10001
                ToPort: 10001
                IpProtocol: TCP
                IpRanges:
                    # This will enable inbound access from ALL IPv4 addresses.
                    - CidrIp: 0.0.0.0/0
# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
#    ssh_private_key: /path/to/your/key.pem
# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: m5.large
            # ImageId: ami-02ee8d5b35a4a74b2 # ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
            # You can provision additional disk space with a conf as follows
            BlockDeviceMappings:
                - DeviceName: /dev/sda1
                  Ebs:
                      VolumeSize: 1000
            # Additional options in the boto docs.
    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 1
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 2
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: g5.xlarge #  m5.large
            # ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
            # Run workers on spot by default. Comment this out to use on-demand.
            # NOTE: If relying on spot instances, it is best to specify multiple different instance
            # types to avoid interruption when one instance type is experiencing heightened demand.
            # Demand information can be found at https://aws.amazon.com/ec2/spot/instance-advisor/
            InstanceMarketOptions:
                MarketType: spot
                # Additional options can be found in the boto docs, e.g.
                #   SpotOptions:
                #       MaxPrice: MAX_HOURLY_PRICE
            # Additional options in the boto docs.
# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"
# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
# List of shell commands to run to set up nodes.
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"
# Custom commands that will be run on the head node after common setup.
head_setup_commands: []
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
Note: For the ARN in the YAML, I granted AmazonS3FullAccess, AmazonEC2FullAccess, and AmazonEC2ContainerRegistryFullAccess.
The ray_results folder without upload_dir:
drwxr-xr-x 3 ray users 4.0K Jan  4 22:26 data
-rw-r--r-- 1 ray users 1.1K Jan  4 22:27 events.out.tfevents.1672900009.ip-172-31-31-103
drwxr-xr-x 3 ray users 4.0K Jan  4 22:27 lightning_logs
-rw-r--r-- 1 ray users   72 Jan  4 22:26 params.json
-rw-r--r-- 1 ray users   69 Jan  4 22:26 params.pkl
-rw-r--r-- 1 ray users  476 Jan  4 22:27 progress.csv
-rw-r--r-- 1 ray users  643 Jan  4 22:27 result.json
The ray_results folder with upload_dir:
-rw-r--r-- 1 ray users 1.1K Jan  4 22:22 events.out.tfevents.1672899750.ip-172-31-31-103
-rw-r--r-- 1 ray users   72 Jan  4 22:22 params.json
-rw-r--r-- 1 ray users   69 Jan  4 22:22 params.pkl
-rw-r--r-- 1 ray users  475 Jan  4 22:22 progress.csv
-rw-r--r-- 1 ray users  642 Jan  4 22:22 result.json