Stochastic error when managing actors in Ray Tune

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I occasionally get this error and it has been very hard to reproduce. I am using Python 3.9 and Ray v2.22.0. Following previous posts, I made sure that pickle5 is not installed.
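
For reference, this is how I verified the versions and that pickle5 is absent (just a quick sanity check, nothing Ray-specific):

    import importlib.util
    import ray

    print(ray.__version__)                      # 2.22.0
    print(importlib.util.find_spec('pickle5'))  # None, i.e. pickle5 is not installed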

  File "<path_to_my_env>/lib/python3.9/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 388, in _invocation_actor_class_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/actor.py", line 1041, in _remote
    worker.function_actor_manager.export_actor_class(
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/_private/function_manager.py", line 482, in export_actor_class
    serialized_actor_class = pickle_dumps(
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/_private/serialization.py", line 66, in pickle_dumps
    return pickle.dumps(obj)
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
_pickle.PicklingError: Can't pickle <built-in function print>: it's not the same object as builtins.print

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/userdata/sjain/projects/fomo-models/tubelet_autoencoder/model_v3.py", line 722, in <module>
    result_grid = tuner.fit()
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/tuner.py", line 377, in fit
    return self._local_tuner.fit()
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/impl/tuner_internal.py", line 476, in fit
    analysis = self._fit_internal(trainable, param_space)
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/impl/tuner_internal.py", line 595, in _fit_internal
    analysis = run(
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/tune.py", line 999, in run
    runner.cleanup()
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 1975, in cleanup
    self._cleanup_trials()
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 792, in _cleanup_trials
    self._schedule_trial_stop(trial)
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 1403, in _schedule_trial_stop
    self._remove_actor(tracked_actor=tracked_actor)
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 811, in _remove_actor
    stop_future = self._actor_manager.schedule_actor_task(
  File "<path_to_my_env>/lib/python3.9/site-packages/ray/air/execution/_internal/actor_manager.py", line 726, in schedule_actor_task
    raise ValueError(
ValueError: Tracked actor is not managed by this event manager: <TrackedActor 105686241699737627712142917511253406370>
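
For what it's worth, the message in the first exception seems to come from the standard pickle module itself: as far as I can tell, built-in functions are pickled by reference, and pickle looks the name up again in the module recorded on the function and refuses if that lookup returns a different object. A minimal sketch of just that check, separate from my script and from Ray (wrapped_print below is purely illustrative):

    import builtins
    import pickle

    real_print = builtins.print

    def wrapped_print(*args, **kwargs):
        return real_print(*args, **kwargs)

    # Simulate something having replaced the builtin under the same name.
    builtins.print = wrapped_print
    try:
        # print is pickled by reference as builtins.print, and pickle checks
        # that looking the name up again resolves to the very same object.
        pickle.dumps(real_print)
    except pickle.PicklingError as exc:
        real_print(exc)  # Can't pickle <built-in function print>: it's not the same object as builtins.print
    finally:
        builtins.print = real_print

I still do not understand why this only happens occasionally in my runs.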

My full script:

'''
Initial attempt at modeling with Ray Tune.
'''

import os, sys
import numpy as np
import csv
import h5py
import pickle
from collections import defaultdict, OrderedDict
import tqdm
import random
import argparse
import yaml
import glob
from zipfile import BadZipFile
import tempfile
from filelock import FileLock

from functools import partial
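# Flush every print. (Rebinding the builtin name like this turned out to be the
# line I had to remove to fix the pickling error; see the bottom of this post.)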
print = partial(print, flush=True)

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data.dataloader import default_collate

import ray
from ray import train, tune
from ray.tune.schedulers import ASHAScheduler
from ray.train import Checkpoint, RunConfig, Result
from hyperopt import hp
from ray.tune import Callback
from ray.tune.search.hyperopt import HyperOptSearch

PROJ_DIR = '<proj_path>'
with open(f'{PROJ_DIR}/config/<redacted>.yaml') as f:
    PROJ_CONFIG = yaml.safe_load(f)

sys.path.insert(0, PROJ_DIR+'/data_utils')
import data_loader


POS_COORD_LIM = 1024

zscore = lambda x: (x - x.mean(0))/x.std(0)

########################
# DATA LOADING FUNCTIONS
########################

def get_task_participants(task, task_csv_path):
    # Redacted
    return np.unique(task_parts)

def _load_block_data(data_path, subj_emb_list, subj, block):
    try:
        # Redacted
        return data, coords, subj_id

    except OSError:
        if verbose: print(f'Data was None because of OSError!')
        return None
    
def collate_fn(batch):
    batch = list(filter(lambda x: x is not None, batch))
    if verbose: print('Collater collating!')
    return default_collate(batch)

class Dataset(Dataset):
    def __init__(self, data_path, subj_emb_list, data_combos):
        self.data_path = data_path
        self.subj_emb_list = subj_emb_list
        self.data_combos = data_combos

    def __getitem__(self, idx):
        return _load_block_data(
            self.data_path, self.subj_emb_list, *self.data_combos[idx])

    def __len__(self):
        return len(self.data_combos)
    
##################
# MODEL DEFINITION
##################

class PoolPosition(nn.Module):
    def __init__(self, grid_conv_size):
        super().__init__()
        self.pool = nn.AvgPool2d((grid_conv_size, grid_conv_size), stride=(grid_conv_size, grid_conv_size))

    def forward(self, x):
        x = torch.add(self.pool(x), POS_COORD_LIM).int()
        return x

class TubeEnc(nn.Module):
    def __init__(self, temp_conv_size, grid_conv_size, latent_dim1, latent_dim2, subj_emb_size):
        super().__init__()
        self.temp_conv_size = temp_conv_size
        self.grid_conv_size = grid_conv_size
        self.latent_dim1 = latent_dim1
        self.latent_dim2 = latent_dim2
        self.subj_emb_size = subj_emb_size
        
        self.conv1 = nn.Conv3d(
            1, latent_dim1, (grid_conv_size, grid_conv_size, temp_conv_size), 
            stride=(grid_conv_size, grid_conv_size, temp_conv_size))
        self.fc1 = nn.Linear(latent_dim1, latent_dim2)
        self.convtrans1 = nn.ConvTranspose3d(
            latent_dim2, 1, (grid_conv_size, grid_conv_size, temp_conv_size), 
            stride=(grid_conv_size, grid_conv_size, temp_conv_size))
        self.subj_embedding = nn.Embedding(400, subj_emb_size)
        nn.init.zeros_(self.subj_embedding.weight)

    def forward(self, img, space_pos, subj_ixs, verbose=False):
        if verbose: print('Input:', img.shape)
        
        img_conv = F.relu(self.conv1(img))
        if verbose: print('Tubelet Conv:', img_conv.shape)
        
        subj_emb = self.subj_embedding(subj_ixs).T
        space_pos = space_pos.T
        subj_emb = subj_emb.expand(space_pos.shape[0], space_pos.shape[1], -1, -1)
        aux_emb = torch.cat([subj_emb, space_pos], axis=2)
        if verbose: print('Aux. emb:', aux_emb.shape)
        
        img_space_pos = (img_conv.T + aux_emb).T
        batch_size, _, nheight_chans, nwidth_chans, ntimepoints = img_space_pos.shape
        if verbose: print('Space pos & subj embedding!')
        
        img_linear = self.fc1(torch.swapaxes(torch.flatten(img_space_pos, start_dim=2), 1, 2))
        if verbose: print('Linear reduction:', img_linear.shape)

        img_trans = img_linear
        img_trans = torch.reshape(torch.swapaxes(img_trans, 1, 2), 
                                  (batch_size, self.latent_dim2, nheight_chans, nwidth_chans, ntimepoints))
        if verbose: print('Reshaping:', img_trans.shape)
        
        img_deconv = self.convtrans1(img_trans)
        if verbose: print('Tubelet Deconv:', img_deconv.shape)
        
        img_op = img_deconv
        
        return img_op

def get_model_output(model_list, criterion, data_list, device):
    
    tube_autoenc, pool_pos, pos_emb = model_list
    img_data, mni_coords, subj_ids = data_list

    D = pos_emb.shape[2]
    pos_locs = pool_pos(mni_coords).numpy()
    pos_embeddings = np.zeros([pos_locs.shape[0], D, pos_locs.shape[2], pos_locs.shape[3]])
    for bs in range(pos_locs.shape[0]):
        for i in range(pos_locs.shape[2]):
            for j in range(pos_locs.shape[3]):
                pos_embeddings[bs, :, i, j] = pos_emb[pos_locs[bs, 1, i, j], pos_locs[bs, 2, i, j]]
    pos_embeddings = torch.tensor(pos_embeddings.astype(np.float32)).to(device)

    output = tube_autoenc(img_data.to(device), pos_embeddings, subj_ids.to(device))
    loss = criterion(output, img_data.to(device))
    return loss

############################
# MODEL TRAINING AND TESTING
############################

# Make this a generic function that takes in the model(s), dataloader, criterion, optimizer and scheduler(s), and returns the cumulative loss.
def train_loop(model_list, dataloader, criterion, optimizer, scheduler_list, device, train_count):
    
    tube_autoenc, pool_pos, pos_emb = model_list
    tube_autoenc.train()
    try:
        cum_loss = []
        for data_list in dataloader:
            optimizer.zero_grad()

            train_loss = get_model_output(model_list, criterion, data_list, device)
            cum_loss.append(train_loss.item())

            train_loss.backward()
            optimizer.step()

            if len(cum_loss)>=train_count: break

    except StopIteration:
        pass

    for scheduler in scheduler_list:
        scheduler.step()
    
    cum_loss = np.mean(cum_loss)    
    return cum_loss

    
def test_loop(model_list, dataloader, criterion, device, is_val=False, val_count=np.inf):
    # # Runs it in test-mode. This ensures that the data will be read in order
    # # and iterator will start from the beginning each time.
    # generator = enumerate(dataloader)
    tube_autoenc, pool_pos, pos_emb = model_list
    tube_autoenc.eval()
    with torch.no_grad():
        try:
            cum_loss = []
            for data_list in dataloader:
                val_loss = get_model_output(model_list, criterion, data_list, device)
                cum_loss.append(val_loss.item())

                if is_val: 
                    if len(cum_loss)>=val_count: break
                else:
                    if (len(cum_loss)-1)%5==0:
                        print(f'Batch {len(cum_loss)}: {np.mean(cum_loss).round(2)}')

        except StopIteration:
            pass
                
    cum_loss = np.mean(cum_loss)
    return cum_loss
    
# Use a distinct name here so that the `ray.train` module imported above is not
# shadowed (it is needed for train.report() inside the training loop below).
def train_func(config):
    ###################
    # Initialize Models.
    ###################

    D = latent_dim1 - subj_emb_size
    # 8*16*65536 = 4*8*x*int(np.floor(65536/temp_conv_size))
    # <=4*65536/int(np.floor(65536/temp_conv_size))
    assert 4*30000/int(np.floor(30000/temp_conv_size))>=latent_dim1
    assert latent_dim1 > latent_dim2

    pos_emb = np.zeros([POS_COORD_LIM*2, POS_COORD_LIM*2, D])
    for x in range(10):
        for y in range(10):
            for i in range(int(D/4)):
                pos_emb[x, y, 2*i] = np.sin(x/10000**(4*i/D))
                pos_emb[x, y, 2*i+1] = np.cos(x/10000**(4*i/D))
            for j in range(int(D/4)):
                pos_emb[x, y, 2*j+int(D/2)] = np.sin(y/10000**(4*j/D))
                pos_emb[x, y, 2*j+1+int(D/2)] = np.cos(y/10000**(4*j/D))
    print(pos_emb.shape)

    pool_pos = PoolPosition(grid_conv_size)
    pool_pos.eval()

    tube_autoenc = TubeEnc(
        temp_conv_size, grid_conv_size, latent_dim1, latent_dim2, subj_emb_size)
    device = 'cpu'
    if torch.cuda.is_available():
        device = 'cuda:0'
        if torch.cuda.device_count() > 1:
            print('#'*10, '\nLet\'s use', torch.cuda.device_count(), 'GPUs!\n' + '#'*10)
            tube_autoenc = nn.DataParallel(tube_autoenc)
    print(device, torch.cuda.is_available())
    print('#'*10, tube_autoenc.to(device), '#'*10)

    #########################
    # Initialize data loaders.
    #########################

    # Redacted def of `train_dset`
    train_dataloader = DataLoader(train_dset, batch_size=int(config['batch_size']), shuffle=True, 
                                  pin_memory=True, drop_last=True, collate_fn=collate_fn)

    # Redacted def of `val_dset`
    val_dataloader = DataLoader(val_dset, batch_size=int(config['batch_size']), shuffle=True, 
                                pin_memory=True, drop_last=True, collate_fn=collate_fn)

    ################
    # MODELING, yay!
    ################

    criterion = nn.MSELoss()
    optimizer = optim.SGD(
        tube_autoenc.parameters(), lr=config['lr'], momentum=config['momentum'])
    schedulers = [
        optim.lr_scheduler.ExponentialLR(optimizer, gamma=config['gamma'])
    ]
    
    print('#'*10, 'DONE', '#'*10)
    
    ################
    # TRAINING, yay!
    ################
    epoch = 1
    val_loss = None
    
    save_args = [
        train_subjs, test_subjs, data_combos, subj_emb_list, temp_conv_size, 
        grid_conv_size, latent_dim1, latent_dim2, subj_emb_size
    ]

    while True:
        model_list = [tube_autoenc, pool_pos, pos_emb]
        train_loss = train_loop(
            model_list, train_dataloader, criterion, optimizer, schedulers, device, train_count=train_count)
        val_loss = test_loop(model_list, val_dataloader, criterion, device, is_val=True, val_count=val_count)

        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            checkpoint = None
            if (epoch-1)%20==0:
                # This saves the model to the trial directory
                save_args_all = save_args + [
                    os.path.join(temp_checkpoint_dir, 'model.pt'), 
                    tube_autoenc, pool_pos, optimizer, epoch, train_loss, val_loss]
                save_model(*save_args_all)
                checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
            # Send the current training result back to Tune
            train.report({'mean_loss': val_loss}, checkpoint=checkpoint)

        epoch += 1

    return

def test(checkpoint, inf_batch_size):
    temp_conv_size = checkpoint['temp_conv_size']
    grid_conv_size = checkpoint['grid_conv_size']
    latent_dim1 = checkpoint['latent_dim1']
    latent_dim2 = checkpoint['latent_dim2']
    subj_emb_size = checkpoint['subj_emb_size']
    
    ###################
    # Initialize Models.
    ###################

    D = latent_dim1 - subj_emb_size
    # 8*16*65536 = 4*8*x*int(np.floor(65536/temp_conv_size))
    # <=4*65536/int(np.floor(65536/temp_conv_size))
    assert 4*30000/int(np.floor(30000/temp_conv_size))>=latent_dim1
    assert latent_dim1 > latent_dim2

    pos_emb = np.zeros([POS_COORD_LIM*2, POS_COORD_LIM*2, D])
    for x in range(10):
        for y in range(10):
            for i in range(int(D/4)):
                pos_emb[x, y, 2*i] = np.sin(x/10000**(4*i/D))
                pos_emb[x, y, 2*i+1] = np.cos(x/10000**(4*i/D))
            for j in range(int(D/4)):
                pos_emb[x, y, 2*j+int(D/2)] = np.sin(y/10000**(4*j/D))
                pos_emb[x, y, 2*j+1+int(D/2)] = np.cos(y/10000**(4*j/D))
    print(pos_emb.shape)

    pool_pos = PoolPosition(grid_conv_size)
    pool_pos.eval()

    tube_autoenc = TubeEnc(
        temp_conv_size, grid_conv_size, latent_dim1, latent_dim2, subj_emb_size)
    device = 'cpu'
    if torch.cuda.is_available():
        device = 'cuda:0'
        if torch.cuda.device_count() > 1:
            print('#'*10, '\nLet\'s use', torch.cuda.device_count(), 'GPUs!\n' + '#'*10)
            tube_autoenc = nn.DataParallel(tube_autoenc)
    print(device, torch.cuda.is_available())
    print('#'*10, tube_autoenc.to(device), '#'*10)

    #########################
    # Initialize data loaders.
    #########################

    # Redacted def of `val_dset`
    val_dataloader = DataLoader(val_dset, batch_size=inf_batch_size, shuffle=False, 
                                pin_memory=True, drop_last=False, collate_fn=collate_fn)
    
    # Redacted def of `test_dset`
    test_dataloader = DataLoader(test_dset, batch_size=inf_batch_size, shuffle=False, 
                                 pin_memory=True, drop_last=False, collate_fn=collate_fn)

    # Redacted def of `fshot_test_dset`
    fshot_test_dataloader = DataLoader(fshot_test_dset, batch_size=inf_batch_size, shuffle=False, 
                                 pin_memory=True, drop_last=False, collate_fn=collate_fn)

    ################
    # MODELING, yay!
    ################

    criterion = nn.MSELoss()
    
    ################
    # TESTING, yay!
    ################
    pool_pos.load_state_dict(checkpoint['posenc_state_dict'])
    try:
        tube_autoenc.load_state_dict(checkpoint['model_state_dict'])
    except RuntimeError:
        try:
            no_dataPar_state_dict = OrderedDict()
            for key, value in checkpoint['model_state_dict'].items():
                no_dataPar_state_dict[key[7:]] = value
            tube_autoenc.load_state_dict(no_dataPar_state_dict)
        except RuntimeError:
            dataPar_state_dict = OrderedDict()
            for key, value in checkpoint['model_state_dict'].items():
                dataPar_state_dict[f'module.{key}'] = value
            tube_autoenc.load_state_dict(dataPar_state_dict)
    
    model_list = [tube_autoenc, pool_pos, pos_emb]
    val_loss = test_loop(model_list, val_dataloader, criterion, device, is_val=False)
    test_loss = test_loop(model_list, test_dataloader, criterion, device, is_val=False)
    fshot_test_loss = test_loop(model_list, fshot_test_dataloader, criterion, device, is_val=False)

    return val_loss, test_loss, fshot_test_loss

####################
# MISC. HELPER FUNCS
####################

class MyCallback(Callback):
    def on_trial_result(self, iteration, trials, trial, result, **info):
        print(f'Got result: {result["mean_loss"]}')
    
def log_gradients_in_model(model, writer, epoch):
    for tag, value in model.named_parameters():
        if value.grad is not None:
            writer.add_histogram(tag + '/grad', value.grad.cpu(), epoch)
            
def save_model(train_subjs, test_subjs, data_combos, subj_emb_list, temp_conv_size, 
               grid_conv_size, latent_dim1, latent_dim2, subj_emb_size, file_name, 
               tube_autoenc, pool_pos, optimizer, epoch, train_loss, val_loss=None):

    saving_dict = {
        'train_subjs': train_subjs,
        'test_subjs': test_subjs,
        'data_combos': data_combos,
        'subj_emb_list': subj_emb_list,

        'temp_conv_size': temp_conv_size,
        'grid_conv_size': grid_conv_size,
        'latent_dim1': latent_dim1,
        'latent_dim2': latent_dim2,
        'subj_emb_size': subj_emb_size,

        'model_state_dict': tube_autoenc.state_dict(), 
        'posenc_state_dict': pool_pos.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,  
        'train_loss': train_loss,
    }
    if val_loss:
        saving_dict['val_loss'] = val_loss

    torch.save(saving_dict, os.path.join(save_path, file_name))
    print('Model saved!')
    return


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # parser.add_argument('--gpu_list', type=str, required=True,
                        # help='List of visible GPUs.')
    parser.add_argument('--port', type=int,
                        help='Ray port.')
    parser.add_argument('--load_exp', type=str,
                        help='Path to model that needs to be loaded.')
    parser.add_argument('--save_path', type=str,
                        help='Path for saving model.')
    parser.add_argument('--exp_name', type=str,
                        help='Experiment name for Ray dashboard.')
    parser.add_argument('-verbose', action='store_true', 
                        help='Boolean indicating if print statements should be executed.')
    parser.add_argument('-inference', action='store_true', 
                        help='Boolean indicating if model should be run in inference mode.')
    ##################
    # MODEL PARAMETERS
    ##################
    parser.add_argument('--task', type=str, required=True,
                        help='Task for which the model needs to be trained/tested.')
    parser.add_argument('--temp_conv_size', type=int,
                        help='Number of frames in tubelet (to convolve over).')
    parser.add_argument('--grid_conv_size', type=int,
                        help='Grid downsampling factor.')
    parser.add_argument('--latent_dim1', type=int,
                        help='Number of convolution filters.')
    parser.add_argument('--latent_dim2', type=int,
                        help='Low-dimension to compress to.')
    parser.add_argument('--subj_emb_size', type=int,
                        help='Dimensionality of subject embedding.')
    parser.add_argument('--train_subj_perc', default=0.8, type=float, 
                        help='Percentage of data to use as training/validation set.')
    parser.add_argument('--num_subjs', default=400, type=int, 
                        help='Max. number of data to load.')
    #######################
    # MODEL HYPERPARAMETERS
    #######################
    parser.add_argument('--seed', default=42, type=int)
    parser.add_argument('--inf_batch_size', default=64, type=int, 
                        help='Batch size for inference-mode.')
    parser.add_argument('--train_count', default=50, type=int, 
                        help='#batches to use for training per epoch.')
    parser.add_argument('--val_count', default=50, type=int, 
                        help='#batches to use for estimating validation loss per epoch.')
    args = parser.parse_args()
    globals().update(args.__dict__)

    os.environ['RAY_PICKLE_VERBOSE_DEBUG']='2'
    
    # os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
    # os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'

    ray.shutdown()

    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)

    # Pre-trained model exists.
    if load_exp is not None:
        storage_path = os.path.expanduser('~/ray_results')
        
        experiment_path = os.path.join(storage_path, load_exp)
        print(f'Loading results from {experiment_path}...')
        
        restored_tuner = tune.Tuner.restore(experiment_path, trainable=train_func)
        result_grid = restored_tuner.get_results()
    
        best_result: Result = result_grid.get_best_result(metric='mean_loss', mode='min')

        print(best_result.config)
        result_df = best_result.metrics_dataframe
        print(result_df[['training_iteration', 'mean_loss', 'time_total_s']])
        
        with best_result.checkpoint.as_directory() as checkpoint_dir:
            checkpoint = torch.load(os.path.join(checkpoint_dir, 'model.pt'))
        
    if inference:
        assert load_exp is not None, 'Please specify an experiment to load!'
        save_path = os.path.join(experiment_path, 'inference')
    
    assert save_path is not None, 'Please specify a path to save the model!'

    save_path = os.path.join(save_path, f'seed{seed}')
    if not os.path.exists(save_path):
        os.makedirs(save_path)
            
    scrap_path = 'scrap_data/common'
    if not os.path.exists(scrap_path):
        os.makedirs(scrap_path)

    # os.environ['TUNE_RESULT_DIR'] = save_path

    # Lots of redacted code on creating the train/val/test split and loading dataset.

    #############
    # Fit models.
    #############

    if inference:
        val_loss, test_loss, fshot_test_loss = test(checkpoint, 1)
    else:
        search_space = {
            'lr': tune.loguniform(1e-4, 1e-1),
            'momentum': tune.uniform(0.1, 0.9),
            'gamma': tune.uniform(0.5, 0.95),
            'batch_size': tune.choice([8, 16, 32, 64]),
        }
    
        ray.init(runtime_env={'py_modules': ['<proj path>/data_utils']}, 
                 dashboard_port=port, num_gpus=torch.cuda.device_count())
        
        hyperopt_search = HyperOptSearch(search_space, metric='mean_loss', mode='min')
        trainable_with_gpu = tune.with_resources(train_func, {'gpu': 1})
        tuner = tune.Tuner(
            trainable_with_gpu,
            tune_config=tune.TuneConfig(
                num_samples=1,
                search_alg=hyperopt_search,
                scheduler=ASHAScheduler(metric='mean_loss', mode='min'),
            ),
            run_config=RunConfig(
                name=exp_name, callbacks=[MyCallback()], storage_path=save_path, 
                stop={"training_iteration": 2}),
        )
        result_grid = tuner.fit()

        results_df = result_grid.get_dataframe()
        print(results_df)

I am new to Ray, so any help would be appreciated. Thank you.

Fixed. Two changes resolved it for me:

  1. Upgraded Ray under Python 3.9 (I had multiple Python versions installed).
  2. Removed print = partial(print, flush=True).
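
In case it helps anyone else: if you want flushed output without rebinding the builtin name, there are a few plain-Python alternatives (the name printf below is purely illustrative):

    from functools import partial

    # Bind the flushing wrapper to a new name instead of shadowing the builtin.
    printf = partial(print, flush=True)
    printf('training started')

    # Or pass flush=True only at the call sites that need it:
    print('training started', flush=True)

    # Or run the whole script unbuffered:
    #   python -u model_v3.py ...
    # (equivalently, set PYTHONUNBUFFERED=1 in the environment)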