Hi, I am running continual learning experiments in which a hyperparameter is tuned separately for each incoming task. My Ray Tune related code is here:
for task in range(10):
    model.incremental_train(data_manager)
    if task == 0:
        model.training()
        model.check_fisher()
    else:
        # key matches what update_representation reads from config
        config = {"lamda": tune.uniform(1, 100000)}
        hyperopt_search = HyperOptSearch(metric="accuracy", mode="max")
        scheduler = ASHAScheduler(
            metric="accuracy",
            mode="max",
            max_t=100,
            grace_period=5,
            reduction_factor=2,
        )
        result = tune.run(
            tune.with_parameters(model.update_representation),
            resources_per_trial={"cpu": 12, "gpu": 1},
            config=config,
            num_samples=1,
            search_alg=hyperopt_search,
            scheduler=scheduler,
            # keep_checkpoints_num=2,
            checkpoint_score_attr="accuracy",
        )
        best_trial = result.get_best_trial("accuracy", "max", "last")
        best_checkpoint = result.get_best_checkpoint(trial=best_trial, metric="accuracy", mode="max")
        best_checkpoint_dir = best_checkpoint.to_directory(path="directory")
        model_state, optimizer_state = torch.load(os.path.join(best_checkpoint_dir, "checkpoint"))
        model._network.load_state_dict(model_state)
        model.check_fisher()
        model._network.load_state_dict(model_state)
model is a class (subclassing BaseLearner) that includes the methods incremental_train, update_representation, and check_fisher:
class model(BaseLearner):
    def __init__(self):
        super().__init__()
        self.fisher = None
        self._network = IncrementalNet("resnet18", False)

    def after_task(self):
        self._known_classes = self._total_classes

    def incremental_train(self, data_manager):
        self._cur_task += 1
        self._total_classes = self._known_classes + data_manager.get_task_size(self._cur_task)
        self._network.update_fc(self._total_classes)
        logging.info("Learning on {}-{}".format(self._known_classes, self._total_classes))
        train_dataset, val_set = data_manager.get_dataset_with_split(
            np.arange(self._known_classes, self._total_classes),
            source="train",
            mode="train",
            val_samples_per_class=100,
        )
        self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        self.forget_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        test_dataset = data_manager.get_dataset(np.arange(0, self._total_classes), source="test", mode="test")
        self.test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    def check_fisher(self):
        self._network.to(self._device)
        if self.fisher is None:
            self.fisher = self.getFisherDiagonal()
        else:
            alpha = self._known_classes / self._total_classes
            new_fisher = self.getFisherDiagonal()
            for n, p in new_fisher.items():
                new_fisher[n][: len(self.fisher[n])] = (
                    alpha * self.fisher[n]
                    + (1 - alpha) * new_fisher[n][: len(self.fisher[n])]
                )
            self.fisher = new_fisher
        self.mean = {
            n: p.clone().detach()
            for n, p in self._network.named_parameters()
            if p.requires_grad
        }
    def training(self):
        self._network.to(self._device)
        optimizer = optim.SGD(self._network.parameters(), momentum=0.9, lr=init_lr, weight_decay=init_weight_decay)
        scheduler = optim.lr_scheduler.MultiStepLR(optimizer=optimizer, milestones=init_milestones, gamma=init_lr_decay)
        self.init_train(self.train_loader, self.test_loader, optimizer, scheduler)
    def init_train(self, train_loader, test_loader, optimizer, scheduler):
        prog_bar = tqdm(range(init_epoch))
        for _, epoch in enumerate(prog_bar):
            self._network.train()
            losses = 0.0
            correct, total = 0, 0
            for i, (_, inputs, targets) in enumerate(train_loader):
                targets = targets.type(torch.LongTensor)
                inputs, targets = inputs.to(self._device), targets.to(self._device)
                logits = self._network(inputs)["logits"]
                loss = F.cross_entropy(logits, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                losses += loss.item()
                _, preds = torch.max(logits, dim=1)
                correct += preds.eq(targets.expand_as(preds)).cpu().sum()
                total += len(targets)
            scheduler.step()
            train_acc = np.around(tensor2numpy(correct) * 100 / total, decimals=2)
            if epoch % 5 == 0:
                info = "Task {}, Epoch {}/{} => Loss {:.3f}, Train_accy {:.2f}".format(
                    self._cur_task,
                    epoch + 1,
                    init_epoch,
                    losses / len(train_loader),
                    train_acc,
                )
            else:
                test_acc, loss_test = self._compute_accuracy(self._network, test_loader)
                info = "Task {}, Epoch {}/{} => Loss {:.3f}, Train_accy {:.2f}, Test_accy {:.2f}".format(
                    self._cur_task,
                    epoch + 1,
                    init_epoch,
                    losses / len(train_loader),
                    train_acc,
                    test_acc,
                )
            prog_bar.set_description(info)
        logging.info(info)
    def update_representation(self, config):
        self._network.to(self._device)
        # train_loader = ray.get(train_loader)
        optimizer = optim.SGD(self._network.parameters(), lr=lrate, momentum=0.9, weight_decay=weight_decay)
        scheduler = optim.lr_scheduler.MultiStepLR(optimizer=optimizer, milestones=milestones, gamma=lrate_decay)
        prog_bar = tqdm(range(epochs))
        for _, epoch in enumerate(prog_bar):
            self._network.train()
            losses = 0.0
            correct, total = 0, 0
            for i, (_, inputs, targets) in enumerate(self.train_loader):
                targets = targets.type(torch.LongTensor)
                inputs, targets = inputs.to(self._device), targets.to(self._device)
                logits = self._network(inputs)["logits"]
                loss_clf = F.cross_entropy(
                    logits[:, self._known_classes :], targets - self._known_classes
                )
                lamda = config["lamda"]
                loss_ewc = self.compute_ewc()
                loss = loss_clf + lamda * loss_ewc
                # loss = loss_clf + lamda
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                losses += loss.item()
                _, preds = torch.max(logits, dim=1)
                correct += preds.eq(targets.expand_as(preds)).cpu().sum()
                total += len(targets)
            scheduler.step()
            train_acc = np.around(tensor2numpy(correct) * 100 / total, decimals=2)
            for i, (_, inputs, targets) in enumerate(self.forget_loader):
                targets = targets.type(torch.LongTensor)
                inputs, targets = inputs.to(self._device), targets.to(self._device)
            forget_acc, loss_forget = self._compute_accuracy(self._network, self.forget_loader)
            with tune.checkpoint_dir(epoch) as checkpoint_dir:
                path = os.path.join(checkpoint_dir, "checkpoint")
                torch.save((self._network.state_dict(), optimizer.state_dict()), path)
            tune.report(loss=loss_forget, accuracy=forget_acc)
            for i, (_, inputs, targets) in enumerate(self.test_loader):
                targets = targets.type(torch.LongTensor)
                inputs, targets = inputs.to(self._device), targets.to(self._device)
            test_acc, loss_test = self._compute_accuracy(self._network, self.test_loader)
            info = "Task {}, Epoch {}/{} => Loss {:.3f}, Train_accy {:.2f}, Test_accy {:.2f}".format(
                self._cur_task,
                epoch + 1,
                epochs,
                losses / len(self.train_loader),
                train_acc,
                test_acc,
            )
            prog_bar.set_description(info)
        logging.info(info)
    def compute_ewc(self):
        loss = 0
        if len(self._multiple_gpus) > 1:
            for n, p in self._network.module.named_parameters():
                if n in self.fisher.keys():
                    loss += (
                        torch.sum(
                            (self.fisher[n])
                            * (p[: len(self.mean[n])] - self.mean[n]).pow(2)
                        )
                        / 2
                    )
        else:
            for n, p in self._network.named_parameters():
                if n in self.fisher.keys():
                    loss += (
                        torch.sum(
                            (self.fisher[n])
                            * (p[: len(self.mean[n])] - self.mean[n]).pow(2)
                        )
                        / 2
                    )
        return loss
    def getFisherDiagonal(self):
        fisher = {
            n: torch.zeros(p.shape).to(self._device)
            for n, p in self._network.named_parameters()
            if p.requires_grad
        }
        self._network.train()
        optimizer = optim.SGD(self._network.parameters(), lr=lrate)
        for i, (_, inputs, targets) in enumerate(self.train_loader):
            targets = targets.type(torch.LongTensor)
            inputs, targets = inputs.to(self._device), targets.to(self._device)
            logits = self._network(inputs)["logits"]
            loss = torch.nn.functional.cross_entropy(logits, targets)
            optimizer.zero_grad()
            loss.backward()
            for n, p in self._network.named_parameters():
                if p.grad is not None:
                    fisher[n] += p.grad.pow(2).clone()
        for n, p in fisher.items():
            fisher[n] = p / len(self.train_loader)
            fisher[n] = torch.min(fisher[n], torch.tensor(fishermax))
        return fisher
The first task is trained by model.training(), and it completes without problems since no Ray Tune is involved in that part. However, when training continues to the next task I get this message:
– Started a local Ray instance.
followed by this error:
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\tune\execution\ray_trial_executor.py", line 573, in start_trial
    return self._start_trial(trial)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\tune\execution\ray_trial_executor.py", line 473, in _start_trial
    runner = self._setup_remote_runner(trial)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\tune\execution\ray_trial_executor.py", line 414, in _setup_remote_runner
    return full_actor_class.remote(**kwargs)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\actor.py", line 637, in remote
    return actor_cls._remote(args=args, kwargs=kwargs, **updated_options)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\util\tracing\tracing_helper.py", line 387, in _invocation_actor_class_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\actor.py", line 844, in _remote
    worker.function_actor_manager.export_actor_class(
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\_private\function_manager.py", line 479, in export_actor_class
    check_oversized_function(
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\_private\utils.py", line 742, in check_oversized_function
    raise ValueError(error)
ValueError: The actor ImplicitFunc is too large (151 MiB > FUNCTION_SIZE_ERROR_THRESHOLD=95 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use ray.put() to put large objects in the Ray object store.
This error is thrown after I increased my model size; before the increase, the function size was under the threshold (28 MiB). I am not sure where and how ray.put and ray.get should be used, since the network is referenced in many places in the code.
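For context, my understanding of the pattern suggested by the error tip is roughly the sketch below (the function and argument names train_task, network_state, etc. are placeholders, not my actual code): large objects are handed to tune.with_parameters, which stores them in the Ray object store, instead of being captured by the trainable's closure.

    import ray
    from ray import tune

    # Placeholder sketch: a standalone trainable that receives large objects as
    # arguments instead of capturing them from the enclosing scope.
    def train_task(config, network_state=None, train_loader=None):
        # network_state and train_loader are resolved by Ray from the object
        # store here; the pickled trainable definition itself stays small.
        pass

    trainable = tune.with_parameters(
        train_task,
        network_state=model._network.state_dict(),  # large object -> object store
        train_loader=model.train_loader,            # large object -> object store
    )

    # The explicit equivalent would be ray.put() on the driver side,
    # e.g. state_ref = ray.put(model._network.state_dict()),
    # and ray.get(state_ref) inside the trainable.

What I cannot see is how to apply this when the trainable is a bound method (model.update_representation) that reaches the network and the data loaders through self.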
I would like to run experiments with bigger models and any help is appreciated.
Thanks