Async and dataset transformation

1. Severity of the issue:
  • Medium: Significantly affects my productivity, but I can find a workaround.
2. Environment:
  • Ray version: 2.44
  • Python version: 3.11

My data processor class looks something like this:

class Processor:
    def __init__(self, ...):
        ... # initialize resources

    async def process(self, item):
        while True:
            pp_item = self.pre_process(item)  # CPU-bound
            result = await external_operation(pp_item)  # may take a while, up to N times longer than pre/post-processing
            item = self.post_process(result)  # CPU-bound
            if self.done(item):
                break
        return item

I need to transform a dataset using this class, utilizing all available CPU resources. The dataset is quite large, so I need to process it in a streaming fashion, without materializing all of it in memory.

My idea is to create NUM_CPUS actors based on this class and submit N data rows to each one concurrently.

However, I’m not sure how to do this using the Ray Data API. It seems like I could make Processor a callable class and use it like this:
dataset.map(Processor, fn_constructor_args=..., concurrency=NUM_CPUS)

But how would I specify that each instance can handle up to N concurrent requests?

Alternatively, I suppose I could create an ActorPool containing N copies of each actor instance, but then how would I integrate that with dataset.map()?

It seems to me this can be achieved with the map_batches functionality.

You can specify the number of concurrent actors through the concurrency argument, and control the number of rows per invocation with batch_size. Do you think something like this would work?

map_batches(
    batch_size=N,            # N rows per call
    concurrency=NUM_CPUS,    # NUM_CPUS concurrent workers
    num_cpus=1,              # 1 CPU per worker
)
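
To make that concrete, here’s a rough, untested sketch of how the callable class could fan out the N rows in each batch as concurrent coroutines. The “item” column name and the BatchProcessor wrapper are just illustrative, reusing your Processor from above:

import asyncio

class BatchProcessor(Processor):
    # Ray Data calls this once per batch of N rows; fanning the rows out as
    # N concurrent `process` coroutines lets the external calls overlap.
    def __call__(self, batch: dict) -> dict:
        results = asyncio.run(self._process_all(batch["item"]))
        return {"item": results}

    async def _process_all(self, items):
        return await asyncio.gather(*(self.process(item) for item in items))

dataset.map_batches(
    BatchProcessor,
    batch_size=N,            # N rows handed to each call
    concurrency=NUM_CPUS,    # NUM_CPUS actor workers
    num_cpus=1,              # 1 CPU per worker
    # plus fn_constructor_args=... for your __init__
)

One caveat: asyncio.run spins up a fresh event loop per batch, which should be negligible next to long-running external calls, but is worth knowing.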

That’s what I ended up doing eventually, but it adds quite a bit of boilerplate to manage the actor pool and to scatter/gather data rows across it (rough sketch below).
I was wondering if I’m missing some built-in option.
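
For context, the boilerplate looks roughly like this (simplified; constructor args elided, and consume() is just a stand-in for whatever writes rows out):

import ray
from ray.util import ActorPool

# NUM_CPUS async actors, each allowed up to N in-flight `process` coroutines.
AsyncProcessor = ray.remote(num_cpus=1)(Processor)
actors = [
    AsyncProcessor.options(max_concurrency=N).remote()  # constructor args elided
    for _ in range(NUM_CPUS)
]
# Each handle goes into the pool N times, so the pool keeps up to N tasks
# pending per actor.
pool = ActorPool([actor for actor in actors for _ in range(N)])

def consume(item):
    ...  # stand-in: write out one processed row

# Scatter rows over the pool, gathering as we go so memory stays bounded.
for row in dataset.iter_rows():
    if not pool.has_free():
        consume(pool.get_next_unordered())
    pool.submit(lambda actor, r: actor.process.remote(r), row)
while pool.has_next():
    consume(pool.get_next_unordered())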

Ah, forgot to mention - I ended up with an explicit actor pool because dataset.map_batches didn’t seem to want to spawn the requested number of workers. I’d ask for concurrency=64, but it would only launch 7 or 8. ¯\_(ツ)_/¯

Do you have any console logs that you captured? Typically there is a progress bar in the console that shows whether an operator is backpressured by the next stage (i.e., not being consumed fast enough). In those cases, Ray Data will scale down the actors.

If you set concurrency=64 for an actor-based map op, the data pipeline won’t start until all 64 actors have started up.
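
One more thing worth ruling out (just a guess on my end): with num_cpus=1 per actor, the pool is also capped by the number of CPUs Ray actually sees, so a cluster that reports ~8 CPUs would max out at the 7-8 actors you observed. Quick check:

import ray

ray.init()
# An actor pool with num_cpus=1 per worker can never grow past this value.
print(ray.cluster_resources().get("CPU"))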