How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I occasionally get this error and it has been very hard to reproduce. I am using Python 3.9 and Ray v2.22.0. Following previous posts, I made sure that pickle5 is not installed.
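For reference, a quick way I double-checked that pickle5 is absent from the environment (a minimal sketch; pickle5 is the module name in question):

import importlib.util
# Prints None when pickle5 is not importable in this environment.
print(importlib.util.find_spec('pickle5'))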
File "<path_to_my_env>/lib/python3.9/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
File "<path_to_my_env>/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 388, in _invocation_actor_class_remote_span
return method(self, args, kwargs, *_args, **_kwargs)
File "<path_to_my_env>/lib/python3.9/site-packages/ray/actor.py", line 1041, in _remote
worker.function_actor_manager.export_actor_class(
File "<path_to_my_env>/lib/python3.9/site-packages/ray/_private/function_manager.py", line 482, in export_actor_class
serialized_actor_class = pickle_dumps(
File "<path_to_my_env>/lib/python3.9/site-packages/ray/_private/serialization.py", line 66, in pickle_dumps
return pickle.dumps(obj)
File "<path_to_my_env>/lib/python3.9/site-packages/ray/cloudpickle/cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "<path_to_my_env>/lib/python3.9/site-packages/ray/cloudpickle/cloudpickle.py", line 1245, in dump
return super().dump(obj)
_pickle.PicklingError: Can't pickle <built-in function print>: it's not the same object as builtins.print
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/userdata/sjain/projects/fomo-models/tubelet_autoencoder/model_v3.py", line 722, in <module>
result_grid = tuner.fit()
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/tuner.py", line 377, in fit
return self._local_tuner.fit()
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/impl/tuner_internal.py", line 476, in fit
analysis = self._fit_internal(trainable, param_space)
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/impl/tuner_internal.py", line 595, in _fit_internal
analysis = run(
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/tune.py", line 999, in run
runner.cleanup()
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 1975, in cleanup
self._cleanup_trials()
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 792, in _cleanup_trials
self._schedule_trial_stop(trial)
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 1403, in _schedule_trial_stop
self._remove_actor(tracked_actor=tracked_actor)
File "<path_to_my_env>/lib/python3.9/site-packages/ray/tune/execution/tune_controller.py", line 811, in _remove_actor
stop_future = self._actor_manager.schedule_actor_task(
File "<path_to_my_env>/lib/python3.9/site-packages/ray/air/execution/_internal/actor_manager.py", line 726, in schedule_actor_task
raise ValueError(
ValueError: Tracked actor is not managed by this event manager: <TrackedActor 105686241699737627712142917511253406370>
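Since the PicklingError above complains about the built-in print, my main suspect is the print = partial(print, flush=True) line near the top of my script (left in below so the repro stays faithful), but I have not been able to confirm this. Based on Ray's serialization docs, here is a debugging sketch using ray.util.inspect_serializability to report which member of the trainable fails to pickle (train_func is the trainable defined in the script below):

from ray.util import inspect_serializability
# Walks the object graph of the trainable and prints which attribute or
# captured global fails to serialize.
inspect_serializability(train_func, name='train_func')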
My full script:
'''
Initial attempt at modeling with Ray Tune.
'''
import os, sys
import numpy as np
import csv
import h5py
import pickle
from collections import defaultdict, OrderedDict
import tqdm
import random
import argparse
import yaml
import glob
from zipfile import BadZipFile
import tempfile
from filelock import FileLock
from functools import partial
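# Rebind the module-level name print so that every call flushes immediately.
# Given the PicklingError above mentions <built-in function print>, this line
# is my main suspect (see the note above the script).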
print = partial(print, flush=True)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data.dataloader import default_collate
import ray
from ray import train, tune
from ray.tune.schedulers import ASHAScheduler
from ray.train import Checkpoint, RunConfig, Result
from hyperopt import hp
from ray.tune import Callback
from ray.tune.search.hyperopt import HyperOptSearch
PROJ_DIR = '<proj_path>'
with open(f'{PROJ_DIR}/config/<redacted>.yaml') as f:
PROJ_CONFIG = yaml.safe_load(f)
sys.path.insert(0, PROJ_DIR+'/data_utils')
import data_loader
POS_COORD_LIM = 1024
zscore = lambda x: (x - x.mean(0))/x.std(0)
########################
# DATA LOADING FUNCTIONS
########################
def get_task_participants(task, task_csv_path):
# Redacted
return np.unique(task_parts)
def _load_block_data(data_path, subj_emb_list, subj, block):
try:
# Redacted
return data, coords, subj_id
except OSError:
        if verbose: print('Data was None because of OSError!')
return None
def collate_fn(batch):
batch = list(filter(lambda x: x is not None, batch))
if verbose: print('Collater collating!')
return default_collate(batch)
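# NOTE: this class reuses the name Dataset; the base class in the parentheses
# still resolves to torch.utils.data.Dataset at class-creation time, so the
# shadowing is confusing but functional.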
class Dataset(Dataset):
def __init__(self, data_path, subj_emb_list, data_combos):
self.data_path = data_path
self.subj_emb_list = subj_emb_list
self.data_combos = data_combos
def __getitem__(self, idx):
return _load_block_data(
self.data_path, self.subj_emb_list, *self.data_combos[idx])
def __len__(self):
return len(self.data_combos)
##################
# MODEL DEFINITION
##################
class PoolPosition(nn.Module):
def __init__(self, grid_conv_size):
super().__init__()
self.pool = nn.AvgPool2d((grid_conv_size, grid_conv_size), stride=(grid_conv_size, grid_conv_size))
def forward(self, x):
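        # Average-pool the coordinate grid, then shift by POS_COORD_LIM so the
        # results are non-negative integer indices into the pos_emb table
        # (which spans 2*POS_COORD_LIM per axis).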
x = torch.add(self.pool(x), POS_COORD_LIM).int()
return x
class TubeEnc(nn.Module):
def __init__(self, temp_conv_size, grid_conv_size, latent_dim1, latent_dim2, subj_emb_size):
super().__init__()
self.temp_conv_size = temp_conv_size
self.grid_conv_size = grid_conv_size
self.latent_dim1 = latent_dim1
self.latent_dim2 = latent_dim2
self.subj_emb_size = subj_emb_size
self.conv1 = nn.Conv3d(
1, latent_dim1, (grid_conv_size, grid_conv_size, temp_conv_size),
stride=(grid_conv_size, grid_conv_size, temp_conv_size))
self.fc1 = nn.Linear(latent_dim1, latent_dim2)
self.convtrans1 = nn.ConvTranspose3d(
latent_dim2, 1, (grid_conv_size, grid_conv_size, temp_conv_size),
stride=(grid_conv_size, grid_conv_size, temp_conv_size))
self.subj_embedding = nn.Embedding(400, subj_emb_size)
nn.init.zeros_(self.subj_embedding.weight)
def forward(self, img, space_pos, subj_ixs, verbose=False):
if verbose: print('Input:', img.shape)
img_conv = F.relu(self.conv1(img))
if verbose: print('Tubelet Conv:', img_conv.shape)
subj_emb = self.subj_embedding(subj_ixs).T
space_pos = space_pos.T
subj_emb = subj_emb.expand(space_pos.shape[0], space_pos.shape[1], -1, -1)
aux_emb = torch.cat([subj_emb, space_pos], axis=2)
if verbose: print('Aux. emb:', aux_emb.shape)
img_space_pos = (img_conv.T + aux_emb).T
batch_size, _, nheight_chans, nwidth_chans, ntimepoints = img_space_pos.shape
if verbose: print('Space pos & subj embedding!')
img_linear = self.fc1(torch.swapaxes(torch.flatten(img_space_pos, start_dim=2), 1, 2))
if verbose: print('Linear reduction:', img_linear.shape)
img_trans = img_linear
img_trans = torch.reshape(torch.swapaxes(img_trans, 1, 2),
(batch_size, self.latent_dim2, nheight_chans, nwidth_chans, ntimepoints))
if verbose: print('Reshaping:', img_trans.shape)
img_deconv = self.convtrans1(img_trans)
if verbose: print('Tubelet Deconv:', img_deconv.shape)
img_op = img_deconv
return img_op
def get_model_output(model_list, criterion, data_list, device):
tube_autoenc, pool_pos, pos_emb = model_list
img_data, mni_coords, subj_ids = data_list
D = pos_emb.shape[2]
pos_locs = pool_pos(mni_coords).numpy()
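    # Look up a fixed positional embedding for every pooled grid location;
    # indices come from pool_pos and embeddings from the precomputed pos_emb.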
pos_embeddings = np.zeros([pos_locs.shape[0], D, pos_locs.shape[2], pos_locs.shape[3]])
for bs in range(pos_locs.shape[0]):
for i in range(pos_locs.shape[2]):
for j in range(pos_locs.shape[3]):
pos_embeddings[bs, :, i, j] = pos_emb[pos_locs[bs, 1, i, j], pos_locs[bs, 2, i, j]]
pos_embeddings = torch.tensor(pos_embeddings.astype(np.float32)).to(device)
    output = tube_autoenc(img_data.to(device), pos_embeddings, subj_ids.to(device))  # .to(device) so the CPU path also works
loss = criterion(output, img_data.to(device))
return loss
############################
# MODEL TRAINING AND TESTING
############################
# Make this a generic function that takes in the model(s), dataloader, criterion, optimizer and scheduler(s), and returns the cumulative loss.
def train_loop(model_list, dataloader, criterion, optimizer, scheduler_list, device, train_count):
tube_autoenc, pool_pos, pos_emb = model_list
tube_autoenc.train()
try:
cum_loss = []
for data_list in dataloader:
optimizer.zero_grad()
train_loss = get_model_output(model_list, criterion, data_list, device)
cum_loss.append(train_loss.item())
train_loss.backward()
optimizer.step()
if len(cum_loss)>=train_count: break
except StopIteration:
pass
for scheduler in scheduler_list:
scheduler.step()
cum_loss = np.mean(cum_loss)
return cum_loss
def test_loop(model_list, dataloader, criterion, device, is_val=False, val_count=np.inf):
# # Runs it in test-mode. This ensures that the data will be read in order
# # and iterator will start from the beginning each time.
# generator = enumerate(dataloader)
tube_autoenc, pool_pos, pos_emb = model_list
tube_autoenc.eval()
with torch.no_grad():
try:
cum_loss = []
for data_list in dataloader:
val_loss = get_model_output(model_list, criterion, data_list, device)
cum_loss.append(val_loss.item())
if is_val:
if len(cum_loss)>=val_count: break
else:
if (len(cum_loss)-1)%5==0:
print(f'Batch {len(cum_loss)}: {np.mean(cum_loss).round(2)}')
except StopIteration:
pass
cum_loss = np.mean(cum_loss)
return cum_loss
def train_func(config):  # named train_func (not train) so ray.train stays usable for train.report below
###################
# Initalize Models.
###################
D = latent_dim1 - subj_emb_size
# 8*16*65536 = 4*8*x*int(np.floor(65536/temp_conv_size))
# <=4*65536/int(np.floor(65536/temp_conv_size))
assert 4*30000/int(np.floor(30000/temp_conv_size))>=latent_dim1
assert latent_dim1 > latent_dim2
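    # Fixed (transformer-style) sinusoidal position embeddings over a 2D grid:
    # channels [0, D/2) encode x with interleaved sin/cos and channels [D/2, D)
    # encode y the same way; only a 10x10 patch is filled here.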
pos_emb = np.zeros([POS_COORD_LIM*2, POS_COORD_LIM*2, D])
for x in range(10):
for y in range(10):
for i in range(int(D/4)):
pos_emb[x, y, 2*i] = np.sin(x/10000**(4*i/D))
pos_emb[x, y, 2*i+1] = np.cos(x/10000**(4*i/D))
for j in range(int(D/4)):
pos_emb[x, y, 2*j+int(D/2)] = np.sin(y/10000**(4*j/D))
pos_emb[x, y, 2*j+1+int(D/2)] = np.cos(y/10000**(4*j/D))
print(pos_emb.shape)
pool_pos = PoolPosition(grid_conv_size)
pool_pos.eval()
tube_autoenc = TubeEnc(
temp_conv_size, grid_conv_size, latent_dim1, latent_dim2, subj_emb_size)
device = 'cpu'
if torch.cuda.is_available():
device = 'cuda:0'
if torch.cuda.device_count() > 1:
print('#'*10, '\nLet\'s use', torch.cuda.device_count(), 'GPUs!\n' + '#'*10)
tube_autoenc = nn.DataParallel(tube_autoenc)
print(device, torch.cuda.is_available())
print('#'*10, tube_autoenc.to(device), '#'*10)
#########################
# Initalize data loaders.
#########################
# Redacted def of `train_dset`
train_dataloader = DataLoader(train_dset, batch_size=int(config['batch_size']), shuffle=True,
pin_memory=True, drop_last=True, collate_fn=collate_fn)
# Redacted def of `val_dset`
val_dataloader = DataLoader(val_dset, batch_size=int(config['batch_size']), shuffle=True,
pin_memory=True, drop_last=True, collate_fn=collate_fn)
################
# MODELING, yay!
################
criterion = nn.MSELoss()
optimizer = optim.SGD(
tube_autoenc.parameters(), lr=config['lr'], momentum=config['momentum'])
schedulers = [
optim.lr_scheduler.ExponentialLR(optimizer, gamma=config['gamma'])
]
print('#'*10, 'DONE', '#'*10)
################
# TRAINING, yay!
################
epoch = 1
val_loss = None
save_args = [
train_subjs, test_subjs, data_combos, subj_emb_list, temp_conv_size,
grid_conv_size, latent_dim1, latent_dim2, subj_emb_size
]
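    # Open-ended training loop in the Tune style: the trial is terminated by
    # the stop={'training_iteration': 2} criterion in RunConfig, not by a break.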
while True:
model_list = [tube_autoenc, pool_pos, pos_emb]
train_loss = train_loop(
model_list, train_dataloader, criterion, optimizer, schedulers, device, train_count=train_count)
val_loss = test_loop(model_list, val_dataloader, criterion, device, is_val=True, val_count=val_count)
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
checkpoint = None
if (epoch-1)%20==0:
# This saves the model to the trial directory
save_args_all = save_args + [
os.path.join(temp_checkpoint_dir, 'model.pt'),
tube_autoenc, pool_pos, optimizer, epoch, train_loss, val_loss]
save_model(*save_args_all)
checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
# Send the current training result back to Tune
train.report({'mean_loss': val_loss}, checkpoint=checkpoint)
epoch += 1
return
def test(checkpoint, inf_batch_size):
temp_conv_size = checkpoint['temp_conv_size']
grid_conv_size = checkpoint['grid_conv_size']
latent_dim1 = checkpoint['latent_dim1']
latent_dim2 = checkpoint['latent_dim2']
subj_emb_size = checkpoint['subj_emb_size']
###################
# Initalize Models.
###################
D = latent_dim1 - subj_emb_size
# 8*16*65536 = 4*8*x*int(np.floor(65536/temp_conv_size))
# <=4*65536/int(np.floor(65536/temp_conv_size))
assert 4*30000/int(np.floor(30000/temp_conv_size))>=latent_dim1
assert latent_dim1 > latent_dim2
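    # Same fixed sinusoidal position embeddings as in train_func above.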
pos_emb = np.zeros([POS_COORD_LIM*2, POS_COORD_LIM*2, D])
for x in range(10):
for y in range(10):
for i in range(int(D/4)):
pos_emb[x, y, 2*i] = np.sin(x/10000**(4*i/D))
pos_emb[x, y, 2*i+1] = np.cos(x/10000**(4*i/D))
for j in range(int(D/4)):
pos_emb[x, y, 2*j+int(D/2)] = np.sin(y/10000**(4*j/D))
pos_emb[x, y, 2*j+1+int(D/2)] = np.cos(y/10000**(4*j/D))
print(pos_emb.shape)
pool_pos = PoolPosition(grid_conv_size)
pool_pos.eval()
tube_autoenc = TubeEnc(
temp_conv_size, grid_conv_size, latent_dim1, latent_dim2, subj_emb_size)
device = 'cpu'
if torch.cuda.is_available():
device = 'cuda:0'
if torch.cuda.device_count() > 1:
print('#'*10, '\nLet\'s use', torch.cuda.device_count(), 'GPUs!\n' + '#'*10)
tube_autoenc = nn.DataParallel(tube_autoenc)
print(device, torch.cuda.is_available())
print('#'*10, tube_autoenc.to(device), '#'*10)
#########################
# Initalize data loaders.
#########################
# Redacted def of `val_dset`
val_dataloader = DataLoader(val_dset, batch_size=inf_batch_size, shuffle=False,
pin_memory=True, drop_last=False, collate_fn=collate_fn)
# Redacted def of `test_dset`
test_dataloader = DataLoader(test_dset, batch_size=inf_batch_size, shuffle=False,
pin_memory=True, drop_last=False, collate_fn=collate_fn)
# Redacted def of `fshot_test_dset`
fshot_test_dataloader = DataLoader(fshot_test_dset, batch_size=inf_batch_size, shuffle=False,
pin_memory=True, drop_last=False, collate_fn=collate_fn)
################
# MODELING, yay!
################
criterion = nn.MSELoss()
################
# TESTING, yay!
################
pool_pos.load_state_dict(checkpoint['posenc_state_dict'])
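    # Handle checkpoints saved with or without nn.DataParallel: DataParallel
    # prefixes every state_dict key with 'module.', so strip or add the prefix
    # until load_state_dict succeeds.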
try:
tube_autoenc.load_state_dict(checkpoint['model_state_dict'])
except RuntimeError:
try:
no_dataPar_state_dict = OrderedDict()
for key, value in checkpoint['model_state_dict'].items():
no_dataPar_state_dict[key[7:]] = value
tube_autoenc.load_state_dict(no_dataPar_state_dict)
except RuntimeError:
dataPar_state_dict = OrderedDict()
for key, value in checkpoint['model_state_dict'].items():
dataPar_state_dict[f'module.{key}'] = value
tube_autoenc.load_state_dict(dataPar_state_dict)
model_list = [tube_autoenc, pool_pos, pos_emb]
val_loss = test_loop(model_list, val_dataloader, criterion, device, is_val=False)
test_loss = test_loop(model_list, test_dataloader, criterion, device, is_val=False)
fshot_test_loss = test_loop(model_list, fshot_test_dataloader, criterion, device, is_val=False)
return val_loss, test_loss, fshot_test_loss
####################
# MISC. HELPER FUNCS
####################
class MyCallback(Callback):
def on_trial_result(self, iteration, trials, trial, result, **info):
print(f'Got result: {result["mean_loss"]}')
def log_gradients_in_model(model, writer, epoch):
for tag, value in model.named_parameters():
if value.grad is not None:
writer.add_histogram(tag + '/grad', value.grad.cpu(), epoch)
def save_model(train_subjs, test_subjs, data_combos, subj_emb_list, temp_conv_size,
grid_conv_size, latent_dim1, latent_dim2, subj_emb_size, file_name,
tube_autoenc, pool_pos, optimizer, epoch, train_loss, val_loss=None):
saving_dict = {
'train_subjs': train_subjs,
'test_subjs': test_subjs,
'data_combos': data_combos,
'subj_emb_list': subj_emb_list,
'temp_conv_size': temp_conv_size,
'grid_conv_size': grid_conv_size,
'latent_dim1': latent_dim1,
'latent_dim2': latent_dim2,
'subj_emb_size': subj_emb_size,
'model_state_dict': tube_autoenc.state_dict(),
'posenc_state_dict': pool_pos.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'epoch': epoch,
'train_loss': train_loss,
}
    if val_loss is not None:
saving_dict['val_loss'] = val_loss
torch.save(saving_dict, os.path.join(save_path, file_name))
print('Model saved!')
return
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# parser.add_argument('--gpu_list', type=str, required=True,
# help='List of visible GPUs.')
parser.add_argument('--port', type=int,
help='Ray port.')
parser.add_argument('--load_exp', type=str,
help='Path to model that needs to be loaded.')
parser.add_argument('--save_path', type=str,
help='Path for saving model.')
parser.add_argument('--exp_name', type=str,
help='Experiment name for Ray dashboard.')
parser.add_argument('-verbose', action='store_true',
help='Boolean indicating if print statements should be executed.')
parser.add_argument('-inference', action='store_true',
help='Boolean indicating if model should be run in inference mode.')
##################
# MODEL PARAMETERS
##################
parser.add_argument('--task', type=str, required=True,
help='Task for which the model needs to be trained/tested.')
parser.add_argument('--temp_conv_size', type=int,
help='Number of frames in tubelet (to convolve over).')
parser.add_argument('--grid_conv_size', type=int,
help='Grid downsampling factor.')
parser.add_argument('--latent_dim1', type=int,
help='Number of convolution filters.')
parser.add_argument('--latent_dim2', type=int,
help='Low-dimension to compress to.')
parser.add_argument('--subj_emb_size', type=int,
help='Dimensionality of subject embedding.')
parser.add_argument('--train_subj_perc', default=0.8, type=float,
help='Percentage of data to use as training/validation set.')
parser.add_argument('--num_subjs', default=400, type=int,
help='Max. number of data to load.')
#######################
# MODEL HYPERPARAMETERS
#######################
parser.add_argument('--seed', default=42, type=int)
parser.add_argument('--inf_batch_size', default=64, type=int,
help='Batch size for inference-mode.')
parser.add_argument('--train_count', default=50, type=int,
help='#batches to use for training per epoch.')
parser.add_argument('--val_count', default=50, type=int,
help='#batches to use for estimating validation loss per epoch.')
args = parser.parse_args()
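    # Promote every CLI arg to a module-level global (referenced as globals by
    # the functions above); cloudpickle will capture any of these globals that
    # the trainable references.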
globals().update(args.__dict__)
os.environ['RAY_PICKLE_VERBOSE_DEBUG']='2'
# os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
# os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'
ray.shutdown()
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)
# Pre-trained model exists.
if load_exp is not None:
storage_path = '~/ray_results'
experiment_path = os.path.join(storage_path, load_exp)
print(f'Loading results from {experiment_path}...')
        restored_tuner = tune.Tuner.restore(experiment_path, trainable=train_func)
result_grid = restored_tuner.get_results()
best_result: Result = result_grid.get_best_result(metric='mean_loss', mode='min')
print(best_result.config)
result_df = best_result.metrics_dataframe
print(result_df[['training_iteration', 'mean_loss', 'time_total_s']])
with best_result.checkpoint.as_directory() as checkpoint_dir:
checkpoint = torch.load(os.path.join(checkpoint_dir, 'model.pt'))
if inference:
assert load_exp is not None, 'Please specify an experiment to load!'
save_path = os.path.join(experiment_path, 'inference')
assert save_path is not None, 'Please specify a path to save the model!'
save_path = os.path.join(save_path, f'seed{seed}')
if not os.path.exists(save_path):
os.makedirs(save_path)
scrap_path = 'scrap_data/common'
if not os.path.exists(scrap_path):
os.makedirs(scrap_path)
# os.environ['TUNE_RESULT_DIR'] = save_path
# Lots of redacted code on creating the train/val/test split and loading dataset.
#############
# Fit models.
#############
if inference:
val_loss, test_loss, fshot_test_loss = test(checkpoint, 1)
else:
search_space = {
'lr': tune.loguniform(1e-4, 1e-1),
'momentum': tune.uniform(0.1, 0.9),
'gamma': tune.uniform(0.5, 0.95),
'batch_size': tune.choice([8, 16, 32, 64]),
}
ray.init(runtime_env={'py_modules': ['<proj path>/data_utils']},
dashboard_port=port, num_gpus=torch.cuda.device_count())
hyperopt_search = HyperOptSearch(search_space, metric='mean_loss', mode='min')
        trainable_with_gpu = tune.with_resources(train_func, {'gpu': 1})
tuner = tune.Tuner(
trainable_with_gpu,
tune_config=tune.TuneConfig(
num_samples=1,
search_alg=hyperopt_search,
scheduler=ASHAScheduler(metric='mean_loss', mode='min'),
),
run_config=RunConfig(
name=exp_name, callbacks=[MyCallback()], storage_path=save_path,
stop={"training_iteration": 2}),
)
result_grid = tuner.fit()
results_df = result_grid.get_dataframe()
print(results_df)
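In case it helps with triage, here is a stripped-down sketch of the same Tuner wiring with a dummy trainable (my assumptions: a GPU is available, as in my setup, and no data loading; I am not certain this alone reproduces the error):

import ray
from ray import train, tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.hyperopt import HyperOptSearch

def dummy_train(config):
    # Report a dummy metric forever; the stop criterion ends the trial.
    while True:
        train.report({'mean_loss': config['lr']})

search_space = {'lr': tune.loguniform(1e-4, 1e-1)}
ray.init()
tuner = tune.Tuner(
    tune.with_resources(dummy_train, {'gpu': 1}),
    tune_config=tune.TuneConfig(
        num_samples=1,
        search_alg=HyperOptSearch(search_space, metric='mean_loss', mode='min'),
        scheduler=ASHAScheduler(metric='mean_loss', mode='min'),
    ),
    run_config=train.RunConfig(stop={'training_iteration': 2}),
)
tuner.fit()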
I am new to Ray, so any help would be appreciated. Thank you.