How do you parameterize Actors in Actor Pools?

I have an actor that multiplies every number in a list by a constant factor m.

class MultiplyActor:
    def __init__(self, m):
        self.m = m

    def __call__(self, x):
        return [y * self.m for y in x]

I want to parameterize the actor by passing the value of m to its constructor.

I also want to use this actor in a Dataset.map_batches operation with the actor pool compute strategy. However, Dataset.map_batches requires that you pass it a class that takes no constructor arguments.

(This is a simplified example. In a real application the initialization of my actor would be an expensive step whose result I’d want to cache.)

I made this work by passing Dataset.map_batches a closure like so.

import ray
from ray.data import ActorPoolStrategy


class MultiplyActor:
    def __init__(self, m):
        self.m = m

    def __call__(self, x):
        return [y * self.m for y in x]


def main(m: int):
    def closure(x):
        return MultiplyActor(m)(x)

    data_set = ray.data.range(5)
    data_set = data_set.map_batches(closure, compute=ActorPoolStrategy())
    data_set.show()


if __name__ == "__main__":
    main(3)

Is this the recommended way to parameterize actor pool actors in Ray?
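One thing to check about the closure approach: `MultiplyActor(m)` is called inside `closure`, so a new instance is built every time the closure runs, which is once per batch. An expensive constructor would therefore not be cached. The plain-Python sketch below involves no Ray. The `init_calls` counter and the hand-made batches are illustrative assumptions. It counts constructor calls for the closure and compares them with a single reused instance.

```python
class MultiplyActor:
    # Class-level counter added for illustration only: counts constructor calls.
    init_calls = 0

    def __init__(self, m):
        MultiplyActor.init_calls += 1
        self.m = m

    def __call__(self, x):
        return [y * self.m for y in x]


def make_closure(m):
    # Same shape as the closure in main(): builds a new actor on every call.
    def closure(x):
        return MultiplyActor(m)(x)

    return closure


# Hand-made single-row batches standing in for ray.data.range(5).
batches = [[0], [1], [2], [3], [4]]

closure = make_closure(3)
closure_results = [closure(b) for b in batches]
closure_inits = MultiplyActor.init_calls  # one construction per batch

MultiplyActor.init_calls = 0
actor = MultiplyActor(3)  # constructed once and reused, as a pool worker would be
reused_results = [actor(b) for b in batches]
reused_inits = MultiplyActor.init_calls

print(closure_results, closure_inits)  # [[0], [3], [6], [9], [12]] 5
print(reused_results, reused_inits)    # [[0], [3], [6], [9], [12]] 1
```

Both versions return the same batches. Only the second version runs the constructor once.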

Now that I’ve got that working, I tried to add another actor before my multiplication. To keep it simple, I made it an identity actor.

import ray
from ray.data import ActorPoolStrategy


class Identity:
    def __call__(self, x):
        return x


class MultiplyActor:
    def __init__(self, m):
        self.m = m

    def __call__(self, x):
        return [y * self.m for y in x]


def main(m: int):
    def closure(x):
        return MultiplyActor(m)(x)

    data_set = ray.data.range(5)
    data_set = data_set.map_batches(Identity, compute=ActorPoolStrategy())
    data_set = data_set.map_batches(closure, compute=ActorPoolStrategy())
    data_set.show()


if __name__ == "__main__":
    main(3)

I expect this to return the same results as without the Identity actor. Instead I see this error.

...
  File "/usr/local/anaconda3/envs/lingua/lib/python3.9/site-packages/ray/data/_internal/planner/map_batches.py", line 102, in fn
    yield from process_next_batch(batch)
  File "/usr/local/anaconda3/envs/lingua/lib/python3.9/site-packages/ray/data/_internal/planner/map_batches.py", line 66, in process_next_batch
    batch = batch_fn(batch, *fn_args, **fn_kwargs)
TypeError: Identity() takes no arguments

When I add the following constructor

class Identity:
    def __init__(self, *args, **kwargs):
        print(f"args={args}, kwargs={kwargs}")

I see this printed

(_MapWorker pid=47171) args=([0],), kwargs={}
(_MapWorker pid=47171) args=([1],), kwargs={}
(_MapWorker pid=47174) args=([0],), kwargs={}
(_MapWorker pid=47174) args=([1],), kwargs={}

This gives the error

ValueError: The `fn` you passed to `map_batches` returned a value of type <class '__main__.Identity'>. This isn't allowed -- `map_batches` expects `fn` to return a `pandas.DataFrame`, `pyarrow.Table`, `numpy.ndarray`, `list`, or `dict[str, numpy.ndarray]`.

So the Identity.__call__ method is getting confused with the Identity.__init__ method.
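Both observations fit a single explanation: `Identity` is being invoked as a plain function, `Identity(batch)`, rather than as `Identity()(batch)`. The plain-Python sketch below reproduces both symptoms locally without Ray. `PermissiveIdentity` is a hypothetical name for the variant with the `*args, **kwargs` constructor.

```python
class Identity:
    def __call__(self, x):
        return x


class PermissiveIdentity:
    # Mirrors the debugging constructor above: accepts and ignores any arguments.
    def __init__(self, *args, **kwargs):
        pass

    def __call__(self, x):
        return x


batch = [0]

# Calling the class on a batch runs the constructor, not __call__.
try:
    Identity(batch)
    error_message = None
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # e.g. "Identity() takes no arguments"

# With a permissive constructor the call "succeeds" but returns a new instance,
# which matches map_batches complaining about a return value of type Identity.
wrong_result = PermissiveIdentity(batch)
print(type(wrong_result).__name__)  # PermissiveIdentity

# Instantiating first and then calling the instance invokes __call__ as intended.
right_result = Identity()(batch)
print(right_result)  # [0]
```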

Why would placing an actor pool after the first actor pool change the first actor pool’s behavior, and why would the constructor get confused with the default method?

Everything works fine if you put the identity actor after the multiplication actor.

...
    data_set = data_set.map_batches(closure, compute=ActorPoolStrategy())
    data_set = data_set.map_batches(Identity, compute=ActorPoolStrategy())
...

Is this a bug? Is there a way to put an actor ahead of a parameterized actor closure? Is there some other way to implement this that I’m not seeing?

Hi, I think you can pass an object to Dataset.map_batches instead of the class.
For instance:

import ray


class MultiplyActor:
    def __init__(self, m):
        self.m = m

    def __call__(self, x):
        return [y * self.m for y in x]


data_set = ray.data.range(5)
mul_actor = MultiplyActor(5)
data_set = data_set.map_batches(mul_actor, fn_args=([1,2,3]))
data_set.show()

Getting closer. 😀 I hadn’t read through the parameter documentation for map_batches thoroughly enough.

Your code gives me the following error:

...
  File "/usr/local/anaconda3/envs/lingua/lib/python3.9/site-packages/ray/data/_internal/plan.py", line 905, in fuse
    fn_args, unpack_args = _pack_args(
  File "/usr/local/anaconda3/envs/lingua/lib/python3.9/site-packages/ray/data/_internal/plan.py", line 805, in _pack_args
    self_fn_args
TypeError: can only concatenate list (not "tuple") to list
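The last frame of that traceback is an ordinary Python error, and it points at the `fn_args` value. Parentheses alone do not make a tuple, so `([1,2,3])` is just the list `[1, 2, 3]`. A one-element tuple needs a trailing comma. The traceback suggests that `_pack_args` concatenates this value with a tuple. The sketch below only reproduces that error in plain Python and is not Ray's code.

```python
as_written = ([1, 2, 3])   # parentheses only group: this is a list
one_tuple = ([1, 2, 3],)   # trailing comma makes a one-element tuple

print(type(as_written).__name__)  # list
print(type(one_tuple).__name__)   # tuple

# Concatenating a list with a tuple raises the same TypeError as the traceback.
try:
    as_written + (4,)
    concat_error = None
except TypeError as exc:
    concat_error = str(exc)
print(concat_error)  # can only concatenate list (not "tuple") to list
```

The earlier traceback contains the line `batch_fn(batch, *fn_args, **fn_kwargs)`. That line indicates `fn_args` are passed to the callable together with each batch, not to the constructor. A correctly formed tuple would therefore still not suit `MultiplyActor.__call__(self, x)`.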

However, the following code works.

import ray
from ray.data import ActorPoolStrategy

class MultiplyActor:
    def __init__(self, m):
        self.m = m

    def __call__(self, x):
        return [y * self.m for y in x]

data_set = ray.data.range(5)
data_set = data_set.map_batches(
    MultiplyActor, fn_constructor_args=(3,), compute=ActorPoolStrategy()
)
data_set.show()

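Now that I think about it, this makes sense. The closure builds a new MultiplyActor inside the function body, so the constructor runs again for every batch and its expensive initialization is never cached. Passing the arguments through fn_constructor_args lets each actor in the pool run the constructor once and reuse the result.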
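Here is a rough plain-Python mental model of what constructor arguments buy you in an actor pool. Each worker constructs the class once with the given arguments, then reuses that instance for every batch it receives. `simulate_actor_pool` and its round-robin scheduling are hypothetical illustrations, not Ray's actual scheduler.

```python
from typing import Any, Callable, List, Sequence


class MultiplyActor:
    init_calls = 0  # illustration only: counts constructor calls

    def __init__(self, m):
        MultiplyActor.init_calls += 1
        self.m = m

    def __call__(self, x):
        return [y * self.m for y in x]


def simulate_actor_pool(
    cls: Callable[..., Callable[[Any], Any]],
    constructor_args: Sequence[Any],
    batches: List[Any],
    num_workers: int,
) -> List[Any]:
    """Hypothetical stand-in for an actor pool: one instance per worker."""
    # Each "worker" runs the (possibly expensive) constructor exactly once.
    workers = [cls(*constructor_args) for _ in range(num_workers)]
    # Batches are handed out round-robin; each call reuses a cached instance.
    return [workers[i % num_workers](batch) for i, batch in enumerate(batches)]


results = simulate_actor_pool(MultiplyActor, (3,), [[0], [1], [2], [3], [4]], num_workers=2)
print(results)                   # [[0], [3], [6], [9], [12]]
print(MultiplyActor.init_calls)  # 2
```

The constructor runs once per worker rather than once per batch. That is the caching behavior the original question was after.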

Thanks.

@wpm Good self-sleuthing!