Backpressure with ActorPool (or alternatives?)

Hi,

Before going into details about what I want to achieve, one important remark: I am running Ray on YARN and am only just getting into Ray Core, so I have only a fairly cursory overview of what Ray provides and of its idioms. I start the cluster with a couple of pre-allocated worker nodes and a head node, all of which have fixed resource allocations. What I do not know yet is whether there is any functionality in Ray that would allow it to dynamically allocate additional worker resources in the YARN cluster where needed. So for the text below, I assume that I have to work with the resources I have and must not overshoot them (so as to keep my workers alive). Please let me know if there is an easy way of dynamically allocating additional resources on demand (other than scaling with Skein explicitly).


I am trying to distribute a (batch) model inference workload that follows a fairly straightforward computation graph:

  1. start a Ray cluster on YARN (I am not sure whether it is currently easy to dynamically scale the cluster from within Ray, e.g. to allocate additional resources when workers request too much)
  2. get data from a database (happens on the driver)
  3. have a couple of actors that preprocess individual observations (some models may be applied that are loaded in the actors' __init__)
  4. featurize the preprocessed data (again using a couple of actors)
  5. run a torch model (again as an actor)
  6. write results back to a database (remotely, without gathering results on the driver)

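To make the intended data flow concrete, here is a minimal sketch of steps 3-6 as chained actors (the class names and toy computations are made up, and there is no backpressure yet - that is what the rest of this post is about). Passing ObjectRefs from one actor call into the next keeps the intermediate data off the driver:

```python
import ray

ray.init(address="auto")

@ray.remote
class Preprocessor:
    def process(self, batch):
        return [x + 1 for x in batch]        # stand-in for real preprocessing

@ray.remote
class Featurizer:
    def featurize(self, batch):
        return [x * 10 for x in batch]       # stand-in for featurization

@ray.remote
class Model:
    def predict(self, batch):
        return [x - 5 for x in batch]        # stand-in for the torch model

@ray.remote
class Writer:
    def write(self, batch):
        # stand-in for the database write; returning only a count keeps the
        # actual results off the driver
        return len(batch)

pre, feat, model, writer = (Preprocessor.remote(), Featurizer.remote(),
                            Model.remote(), Writer.remote())

counts = []
for i in range(10):                          # batches fetched from the database
    batch = list(range(i * 100, (i + 1) * 100))
    # passing the ObjectRef of one stage into the next moves data
    # worker-to-worker instead of through the driver
    ref = pre.process.remote(batch)
    ref = feat.featurize.remote(ref)
    ref = model.predict.remote(ref)
    counts.append(writer.write.remote(ref))

print(sum(ray.get(counts)))                  # -> 1000
```
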
To control the number of individual actors, I thought about using ray.util.actor_pool.ActorPool and submitting tasks to the pool. Given the resource constraints, I would like the driver not to overload the workers with batches of data. This tutorial (https://docs.ray.io/en/master/ray-design-patterns/limit-tasks.html) provides an example that waits on result refs. However, I do not see a way to get at the result refs when using an ActorPool.
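
The closest I have come is to keep my own count of in-flight submissions and drain the pool with get_next_unordered() before submitting more, roughly like this (toy preprocessing, arbitrary limit) - but I am not sure this is the intended way to use the pool:

```python
import ray
from ray.util.actor_pool import ActorPool

ray.init(address="auto")

@ray.remote
class Preprocessor:
    def __init__(self):
        pass                                   # load models here in the real workload

    def process(self, batch):
        return [x * 2 for x in batch]          # stand-in for real preprocessing

pool = ActorPool([Preprocessor.remote() for _ in range(4)])

MAX_IN_FLIGHT = 8                              # arbitrary backpressure limit
in_flight = 0

def batches_from_db():
    # placeholder for fetching batches from the database on the driver
    for i in range(100):
        yield list(range(i * 10, (i + 1) * 10))

for batch in batches_from_db():
    if in_flight >= MAX_IN_FLIGHT:
        # block until one result is ready before submitting the next batch
        result = pool.get_next_unordered()
        in_flight -= 1
        # ... hand `result` to the next stage here ...
    pool.submit(lambda actor, b: actor.process.remote(b), batch)
    in_flight += 1

while pool.has_next():                         # drain whatever is still running
    result = pool.get_next_unordered()
```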

As an alternative, I thought about trying ray.util.queue.Queue and defining actors that take an input and an output queue, consume items from the input queue, and put new items onto the output queue (both queues created with maxsize=N). This failed for me: the queues did not block when a put would have exceeded maxsize, and some items ended up being consumed by more than one worker. My understanding was that the Queue handles concurrent access - is that wrong? If it should work, could someone maybe provide a minimal example where multiple actors consume items from one queue and write to another?
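
To be concrete, the pattern I expected to be able to write is roughly the following (simplified to toy data; in the real code the workers load models in their __init__ and the Writer talks to the database):

```python
import ray
from ray.util.queue import Queue

ray.init(address="auto")

SENTINEL = None                        # signals downstream stages to stop

@ray.remote
class Worker:
    """Consumes items from in_q, processes them, pushes results to out_q."""
    def run(self, in_q, out_q):
        while True:
            item = in_q.get(block=True)        # blocks until an item arrives
            if item is SENTINEL:
                out_q.put(SENTINEL, block=True)
                break
            out_q.put(item * 2, block=True)    # blocks while out_q is full

@ray.remote
class Writer:
    """Drains out_q, e.g. writing results to a database (here it just counts)."""
    def run(self, out_q, num_workers):
        done, written = 0, 0
        while done < num_workers:              # expect one sentinel per worker
            item = out_q.get(block=True)
            if item is SENTINEL:
                done += 1
            else:
                written += 1                   # replace with the real DB write
        return written

num_workers = 3
in_q = Queue(maxsize=8)                # bounded queues are the backpressure:
out_q = Queue(maxsize=8)               # put() blocks instead of piling up work

workers = [Worker.remote() for _ in range(num_workers)]
worker_refs = [w.run.remote(in_q, out_q) for w in workers]
writer_ref = Writer.remote().run.remote(out_q, num_workers)

# producer on the driver: blocks whenever in_q is full
for i in range(100):
    in_q.put(i, block=True)
for _ in range(num_workers):
    in_q.put(SENTINEL, block=True)

ray.get(worker_refs)
print("items written:", ray.get(writer_ref))   # -> 100
```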

Any hints or advice is much appreciated!

As a higher level alternative, you could use Datasets for batch inference. There’s an example at the bottom of the section here: Datasets: Distributed Arrow on Ray — Ray v2.0.0.dev0
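
Roughly, the batch inference part would look like the sketch below (untested, and the exact map_batches arguments have shifted a bit between Ray versions; in a real workload you would read from your datasource instead of from_items, and the callable class would load your torch model in __init__):

```python
import ray
import ray.data

ray.init(address="auto")

class InferModel:
    """Callable class: map_batches(..., compute="actors") turns this into a
    pool of actors, so the (expensive) model is loaded once per actor."""
    def __init__(self):
        self.offset = 100              # stand-in for loading the torch model

    def __call__(self, batch):
        # with a simple in-memory dataset like this, each batch arrives as a list
        return [x + self.offset for x in batch]

# stand-in for reading from the database / a custom datasource
ds = ray.data.from_items(list(range(1000)))
predictions = ds.map_batches(InferModel, compute="actors", batch_size=128)
```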

If you need to do it incrementally, Dataset Pipelines can also do streaming batch inference: Dataset Pipelines — Ray v2.0.0.dev0
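
For the streaming variant, the same dataset can be split into windows first (continuing the sketch above, and assuming a version where Dataset.window() is available - older releases called this .pipeline()):

```python
# only a few blocks per window are materialized at a time, which is what
# bounds memory use; the window size here is an arbitrary choice
pipe = ds.window(blocks_per_window=10)
pipe = pipe.map_batches(InferModel, compute="actors", batch_size=128)
```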

I think this would meet your requirements of having backpressure (which is managed internally by DatasetPipeline) and of not gathering results on the driver (a Dataset can perform output operations in parallel via write_datasource(), or via .map() with a custom write function).
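
The write step could then be a custom function applied the same way, so each worker writes its own batch and nothing is gathered on the driver (save_to_db below is just a placeholder for your database client):

```python
def save_to_db(rows):
    # placeholder for the real database write
    print(f"writing {len(rows)} rows")

def write_batch(batch):
    save_to_db(batch)                  # runs on the workers, not the driver
    return batch

predictions.map_batches(write_batch)   # or ds.write_datasource(...) with a custom datasource
```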