The actor ImplicitFunc is too large

Hi, I am doing experiments on continual learning, tuning a hyperparameter separately for each task as the tasks arrive. My Ray Tune related code is here:

for task in range(10):
    model.incremental_train(data_manager)
    if task == 0:
        # First task: plain training, Ray Tune is not involved.
        model.training()
        model.check_fisher()
    else:
        # The search-space key must be "lamda": update_representation reads
        # config["lamda"], so any other key raises KeyError inside the trial.
        config = {"lamda": tune.uniform(1, 100000)}
        hyperopt_search = HyperOptSearch(metric="accuracy", mode="max")
        scheduler = ASHAScheduler(
            metric="accuracy",
            mode="max",
            max_t=100,
            grace_period=5,
            reduction_factor=2,
        )
        # NOTE: model.update_representation is a bound method, so the whole
        # `model` object travels with the trainable. tune.with_parameters is
        # given no extra objects here, so nothing is moved to the object
        # store. This is the call that raises "The actor ImplicitFunc is too
        # large" quoted below.
        result = tune.run(
            tune.with_parameters(model.update_representation),
            resources_per_trial={"cpu": 12, "gpu": 1},
            config=config,
            num_samples=1,
            search_alg=hyperopt_search,
            scheduler=scheduler,
            checkpoint_score_attr="accuracy",
        )

        best_trial = result.get_best_trial("accuracy", "max", "last")
        best_checkpoint = result.get_best_checkpoint(
            trial=best_trial, metric="accuracy", mode="max"
        )
        best_checkpoint_dir = best_checkpoint.to_directory(path="directory")
        # The checkpoint holds (network state, optimizer state); only the
        # network state is restored here.
        model_state, optimizer_state = torch.load(
            os.path.join(best_checkpoint_dir, "checkpoint")
        )
        model._network.load_state_dict(model_state)
        model.check_fisher()
        # Reloaded again after check_fisher, which runs train-mode forward
        # passes; presumably this restores buffers (e.g. BatchNorm running
        # statistics) changed by those passes -- TODO confirm.
        model._network.load_state_dict(model_state)

model is a class with the methods “incremental_train”, “update_representation” and “check_fisher”, among others:

class model(BaseLearner):
    """Incremental learner trained with a Fisher-weighted quadratic penalty.

    The first task is trained with plain cross-entropy (``training``). Later
    tasks are trained by ``update_representation``, which adds
    ``lamda * compute_ewc()`` to the loss and reports to Ray Tune each epoch.

    Hyperparameters such as ``batch_size``, ``num_workers``, ``lrate``,
    ``epochs`` and ``fishermax`` are read from module-level names defined
    outside this class. ``_cur_task``, ``_known_classes``, ``_total_classes``,
    ``_device``, ``_multiple_gpus`` and ``_compute_accuracy`` are presumably
    provided by ``BaseLearner`` -- confirm there.
    """

    def __init__(self):
        super().__init__()
        # Diagonal Fisher estimate per parameter name; None until the first
        # call to check_fisher().
        self.fisher = None
        self._network = IncrementalNet("resnet18", False)

    def after_task(self):
        """Mark every class seen so far as known."""
        self._known_classes = self._total_classes

    def incremental_train(self, data_manager):
        """Advance to the next task, grow the classifier and build its loaders.

        Sets ``train_loader`` and ``forget_loader`` (train/validation split of
        the new classes) and ``test_loader`` (all classes seen so far).
        """
        self._cur_task += 1
        self._total_classes = self._known_classes + data_manager.get_task_size(self._cur_task)
        self._network.update_fc(self._total_classes)
        logging.info("Learning on {}-{}".format(self._known_classes, self._total_classes))

        new_classes = np.arange(self._known_classes, self._total_classes)
        train_dataset, val_set = data_manager.get_dataset_with_split(
            new_classes, source="train", mode="train", val_samples_per_class=100
        )
        self.train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers
        )
        self.forget_loader = DataLoader(
            val_set, batch_size=batch_size, shuffle=True, num_workers=num_workers
        )

        test_dataset = data_manager.get_dataset(
            np.arange(0, self._total_classes), source="test", mode="test"
        )
        self.test_loader = DataLoader(
            test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
        )

    def check_fisher(self):
        """Refresh ``self.fisher`` and snapshot the parameters in ``self.mean``.

        On the first call the new Fisher estimate is stored as is. On later
        calls the leading rows covered by the previous estimate are blended as
        ``alpha * old + (1 - alpha) * new`` with
        ``alpha = known_classes / total_classes``.
        """
        self._network.to(self._device)
        new_fisher = self.getFisherDiagonal()
        if self.fisher is not None:
            alpha = self._known_classes / self._total_classes
            for name in new_fisher:
                old = self.fisher[name]
                # Only the first len(old) rows overlap with the old estimate;
                # rows beyond that keep the new value.
                new_fisher[name][: len(old)] = (
                    alpha * old + (1 - alpha) * new_fisher[name][: len(old)]
                )
        self.fisher = new_fisher
        self.mean = {
            name: param.clone().detach()
            for name, param in self._network.named_parameters()
            if param.requires_grad
        }

    def training(self):
        """Train on the current task with plain cross-entropy (no penalty)."""
        self._network.to(self._device)
        optimizer = optim.SGD(
            self._network.parameters(),
            momentum=0.9,
            lr=init_lr,
            weight_decay=init_weight_decay,
        )
        scheduler = optim.lr_scheduler.MultiStepLR(
            optimizer=optimizer, milestones=init_milestones, gamma=init_lr_decay
        )
        self.init_train(self.train_loader, self.test_loader, optimizer, scheduler)

    def init_train(self, train_loader, test_loader, optimizer, scheduler):
        """Run ``init_epoch`` epochs of cross-entropy training with a progress bar."""
        prog_bar = tqdm(range(init_epoch))
        for epoch in prog_bar:
            self._network.train()
            losses = 0.0
            correct, total = 0, 0
            for _, inputs, targets in train_loader:
                targets = targets.type(torch.LongTensor)
                inputs, targets = inputs.to(self._device), targets.to(self._device)
                logits = self._network(inputs)["logits"]
                loss = F.cross_entropy(logits, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                losses += loss.item()

                _, preds = torch.max(logits, dim=1)
                correct += preds.eq(targets.expand_as(preds)).cpu().sum()
                total += len(targets)

            scheduler.step()
            train_acc = np.around(tensor2numpy(correct) * 100 / total, decimals=2)

            # NOTE(review): test accuracy is evaluated on every epoch EXCEPT
            # multiples of 5. This looks inverted (evaluate every 5th epoch
            # only); behaviour is kept as is -- confirm the intent.
            if epoch % 5 == 0:
                info = "Task {}, Epoch {}/{} => Loss {:.3f}, Train_accy {:.2f}".format(
                    self._cur_task,
                    epoch + 1,
                    init_epoch,
                    losses / len(train_loader),
                    train_acc,
                )
            else:
                test_acc, loss_test = self._compute_accuracy(self._network, test_loader)
                info = "Task {}, Epoch {}/{} => Loss {:.3f}, Train_accy {:.2f}, Test_accy {:.2f}".format(
                    self._cur_task,
                    epoch + 1,
                    init_epoch,
                    losses / len(train_loader),
                    train_acc,
                    test_acc,
                )
            prog_bar.set_description(info)

        logging.info(info)

    def update_representation(self, config):
        """Ray Tune trainable: train the current task with the Fisher penalty.

        Args:
            config: Trial configuration; ``config["lamda"]`` is the weight of
                the penalty returned by ``compute_ewc``.

        Each epoch writes a checkpoint holding
        ``(network.state_dict(), optimizer.state_dict())`` and reports
        ``loss`` and ``accuracy`` measured on ``forget_loader``.

        NOTE: passing this bound method to ``tune.run`` ships the whole
        learner with the trainable, which is what triggers "The actor
        ImplicitFunc is too large" for big networks.
        """
        self._network.to(self._device)
        optimizer = optim.SGD(
            self._network.parameters(), lr=lrate, momentum=0.9, weight_decay=weight_decay
        )
        scheduler = optim.lr_scheduler.MultiStepLR(
            optimizer=optimizer, milestones=milestones, gamma=lrate_decay
        )
        # Constant for the whole trial, so read it once (and fail fast on a
        # misnamed search-space key).
        lamda = config["lamda"]

        prog_bar = tqdm(range(epochs))
        for epoch in prog_bar:
            self._network.train()
            losses = 0.0
            correct, total = 0, 0
            for _, inputs, targets in self.train_loader:
                targets = targets.type(torch.LongTensor)
                inputs, targets = inputs.to(self._device), targets.to(self._device)
                logits = self._network(inputs)["logits"]

                # Classification loss over the new classes only: slice off the
                # logits of known classes and shift the targets accordingly.
                loss_clf = F.cross_entropy(
                    logits[:, self._known_classes:], targets - self._known_classes
                )
                loss = loss_clf + lamda * self.compute_ewc()

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                losses += loss.item()

                _, preds = torch.max(logits, dim=1)
                correct += preds.eq(targets.expand_as(preds)).cpu().sum()
                total += len(targets)

            scheduler.step()
            train_acc = np.around(tensor2numpy(correct) * 100 / total, decimals=2)

            # _compute_accuracy receives the loader itself, so no separate
            # pass over the loader is needed before calling it.
            forget_acc, loss_forget = self._compute_accuracy(self._network, self.forget_loader)

            with tune.checkpoint_dir(epoch) as checkpoint_dir:
                path = os.path.join(checkpoint_dir, "checkpoint")
                torch.save((self._network.state_dict(), optimizer.state_dict()), path)

            tune.report(loss=loss_forget, accuracy=forget_acc)

            test_acc, loss_test = self._compute_accuracy(self._network, self.test_loader)

            info = "Task {}, Epoch {}/{} => Loss {:.3f}, Train_accy {:.2f}, Test_accy {:.2f}".format(
                self._cur_task,
                epoch + 1,
                epochs,
                losses / len(self.train_loader),
                train_acc,
                test_acc,
            )
            prog_bar.set_description(info)

        logging.info(info)

    def compute_ewc(self):
        """Return ``sum_n sum(fisher[n] * (p_n - mean[n])**2) / 2``.

        Only parameters that have a Fisher entry contribute, and only their
        first ``len(mean[n])`` rows are compared with the snapshot.
        """
        # With several GPUs the parameters are read from `.module`
        # (presumably a DataParallel wrapper -- confirm).
        network = self._network.module if len(self._multiple_gpus) > 1 else self._network

        loss = 0
        for name, param in network.named_parameters():
            if name in self.fisher:
                anchor = self.mean[name]
                loss += torch.sum(self.fisher[name] * (param[: len(anchor)] - anchor).pow(2)) / 2
        return loss

    def getFisherDiagonal(self):
        """Estimate the Fisher diagonal from squared gradients on ``train_loader``.

        Squared cross-entropy gradients are accumulated per batch, averaged
        over the number of batches and capped element-wise at ``fishermax``.

        Returns:
            dict mapping parameter name to a tensor shaped like the parameter.
        """
        fisher = {
            name: torch.zeros(param.shape).to(self._device)
            for name, param in self._network.named_parameters()
            if param.requires_grad
        }
        self._network.train()
        # The optimizer is only used to clear gradients; step() is never called.
        optimizer = optim.SGD(self._network.parameters(), lr=lrate)
        for _, inputs, targets in self.train_loader:
            targets = targets.type(torch.LongTensor)
            inputs, targets = inputs.to(self._device), targets.to(self._device)
            logits = self._network(inputs)["logits"]
            loss = torch.nn.functional.cross_entropy(logits, targets)
            optimizer.zero_grad()
            loss.backward()
            for name, param in self._network.named_parameters():
                if param.grad is not None:
                    fisher[name] += param.grad.pow(2).clone()

        num_batches = len(self.train_loader)
        return {
            name: torch.min(total / num_batches, torch.tensor(fishermax))
            for name, total in fisher.items()
        }

The first task is done when model.training() is called.
It completes because there is no tuning (no Ray Tune) in that part. However, when it continues to the next task, I get this message:

Started a local Ray instance.
And the error comes:

  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\tune\execution\ray_trial_executor.py", line 573, in start_trial
    return self._start_trial(trial)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\tune\execution\ray_trial_executor.py", line 473, in _start_trial
    runner = self._setup_remote_runner(trial)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\tune\execution\ray_trial_executor.py", line 414, in _setup_remote_runner
    return full_actor_class.remote(**kwargs)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\actor.py", line 637, in remote
    return actor_cls._remote(args=args, kwargs=kwargs, **updated_options)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\util\tracing\tracing_helper.py", line 387, in _invocation_actor_class_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\actor.py", line 844, in _remote
    worker.function_actor_manager.export_actor_class(
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\_private\function_manager.py", line 479, in export_actor_class
    check_oversized_function(
  File "C:\Users\20212002\Anaconda3\envs\daddy\lib\site-packages\ray\_private\utils.py", line 742, in check_oversized_function
    raise ValueError(error)

ValueError: The actor ImplicitFunc is too large (151 MiB > FUNCTION_SIZE_ERROR_THRESHOLD=95 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use ray.put() to put large objects in the Ray object store.
This error started appearing after I increased my model size. Before that, the function size was under the threshold (28 MiB).

I am not sure where and how ray.put and ray.get should be used, since the network is referenced in many places in the code.

I would like to run experiments with bigger models and any help is appreciated.
Thanks

Hey @ElifCerenGok, I found someone encountered similar issues: python - The actor ImplicitFunc is too large error - Stack Overflow

I think the problem is that model.update_representation is a bound method and therefore implicitly captures your model object, which makes Ray try to serialize and send a very large function.

I would suggest to move your update_representation out of your model class definition, and wrap your training logic in a single trainable function. Here tune.with_parameters does the ray.put and ray.get for you, so you don’t have to move model object to object store manually.

def update_representation(config, model):
    # Ray Tune calls the trainable as update_representation(config, model=model):
    # `config` must be the first parameter, and every object handed to
    # tune.with_parameters arrives as a keyword argument of the same name.
    ...


model = init_model()

tune.run(
    # with_parameters puts `model` in the object store once, so the trainable
    # itself stays small.
    tune.with_parameters(update_representation, model=model),
    # ... remaining tune.run arguments (config, resources_per_trial, ...)
)

Another even better solution is to initialize your model inside the tuning function:

def update_representation(config):
    # Building the model inside the trainable means nothing large is captured
    # when Ray serializes the function.
    model = init_model()  # create a model object / load from checkpoint
    ...


tune.run(
    update_representation,
    resources_per_trial={"cpu": 12, "gpu": 1},
    config=config,
    # ... remaining tune.run arguments
)