Hi guys! One question regarding Actors and map_batches:
I saw on Actors — Ray 2.43.0 that we may use max_restarts and max_task_retries in a @ray.remote decorator to allow an actor to be restarted on failures, but how can I pass these parameters when running map_batches on a dataset pipeline?
Here is an example of an actor class and a call to map_batches:
import ray

class ActorA:
    def __init__(self):
        # init actor here
        ...

    def __call__(self, batch):
        return self.potential_exception_method(batch)

    def potential_exception_method(self, batch):
        # logic here
        ...

dataset.map_batches(ActorA, compute=ray.data.ActorPoolStrategy(1, 4), batch_size=100000)
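
For what it's worth, here is a rough sketch of what I would try, not a confirmed answer. As far as I can tell, map_batches forwards extra keyword arguments to Ray as ray_remote_args, so the same options that go into @ray.remote might be passed straight through. The values 3 and 2, the FAULT_TOLERANCE_ARGS name, and the apply_with_restarts helper are made up for illustration, and the method body is a placeholder. Whether Ray Data already applies its own defaults for these options is something I have not verified.

```python
from typing import Any, Dict

# Hypothetical values, chosen only for illustration.
FAULT_TOLERANCE_ARGS: Dict[str, int] = {
    "max_restarts": 3,      # how many times a crashed actor may be restarted
    "max_task_retries": 2,  # how many times a task is retried after an actor crash
}


class ActorA:
    def __init__(self) -> None:
        # init actor here
        self.calls = 0

    def __call__(self, batch: Dict[str, Any]) -> Dict[str, Any]:
        self.calls += 1
        return self.potential_exception_method(batch)

    def potential_exception_method(self, batch: Dict[str, Any]) -> Dict[str, Any]:
        # Placeholder for the real logic, which may raise.
        return batch


def apply_with_restarts(dataset):
    """Hypothetical helper: run ActorA over `dataset` with restart options.

    Assumes map_batches passes unknown keyword arguments on to Ray as
    ray_remote_args. Ray is imported here so that ActorA can be tested
    without a running cluster.
    """
    import ray.data

    return dataset.map_batches(
        ActorA,
        compute=ray.data.ActorPoolStrategy(min_size=1, max_size=4),
        batch_size=100000,
        **FAULT_TOLERANCE_ARGS,
    )
```

Usage would then be something like `result = apply_with_restarts(dataset)`, where dataset is an existing ray.data.Dataset.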