- Severity of the issue:
- Medium: Significantly affects my productivity but can find a workaround.
- Environment:
- Ray version: 2.44
- Python version: 3.11
My data processor class looks something like this:

```python
class Processor:
    def __init__(self, ...):
        ...  # initialize resources

    async def process(self, item):
        while True:
            pp_item = self.pre_process(item)  # CPU-bound
            result = await external_operation(pp_item)  # may take a while, up to N times longer than pre/post-processing
            item = self.post_process(result)  # CPU-bound
            if self.done(item):
                break
        return item
```
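For context, here is a self-contained toy version of the class; the `pre_process`/`post_process`/`done` bodies and the `external_operation` stub are invented placeholders just to make the shape runnable:

```python
import asyncio

async def external_operation(pp_item):
    # placeholder for the slow external call
    await asyncio.sleep(0)
    return pp_item

class Processor:
    # toy bodies, invented for illustration only
    def pre_process(self, item):
        return item + 1  # stand-in for the CPU-bound step

    def post_process(self, result):
        return result    # stand-in for the CPU-bound step

    def done(self, item):
        return item >= 3

    async def process(self, item):
        while True:
            pp_item = self.pre_process(item)
            result = await external_operation(pp_item)
            item = self.post_process(result)
            if self.done(item):
                break
        return item

async def main():
    p = Processor()
    # several items processed concurrently on a single instance
    return await asyncio.gather(*(p.process(i) for i in range(3)))

results = asyncio.run(main())
print(results)  # [3, 3, 3]
```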
I need to transform a dataset using this class, utilizing all available CPU resources. The dataset is quite large, so I need to process it in a streaming fashion, without materializing all of it in memory.
My idea is to create `NUM_CPUS` actors based on this class and submit `N` data rows to each one concurrently.
However, I'm not sure how to do this using the Ray Data API. It seems like I could make `Processor` a callable class and use it like this: `dataset.map(Processor, fn_constructor_args=..., concurrency=NUM_CPUS)`.
But how would I specify that each instance can handle up to `N` concurrent requests?
Alternatively, I suppose I could create an `ActorPool` containing `N` copies of the actor, but then how would I integrate that with `dataset.map()`?