Thank you for the reply! As I mentioned, I'm new to Ray, and I've been following the "Convert existing PyTorch code to Ray AIR" tutorial from the Ray 2.6.1 docs. Now that I look at it again, it does say "This tutorial also runs with Ray Data, which gives you the benefits of efficient parallel preprocessing.", so I'm assuming it is using Ray Datasets.
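For context, this is roughly the data-loading pattern I took away from that tutorial (just a sketch from memory, so the exact arguments may be off, and the `"image"`/`"mask"` column names are placeholders for my data, not anything from the tutorial):

```python
# Rough sketch of the tutorial's Ray Data pattern as I understood it (not my actual code).
# It assumes the Ray Dataset was passed to TorchTrainer via datasets={"train": train_ds}.
from ray.air import session

def train_loop_per_worker(config):
    train_shard = session.get_dataset_shard("train")  # this worker's shard of the "train" dataset
    for epoch in range(config["epochs"]):
        # iterate in torch-tensor batches instead of wrapping the dataset in a DataLoader
        for batch in train_shard.iter_torch_batches(batch_size=config["batch_size"]):
            images, masks = batch["image"], batch["mask"]  # placeholder column names
            ...
```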
The full code would be unnecessarily long, so I'll paste the relevant parts and the full error traceback below. I'd appreciate any help/guidance!
Code:

```python
def load_dataset(batch_size):
    model_df_train = new_df_train.reset_index(drop=True)
    model_df_val = new_df_val.reset_index(drop=True)
    train_dataset = TrainDataSet(df=model_df_train, transforms=train_transforms)
    val_dataset = TrainDataSet(df=model_df_val, transforms=val_transforms)
    train_dataset = ray.data.from_torch(train_dataset)
    val_dataset = ray.data.from_torch(val_dataset)
    train_dataloader = DataLoader(dataset=train_dataset,
                                  batch_size=batch_size,
                                  # pin_memory=True allows faster data transfer from CPU to GPU
                                  num_workers=0, pin_memory=True, shuffle=False)
    val_dataloader = DataLoader(dataset=val_dataset,
                                batch_size=batch_size,
                                num_workers=0, pin_memory=True, shuffle=False)
    return train_dataloader, val_dataloader  # return train and val dataloaders

def epoch_train(model, optimizer, scheduler, dataloader, device, epoch):
    model.train()  # set mode to train
    dataset_size = 0  # initialize
    running_loss = 0.0  # initialize
    scaler = GradScaler()  # GradScaler for gradient scaling, needed to prevent underflow when training in fp16 with autocast below
    pbar = tqdm(enumerate(dataloader), total=len(dataloader), desc='Train', colour='red')
    for idx, (images, masks) in pbar:
        batch_size = model_config.batch_size  # batch size N
        with autocast(enabled=True, dtype=torch.float16):  # enable autocast for fp16 training, faster forward pass
            y_pred, _ = model(images)  # forward pass
            loss = loss_func(y_pred, masks)  # compute loss from y_pred
            loss = loss / model_config.iters_to_accumulate  # normalize since we accumulate gradients
        scaler.scale(loss).backward()  # backward pass, make sure it is not within autocast
        if (idx + 1) % model_config.iters_to_accumulate == 0:  # scaled updates should only happen every iters_to_accumulate steps
            scaler.step(optimizer)  # take optimizer step
            scaler.update()  # update scale for next iteration
            optimizer.zero_grad()  # zero the accumulated scaled gradients
            scheduler.step()  # change lr, make sure to call this after scaler.step
        running_loss += (loss.item() * batch_size)  # update current running loss for all images in batch
        dataset_size += batch_size  # update current dataset size
        epoch_loss = running_loss / dataset_size  # current epoch average loss
        current_lr = optimizer.param_groups[0]['lr']
        pbar.set_postfix(train_loss=f'{epoch_loss:0.4f}',
                         lr=f'{current_lr:0.5f}')  # show current epoch loss and lr
    torch.cuda.empty_cache()  # clear gpu memory after every epoch
    gc.collect()  # collect garbage
    return epoch_loss  # return loss for this epoch

@torch.no_grad()  # disable gradient calculation for validation
def epoch_valid(model, dataloader, device, epoch, hyp_config):
    model.eval()  # set mode to eval
    dataset_size = 0  # initialize
    running_loss = 0.0  # initialize
    valid_ap_history = []  # initialize
    pbar = tqdm(enumerate(dataloader), total=len(dataloader), desc='Validation', colour='red')
    for idx, (images, masks) in pbar:
        images = images.to(device, dtype=torch.float)  # move tensor to gpu
        masks = masks.to(device, dtype=torch.float)  # move tensor to gpu
        y_pred, _ = model(images)  # forward pass
        if (idx == 0 or idx == 2) and epoch % 4 == 0:  # visualize a couple of images every 4 epochs to make sure training is progressing
            image = images[0]  # first H&E image of batch
            mask = masks[0]  # first ground-truth mask of batch
            y_pred_prob = nn.Sigmoid()(y_pred)  # get prob by applying sigmoid to logit y_pred
            y_pred_ind = y_pred_prob[0]  # model's predicted prob for the same image as the ground truth above
            visualize_images_validation(image, mask, y_pred_ind, epoch)  # visualize H&E image, ground-truth mask, and predicted mask
        loss = loss_func(y_pred, masks)  # calculate loss
        running_loss += (loss.item() * hyp_config["batch_size"] * 2)  # update current running loss
        dataset_size += hyp_config["batch_size"] * 2  # update current dataset size
        epoch_loss = running_loss / dataset_size  # divide epoch loss by current dataset size
        masks = masks.squeeze(0)
        y_pred_prob = nn.Sigmoid()(y_pred)  # get prob by applying sigmoid to logit y_pred
        valid_ap = iou_map(masks.cpu().numpy(), y_pred_prob.cpu().numpy(), verbose=0)  # average precision (AP) @ IoU = 0.6
        valid_ap_history.append(valid_ap)
        current_lr = optimizer.param_groups[0]['lr']
        pbar.set_postfix(valid_loss=f'{epoch_loss:0.3f}',
                         lr=f'{current_lr:0.4f}')
    valid_ap_history = np.mean(valid_ap_history, axis=0)  # mean AP over the epoch
    torch.cuda.empty_cache()  # clear gpu memory after every epoch
    gc.collect()  # collect garbage
    return epoch_loss, valid_ap_history  # return loss and AP for this epoch

# master training function that iterates the train and validation functions above over the training epochs
def run_training(hyp_config):
    start = time.time()  # measure time
    print(f"Training for Fold {model_config.current_fold}")
    batch_size_per_worker = hyp_config["batch_size"] // session.get_world_size()
    print(f"batch_size_per_worker is {batch_size_per_worker}")
    train_dataloader, val_dataloader = load_dataset(batch_size_per_worker)  # load datasets
    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    val_dataloader = train.torch.prepare_data_loader(val_dataloader)
    model = build_model()  # build model
    model = train.torch.prepare_model(model)
    print(model)
    best_model_wts = copy.deepcopy(model.state_dict())
    best_ap = 0  # initial best AP
    best_epoch = -1  # initial best epoch
    history = defaultdict(list)  # history defaultdict to store relevant variables
    num_epochs = hyp_config["epochs"]
    optimizer = optim.Adam(model.parameters(),
                           lr=hyp_config["learning_rate"],
                           weight_decay=hyp_config["weight_decay"])  # initialize optimizer
    scheduler = lr_scheduler.CosineAnnealingLR(optimizer,
                                               T_max=hyp_config["T_max"],
                                               eta_min=hyp_config["eta_min"])  # initialize LR scheduler
    for epoch in range(1, num_epochs + 1):  # iterate over the total number of epochs
        gc.collect()
        print(f"Current Epoch {epoch} / Total Epoch {num_epochs}")
        print(f'Epoch {epoch}/{num_epochs}', end='')
        train_loss = epoch_train(model, optimizer, scheduler,
                                 dataloader=train_dataloader,
                                 device=model_config.device, epoch=epoch)  # train one epoch
        valid_loss, valid_ap_history = epoch_valid(model, dataloader=val_dataloader,
                                                   device=model_config.device,
                                                   epoch=epoch, hyp_config=hyp_config)  # validate one epoch
        valid_ap = valid_ap_history
        checkpoint = Checkpoint.from_dict(dict(epoch=epoch, model=model.state_dict()))
        session.report(dict(loss=valid_loss, ap=valid_ap), checkpoint=checkpoint)
        history['Train Loss'].append(train_loss)
        history['Valid Loss'].append(valid_loss)
        history['Valid AP'].append(valid_ap)
        print(f'Valid AP: {valid_ap:0.4f}')
        # if AP improves, save the best model
        if valid_ap >= best_ap:
            print(f"Valid Score Improved ({best_ap:0.4f} ---> {valid_ap:0.4f})")
            best_ap = valid_ap
            best_epoch = epoch
            best_model_wts = copy.deepcopy(model.state_dict())
            PATH = os.path.join(model_config.model_save_directory, f"best_epoch-{model_config.current_fold:02d}.pt")
            if not os.path.exists(model_config.model_save_directory):
                os.makedirs(model_config.model_save_directory)
            torch.save(model.state_dict(), PATH)
            print("Model Saved!")
        print(f'Best AP so far: {best_ap:0.4f}')
        print(f'Best AP at epoch #: {best_epoch:d}')
        # also save the most recent model
        last_model_wts = copy.deepcopy(model.state_dict())
        PATH = os.path.join(model_config.model_save_directory, f"latest_epoch-{model_config.current_fold:02d}.pt")
        if not os.path.exists(model_config.model_save_directory):
            os.makedirs(model_config.model_save_directory)
        torch.save(model.state_dict(), PATH)
    end = time.time()
    time_elapsed = end - start
    print('Training complete in {:.0f}h {:.0f}m'.format(
        time_elapsed // 3600, (time_elapsed % 3600) // 60))
    print("Best AP@ 0.6IOU: {:.4f}".format(best_ap))
    # load best model weights
    model.load_state_dict(best_model_wts)
    pkl_save_path = os.path.join(model_config.model_save_directory, 'history.pickle')
    # save history as pkl:
    with open(pkl_save_path, 'wb') as file:
        pickle.dump(history, file)
    print(f"Finished Training for fold {model_config.current_fold}")

# finally run training:
trainer = TorchTrainer(
    train_loop_per_worker=run_training,
    train_loop_config=hyp_config,
    torch_config=TorchConfig(backend="gloo"),  # change to gloo on windows, since no nccl
    scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
)
result = trainer.fit()
print(f"Last result: {result.metrics}")
print(f"Checkpoint: {result.checkpoint}")
```
Error traceback:

```
RayTaskError(AttributeError)              Traceback (most recent call last)
Cell In[4], line 406
    399 #finally run training:
    400 trainer = TorchTrainer(
    401     train_loop_per_worker=run_training,
    402     train_loop_config=hyp_config,
    403     torch_config=TorchConfig(backend="gloo"), #change to gloo on windows, since no nccl
    404     scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
    405 )
--> 406 result = trainer.fit()
    407 print(f"Last result: {result.metrics}")
    408 print(f"Checkpoint: {result.checkpoint}")

File ~\.conda\envs\hubmap\lib\site-packages\ray\train\base_trainer.py:368, in BaseTrainer.fit(self)
    366     result = result_grid[0]
    367     if result.error:
--> 368         raise result.error
    369 except TuneError as e:
    370     raise TrainingFailedError from e

RayTaskError(AttributeError): ray::_Inner.train() (pid=31276, ip=127.0.0.1, repr=TorchTrainer)
  File "python\ray\_raylet.pyx", line 857, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 861, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 803, in ray._raylet.execute_task.function_executor
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\function_manager.py", line 674, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\util\tracing\tracing_helper.py", line 466, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\tune\trainable\trainable.py", line 368, in train
    raise skipped from exception_cause(skipped)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\_internal\utils.py", line 54, in check_for_failure
    ray.get(object_ref)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\worker.py", line 2380, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(AttributeError): ray::RayTrainWorker._RayTrainWorker__execute() (pid=32128, ip=127.0.0.1, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x0000014E7C874820>)
  File "python\ray\_raylet.pyx", line 857, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 861, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 803, in ray._raylet.execute_task.function_executor
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\function_manager.py", line 674, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\util\tracing\tracing_helper.py", line 466, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\_internal\worker_group.py", line 31, in __execute
    raise skipped from exception_cause(skipped)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\_internal\utils.py", line 129, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "C:\Users\Kevin\AppData\Local\Temp\ipykernel_23780\1386670646.py", line 350, in run_training
  File "C:\Users\Kevin\AppData\Local\Temp\ipykernel_23780\1386670646.py", line 247, in epoch_train
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\torch\train_loop_utils.py", line 607, in __iter__
    self._prefetch_next_batch()
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\torch\train_loop_utils.py", line 602, in _prefetch_next_batch
    next_batch = next(self.dataloader_iter, None)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\dataloader.py", line 633, in __next__
    data = self._next_data()
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\dataloader.py", line 676, in _next_data
    index = self._next_index()  # may raise StopIteration
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\dataloader.py", line 623, in _next_index
    return next(self._sampler_iter)  # may raise StopIteration
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\sampler.py", line 254, in __iter__
    for idx in self.sampler:
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\sampler.py", line 76, in __iter__
    return iter(range(len(self.data_source)))
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\data\dataset.py", line 4383, in __len__
    raise AttributeError(
AttributeError: Use `ds.count()` to compute the length of a distributed Dataset. This may be an expensive operation.
```
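If it helps, my (possibly wrong) reading of the traceback is that the `DataLoader`'s default sampler calls `len()` on its dataset, and the Ray Dataset returned by `ray.data.from_torch` only supports `ds.count()`, so wrapping the Ray Dataset in a `DataLoader` is what blows up. The two options I can think of are (a) keep my original torch `TrainDataSet` in the `DataLoader` and drop `ray.data.from_torch`, roughly like the untested sketch below, or (b) follow the tutorial's Ray Data path (`session.get_dataset_shard` + `iter_torch_batches`) and not use a `DataLoader` at all. Is either of those the right direction?

```python
# Untested sketch of option (a): keep plain torch Datasets and let prepare_data_loader handle distribution.
def load_dataset(batch_size):
    train_dataset = TrainDataSet(df=new_df_train.reset_index(drop=True), transforms=train_transforms)
    val_dataset = TrainDataSet(df=new_df_val.reset_index(drop=True), transforms=val_transforms)
    # no ray.data.from_torch() here; the DataLoader gets the torch Datasets directly
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size,
                                  num_workers=0, pin_memory=True, shuffle=False)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size,
                                num_workers=0, pin_memory=True, shuffle=False)
    return train_dataloader, val_dataloader
```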