Design choices of implementing an ensemble classifier

Hi,

I’m trying to implement an ensemble classifier, which consists of multiple component classifiers (called “components” for brevity).

The training of the ensemble decomposes into the training of the individual components, which are independent of each other. Therefore, it is natural to parallelize the training of the components.

What I did is below:

  • I implemented each component as a Ray actor, since it is stateful (e.g., it holds the model parameters).
  • In addition, I assume each component can be trained using multiple CPUs, so I request the resources with @ray.remote(num_cpus={some number > 1}).

The main dilemma is that the program deadlocks when there are not enough CPUs, i.e., when the total number of CPUs available to Ray < the number of components × the number of CPUs requested by each component.

An MWE is below:

import ray
import time

@ray.remote(num_cpus=2)  # say each component requires 2 cpus to train
class ComponentClassifier:
    def __init__(self, name):
        self.name = name

    def fit(self):
        """train this component clf"""
        time.sleep(2)
        return True


class EnsembleClassifier:
    def __init__(self, n_components=5):
        # initialize the actors
        self.base_clf_actors = [
            ComponentClassifier.remote(i) for i in range(n_components)
        ]

    def fit(self):
        """train the ensembel, which essentially train each component"""
        res_ids = [clf.fit.remote() for clf in self.base_clf_actors]
        return ray.get(res_ids)  # blocks until every component has finished training


ray.init(num_cpus=8)  # we request 8 CPUs in total
clf = EnsembleClassifier(n_components=5)  # and use 5 components

# There are not enough CPUs,
# because the total number of CPUs < num. of components x num. of CPUs per component,
# i.e., 8 < 5 x 2, so only 4 of the 5 actors can be scheduled.
clf.fit()  # hangs here: ray.get() waits on the fifth actor, which is never scheduled

The output is something like:

(scheduler +8m33s) Warning: The following resource request cannot be scheduled right now: {'CPU': 2.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.
2021-11-13 04:51:58,228	WARNING worker.py:1228 -- The actor or task with ID ffffffffffffffff97b284facdd536588296cada01000000 cannot be scheduled right now. You can ignore this message if this Ray cluster is expected to auto-scale or if you specified a runtime_env for this actor or task, which may take time to install.  Otherwise, this is likely due to all cluster resources being claimed by actors. To resolve the issue, consider creating fewer actors or increasing the resources available to this Ray cluster.
Required resources for this actor or task: {CPU: 2.000000}
Available resources on this node: {0.000000/8.000000 CPU, 866940600.000000 GiB/866940600.000000 GiB memory, 433470300.000000 GiB/433470300.000000 GiB object_store_memory, 1.000000/1.000000 node:192.168.1.9}
In total there are 0 pending tasks and 1 pending actors on this node.

This is understandable and by design: an actor occupies the resources it requested until it is killed (or dies).
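
For illustration, the only way I’m aware of to make an actor give its CPUs back is to kill it, which also destroys its state (the fitted model), so it is not a real solution here; a minimal, self-contained sketch:

import ray

@ray.remote(num_cpus=2)
class ComponentClassifier:
    def fit(self):
        return True

ray.init(num_cpus=4)

# Only 2 actors fit within the 4 CPUs, so no actor is left pending.
actors = [ComponentClassifier.remote() for _ in range(2)]
ray.get([a.fit.remote() for a in actors])

# Killing the actors frees the 2 CPUs each of them holds, but also throws away
# their state, which defeats the purpose of using actors for fitted components.
for a in actors:
    ray.kill(a)

ray.shutdown()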

A general question is:

  • How would you implement this functionality so as to avoid the deadlock?

A specific question is:

  • How can I ask the actors that have finished executing fit to release their resources, so that the pending actors can run?

My final solution is this:

  • Instead of using actors, use plain Python classes and Ray tasks (remote functions).
  • Each call to the remote function creates a component classifier, fits it, and returns it.
  • Subsequent use of the classifiers (e.g., making predictions) relies on the returned, fitted component classifiers.

A working MWE is below:

import ray
import time

class MyClass:
    def fit(self, value):
        time.sleep(1)
        print('done with fitting value {}'.format(value))
        return value

    def predict(self, value):
        time.sleep(1)
        print('done with prediction value {}'.format(value))
        return value

@ray.remote(num_cpus=2)
def _fit(value):
    # A task releases its CPUs as soon as it returns, unlike an actor.
    obj = MyClass()
    obj.fit(value)
    return obj

ray.init(num_cpus=4)  # we request 4 CPUs in total

# With 4 CPUs and 2 CPUs per task, the fits run two at a time instead of hanging.
obj_ids = [_fit.remote(v) for v in range(4)]

@ray.remote
def _predict(obj, value):
    # `obj` arrives as an ObjectRef and is resolved by Ray before the task runs.
    return obj.predict(value)

res_ids = [_predict.remote(obj_id, i) for i, obj_id in enumerate(obj_ids)]

ray.get(res_ids)

ray.shutdown()
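
The key point is that a Ray task releases its CPUs as soon as it returns, so pending fits simply wait their turn instead of being blocked forever. The fitted MyClass instances live in the object store; if the driver needs them locally (e.g., to serialize the ensemble), they can be fetched with ray.get — a small sketch, to be run before ray.shutdown():

# Sketch: pulling the fitted classifiers back to the driver,
# e.g., to pickle them or to predict locally without Ray.
fitted_clfs = ray.get(obj_ids)
local_preds = [clf.predict(i) for i, clf in enumerate(fitted_clfs)]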