Troubleshooting Ray Train on Windows

Hi, I’m a new Ray user. While converting my PyTorch code to Ray AIR using this tutorial, I ran into one error and one warning that I can’t troubleshoot, even after searching the forums and GitHub issues.

The error that stops my trainer from working is: "AttributeError: Use ds.count() to compute the length of a distributed Dataset. This may be an expensive operation." What does this mean, and how can I troubleshoot it? It seems to happen when the trainer starts.

There is also a warning that I’m not sure is related to the error. From searching, it looks like a recent issue that other users have been hitting as well (see the GitHub issue here). What could be causing that?

Hi, I’ve temporarily worked around the warning by downgrading to Ray 2.3.0. However, I still get the "AttributeError: Use ds.count() to compute the length of a distributed Dataset. This may be an expensive operation." What does this error really mean, and how do I get past it? Would appreciate any help, thank you!

Can you share what your script looks like? Are you using Ray Datasets anywhere?

Thank you for the reply. As I mentioned, I’m new to Ray and I’ve been following the tutorial "Convert existing PyTorch code to Ray AIR — Ray 2.6.1". Now that I look at it, it does say "This tutorial also runs with Ray Data, which gives you the benefits of efficient parallel preprocessing.", so I’m assuming it is using Ray Datasets.

The full code would be unnecessarily long, so I’ll paste the relevant parts and the full error traceback. I’d appreciate any help or guidance!

Code:

def load_dataset(batch_size):
    model_df_train = new_df_train.reset_index(drop=True)
    model_df_val = new_df_val.reset_index(drop=True)
    train_dataset = TrainDataSet(df=model_df_train, transforms=train_transforms)
    val_dataset = TrainDataSet(df=model_df_val, transforms=val_transforms)
    train_dataset = ray.data.from_torch(train_dataset)
    val_dataset = ray.data.from_torch(val_dataset)
    train_dataloader = DataLoader(dataset=train_dataset,
                                  batch_size=batch_size,
                                  # pin_memory=True allows faster data transfer from CPU to GPU
                                  num_workers=0, pin_memory=True, shuffle=False)
    val_dataloader = DataLoader(dataset=val_dataset,
                                batch_size=batch_size,
                                num_workers=0, pin_memory=True, shuffle=False)
    return train_dataloader, val_dataloader  # return train and val dataloaders

def epoch_train(model, optimizer, scheduler, dataloader, device, epoch):
    model.train()  # set mode to train
    dataset_size = 0  # initialize
    running_loss = 0.0  # initialize
    scaler = GradScaler()  # enable GradScaler for gradient scaling, needed to prevent underflow with the fp16 autocast below
    pbar = tqdm(enumerate(dataloader), total=len(dataloader), desc='Train', colour='red')
    for idx, (images, masks) in pbar:
        batch_size = model_config.batch_size  # batch size N
        with autocast(enabled=True, dtype=torch.float16):  # enable autocast for fp16 training, faster forward pass
            y_pred, _ = model(images)  # forward pass
            loss = loss_func(y_pred, masks)  # compute loss from y_pred
            loss = loss / model_config.iters_to_accumulate  # normalize, since gradients are accumulated
        scaler.scale(loss).backward()  # backward pass, make sure it is not within autocast
        if (idx + 1) % model_config.iters_to_accumulate == 0:  # only step every iters_to_accumulate iterations
            scaler.step(optimizer)  # take optimizer step
            scaler.update()  # update scale for next iteration
            optimizer.zero_grad()  # zero the accumulated scaled gradients
            scheduler.step()  # change lr, make sure to call this after scaler.step
        running_loss += (loss.item() * batch_size)  # update running loss for all images in batch
        dataset_size += batch_size  # update current dataset size
        epoch_loss = running_loss / dataset_size  # current epoch average loss
        current_lr = optimizer.param_groups[0]['lr']
        pbar.set_postfix(train_loss=f'{epoch_loss:0.4f}',
                         lr=f'{current_lr:0.5f}')  # show current epoch loss and lr
    torch.cuda.empty_cache()  # clear gpu memory after every epoch
    gc.collect()  # collect garbage
    return epoch_loss  # return loss for this epoch

@torch.no_grad()  # disable gradient calc for validation
def epoch_valid(model, dataloader, device, epoch, hyp_config):
    model.eval()  # set mode to eval
    dataset_size = 0  # initialize
    running_loss = 0.0  # initialize
    valid_ap_history = []  # initialize
    pbar = tqdm(enumerate(dataloader), total=len(dataloader), desc='Validation', colour='red')
    for idx, (images, masks) in pbar:
        images = images.to(device, dtype=torch.float)  # move tensor to gpu
        masks = masks.to(device, dtype=torch.float)  # move tensor to gpu
        y_pred, _ = model(images)  # forward pass
        if (idx == 0 or idx == 2) and epoch % 4 == 0:  # visualize a couple of images every 4 epochs to make sure training is progressing
            image = images[0]  # first H&E image of batch
            mask = masks[0]  # first ground truth mask of batch
            y_pred_prob = nn.Sigmoid()(y_pred)  # get prob by applying sigmoid to logit y_pred
            y_pred_ind = y_pred_prob[0]  # model prediction for the same image as the ground truth above
            visualize_images_validation(image, mask, y_pred_ind, epoch)  # visualize H&E image, ground truth mask, and predicted mask
        loss = loss_func(y_pred, masks)  # calculate loss
        running_loss += (loss.item() * hyp_config["batch_size"] * 2)  # update running loss
        dataset_size += hyp_config["batch_size"] * 2  # update current dataset size
        epoch_loss = running_loss / dataset_size  # divide running loss by current dataset size
        masks = masks.squeeze(0)
        y_pred_prob = nn.Sigmoid()(y_pred)  # get prob by applying sigmoid to logit y_pred
        valid_ap = iou_map(masks.cpu().numpy(), y_pred_prob.cpu().numpy(), verbose=0)  # average precision (AP) @ IoU = 0.6
        valid_ap_history.append(valid_ap)
        current_lr = optimizer.param_groups[0]['lr']
        pbar.set_postfix(valid_loss=f'{epoch_loss:0.3f}',
                         lr=f'{current_lr:0.4f}')
    valid_ap_history = np.mean(valid_ap_history, axis=0)  # store mean AP
    torch.cuda.empty_cache()  # clear gpu memory after every epoch
    gc.collect()  # collect garbage

    return epoch_loss, valid_ap_history  # return loss and AP for this epoch

# master training function: uses the train and validation functions above, iterating them over the training epochs
def run_training(hyp_config):
    start = time.time()  # measure time
    print(f"Training for Fold {model_config.current_fold}")
    batch_size_per_worker = hyp_config["batch_size"] // session.get_world_size()
    print(f"batch_size_per_worker is {batch_size_per_worker}")
    train_dataloader, val_dataloader = load_dataset(batch_size_per_worker)  # load datasets
    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    val_dataloader = train.torch.prepare_data_loader(val_dataloader)
    model = build_model()  # build model
    model = train.torch.prepare_model(model)
    print(model)
    best_model_wts = copy.deepcopy(model.state_dict())
    best_ap = 0  # initial best AP
    best_epoch = -1  # initial best epoch
    history = defaultdict(list)  # history defaultdict to store relevant variables
    num_epochs = hyp_config["epochs"]
    optimizer = optim.Adam(model.parameters(),
                           lr=hyp_config["learning_rate"],
                           weight_decay=hyp_config["weight_decay"])  # initialize optimizer
    scheduler = lr_scheduler.CosineAnnealingLR(optimizer,
                                               T_max=hyp_config["T_max"],
                                               eta_min=hyp_config["eta_min"])  # initialize LR scheduler
    for epoch in range(1, num_epochs + 1):  # iterate over total epochs
        gc.collect()
        print(f"Current Epoch {epoch} / Total Epoch {num_epochs}")
        print(f'Epoch {epoch}/{num_epochs}', end='')
        train_loss = epoch_train(model, optimizer, scheduler,
                                 dataloader=train_dataloader,
                                 device=model_config.device, epoch=epoch)  # train one epoch
        valid_loss, valid_ap_history = epoch_valid(model, dataloader=val_dataloader,
                                                   device=model_config.device,
                                                   epoch=epoch)  # validate one epoch
        valid_ap = valid_ap_history
        checkpoint = Checkpoint.from_dict(dict(epoch=epoch, model=model.state_dict()))
        session.report(dict(loss=valid_loss, ap=valid_ap), checkpoint=checkpoint)
        history['Train Loss'].append(train_loss)
        history['Valid Loss'].append(valid_loss)
        history['Valid AP'].append(valid_ap)
        print(f'Valid AP: {valid_ap:0.4f}')
        # if AP improves, save the best model
        if valid_ap >= best_ap:
            print(f"Valid Score Improved ({best_ap:0.4f} ---> {valid_ap:0.4f})")
            best_ap = valid_ap
            best_epoch = epoch
            best_model_wts = copy.deepcopy(model.state_dict())
            PATH = os.path.join(model_config.model_save_directory, f"best_epoch-{model_config.current_fold:02d}.pt")
            if not os.path.exists(model_config.model_save_directory):
                os.makedirs(model_config.model_save_directory)
            torch.save(model.state_dict(), PATH)
            print("Model Saved!")
        print(f'Best AP so far: {best_ap:0.4f}')
        print(f'Best AP at epoch #: {best_epoch:d}')

        # also save the most recent model
        last_model_wts = copy.deepcopy(model.state_dict())
        PATH = os.path.join(model_config.model_save_directory, f"latest_epoch-{model_config.current_fold:02d}.pt")
        if not os.path.exists(model_config.model_save_directory):
            os.makedirs(model_config.model_save_directory)
        torch.save(model.state_dict(), PATH)

    end = time.time()
    time_elapsed = end - start
    print('Training complete in {:.0f}h {:.0f}m'.format(
        time_elapsed // 3600, (time_elapsed % 3600) // 60))
    print("Best AP @ 0.6 IoU: {:.4f}".format(best_ap))

    # load best model weights
    model.load_state_dict(best_model_wts)

    pkl_save_path = os.path.join(model_config.model_save_directory, 'history.pickle')
    # save history as pkl:
    with open(pkl_save_path, 'wb') as file:
        pickle.dump(history, file)
    print(f"Finished Training for fold {model_config.current_fold}")

# finally, run training:
trainer = TorchTrainer(
    train_loop_per_worker=run_training,
    train_loop_config=hyp_config,
    torch_config=TorchConfig(backend="gloo"),  # use gloo on Windows, since NCCL is unavailable
    scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
)
result = trainer.fit()
print(f"Last result: {result.metrics}")
print(f"Checkpoint: {result.checkpoint}")

Error traceback:


RayTaskError(AttributeError)              Traceback (most recent call last)
Cell In[4], line 406
    399 # finally, run training:
    400 trainer = TorchTrainer(
    401     train_loop_per_worker=run_training,
    402     train_loop_config=hyp_config,
    403     torch_config=TorchConfig(backend="gloo"),  # use gloo on Windows, since NCCL is unavailable
    404     scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
    405 )
--> 406 result = trainer.fit()
    407 print(f"Last result: {result.metrics}")
    408 print(f"Checkpoint: {result.checkpoint}")

File ~\.conda\envs\hubmap\lib\site-packages\ray\train\base_trainer.py:368, in BaseTrainer.fit(self)
    366     result = result_grid[0]
    367     if result.error:
--> 368         raise result.error
    369 except TuneError as e:
    370     raise TrainingFailedError from e
RayTaskError(AttributeError): ray::_Inner.train() (pid=31276, ip=127.0.0.1, repr=TorchTrainer)
  File "python\ray\_raylet.pyx", line 857, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 861, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 803, in ray._raylet.execute_task.function_executor
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\function_manager.py", line 674, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\util\tracing\tracing_helper.py", line 466, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\tune\trainable\trainable.py", line 368, in train
    raise skipped from exception_cause(skipped)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\_internal\utils.py", line 54, in check_for_failure
    ray.get(object_ref)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\worker.py", line 2380, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(AttributeError): ray::RayTrainWorker._RayTrainWorker__execute() (pid=32128, ip=127.0.0.1, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x0000014E7C874820>)
  File "python\ray\_raylet.pyx", line 857, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 861, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 803, in ray._raylet.execute_task.function_executor
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\_private\function_manager.py", line 674, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\util\tracing\tracing_helper.py", line 466, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\_internal\worker_group.py", line 31, in __execute
    raise skipped from exception_cause(skipped)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\_internal\utils.py", line 129, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "C:\Users\Kevin\AppData\Local\Temp\ipykernel_23780\1386670646.py", line 350, in run_training
  File "C:\Users\Kevin\AppData\Local\Temp\ipykernel_23780\1386670646.py", line 247, in epoch_train
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\torch\train_loop_utils.py", line 607, in __iter__
    self._prefetch_next_batch()
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\train\torch\train_loop_utils.py", line 602, in _prefetch_next_batch
    next_batch = next(self.dataloader_iter, None)
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\dataloader.py", line 633, in __next__
    data = self._next_data()
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\dataloader.py", line 676, in _next_data
    index = self._next_index()  # may raise StopIteration
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\dataloader.py", line 623, in _next_index
    return next(self._sampler_iter)  # may raise StopIteration
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\sampler.py", line 254, in __iter__
    for idx in self.sampler:
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\torch\utils\data\sampler.py", line 76, in __iter__
    return iter(range(len(self.data_source)))
  File "C:\Users\Kevin\.conda\envs\hubmap\lib\site-packages\ray\data\dataset.py", line 4383, in __len__
    raise AttributeError(
AttributeError: Use ds.count() to compute the length of a distributed Dataset. This may be an expensive operation.

Ah I see. The part to call out here is that a Ray Dataset is not a Torch Dataset, so you won’t be able to call DataLoader(dataset=ray_dataset).

We’re working on clarifying this in the documentation for Ray 2.7, but here is an existing example of using Ray Data!
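
To make the error message concrete, here's a minimal sketch (separate from your code) of what does and doesn't work on a Ray Dataset:

import ray

ds = ray.data.from_items([{"x": i} for i in range(8)])  # a small Ray Dataset

print(ds.count())  # 8 -- the supported way to get the number of rows
# len(ds) raises the AttributeError from your traceback, because __len__ is intentionally not supported.
# Likewise, torch.utils.data.DataLoader(dataset=ds) fails: the default sampler calls len(ds) under the hood,
# which is exactly the frame where your traceback ends.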

I’m a little bit confused. Doesn’t the code

train_dataset = ray.data.from_torch(train_dataset)
val_dataset = ray.data.from_torch(val_dataset)

convert the Torch dataset to a Ray Dataset? This is what the documentation seems to do as well in the link you provided, in the "Load and normalize CIFAR-10" section. I’d appreciate any guidance; thanks for your quick turnaround!

Right, the error happens in the next line:

train_dataloader = DataLoader(dataset=train_dataset,

train_dataset is a Ray Dataset here, but a DataLoader only takes in a Torch Dataset.

In the CIFAR-10 example we pass the Ray Dataset into the TorchTrainer, and read from it inside the training function with `session.get_dataset_shard` and `iter_torch_batches` (which effectively takes the place of the DataLoader).
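
Roughly, the pattern looks like this. This is only a sketch, not your exact code: the "image"/"mask" column names, the synthetic data, and the hyperparameter values are placeholders for illustration.

import numpy as np
import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

# Placeholder Ray Dataset with "image" and "mask" columns (stand-in for your real data).
train_ds = ray.data.from_items(
    [{"image": np.zeros((3, 32, 32), dtype=np.float32),
      "mask": np.zeros((1, 32, 32), dtype=np.float32)} for _ in range(16)]
)

def train_loop_per_worker(config):
    # Each worker reads its shard of the dataset passed to the TorchTrainer below.
    train_shard = session.get_dataset_shard("train")
    for epoch in range(config["epochs"]):
        # iter_torch_batches plays the role of the DataLoader.
        for batch in train_shard.iter_torch_batches(batch_size=config["batch_size"]):
            images, masks = batch["image"], batch["mask"]
            # ... forward pass, loss, backward, optimizer step go here ...
            print(epoch, images.shape, masks.shape)

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"epochs": 1, "batch_size": 4},
    datasets={"train": train_ds},  # get_dataset_shard("train") returns a shard of this dataset
    scaling_config=ScalingConfig(num_workers=1, use_gpu=False),  # use_gpu=True on your machine
)
result = trainer.fit()

In your script this would replace the load_dataset(batch_size)/DataLoader part inside run_training; the rest of the training loop (forward pass, loss, optimizer) can stay as it is.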

Ah, thank you for the guidance. I went through the tutorial and solved the initial ds.count() error, but then ran into another one. Below are the relevant parts of the code and the full error traceback. I’m not sure what to do about this either: why is .materialize() not working? Is the dataset somehow not being converted to a Ray Dataset?

I modified the train and validation code as you suggested, so the Ray Datasets are passed to the TorchTrainer and read inside the training/validation function with session.get_dataset_shard. I’m using Ray 2.6.1 right now. Would really appreciate any input on this as well, thank you in advance!

Code:

def convert_batch_to_numpy(batch) -> Dict[str, np.ndarray]:
    images = np.stack([np.array(image) for image, _ in batch["item"]])
    masks = np.stack([np.array(masks) for masks, _ in batch["item"]])
    return {"image": images, "mask": masks}

def load_dataset():
    model_df_train = new_df_train.reset_index(drop=True)
    model_df_val = new_df_val.reset_index(drop=True)
    train_dataset = TrainDataSet(df=model_df_train, transforms=train_transforms)
    val_dataset = TrainDataSet(df=model_df_val, transforms=val_transforms)
    return train_dataset, val_dataset

train_dataset, val_dataset = load_dataset()  # load datasets
train_dataset: ray.data.Dataset = ray.data.from_torch(train_dataset)
val_dataset: ray.data.Dataset = ray.data.from_torch(val_dataset)
train_dataset = train_dataset.map_batches(convert_batch_to_numpy).materialize()
val_dataset = val_dataset.map_batches(convert_batch_to_numpy).materialize()
trainer = TorchTrainer(
    train_loop_per_worker=run_training,
    train_loop_config=hyp_config,
    datasets={"train": train_dataset, "valid": val_dataset},
    torch_config=TorchConfig(backend="gloo"),  # use gloo on Windows, since NCCL is unavailable
    scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
)
result = trainer.fit()
print(f"Last result: {result.metrics}")
print(f"Checkpoint: {result.checkpoint}")

Error:

AttributeError                            Traceback (most recent call last)
Cell In[8], line 402
    400 train_dataset: ray.data.Dataset = ray.data.from_torch(train_dataset)
    401 val_dataset: ray.data.Dataset = ray.data.from_torch(val_dataset)
--> 402 train_dataset = train_dataset.map_batches(convert_batch_to_numpy).materialize()
    403 val_dataset = val_dataset.map_batches(convert_batch_to_numpy).materialize()
    404 trainer = TorchTrainer(
    405     train_loop_per_worker=run_training,
    406     train_loop_config=hyp_config,
    (...)
    409     scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
    410 )

AttributeError: 'Dataset' object has no attribute 'materialize'

Actually, please disregard my previous issue; it somehow resolved itself when I ran the code in separate cells of the Jupyter notebook. However, something odd is happening. I’m currently on Windows 10, Python 3.10, and Ray 2.6.1, and every time I run training, tens of gigabytes of temporary files get added, and they don’t disappear even after restarting Python/Windows.

I cannot find any related topics online. Do you happen to know why this may be happening? I can’t even locate the files (Windows says I have 200 GB of temporary files but only shows 50 MB of deletable files), and it’s not the ray_results folder either. Would appreciate any input; my computer is about to run out of disk space…

Sorry for the flood of replies, but I did find out where this was coming from: it was the files created in the ray_spilled_objects folder inside each session directory under the Local/Temp/ray directory. The files have names like "e12da97c233d406d9dcae884bb445c2c-multi-380". How do I prevent this from happening? I’ve looked at similar problems on Stack Overflow and in GitHub issues with no success. Thank you!

Hmm, can you create a post in Ray Core - Ray? Spilled objects should be cleaned up, so it sounds like there’s a bug somewhere.
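
In the meantime, one possible stopgap is to point object spilling at an explicit directory so the files are easy to find and clear between runs. This is only a sketch based on Ray's object spilling configuration; the directory path is a placeholder, and it doesn't fix the underlying cleanup issue:

import json
import ray

# Send spilled objects to an explicit directory (placeholder path) instead of the default temp location.
ray.init(
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "D:/ray_spill"}}
        )
    },
)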