Too many workers

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am trying to compute histograms of images, and I wrote the following code:

import glob

import cv2
import numpy as np
import ray

ray.init()

@ray.remote
class ImageAnalyzer(object):

    def __init__(self) -> None:
        pass

    def compute_histograms(self, image_path):
        print(image_path)
        image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)

        # Extract the L (luminance) channel and histogram it
        luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
        hist_lum, bins = np.histogram(luminance.ravel(), 256, [0, 255])

        return hist_lum


# Load a list of 4500 images
image_list = glob.glob("my_dir/*.png", recursive=True)

# One actor per image, and one method call per actor
image_analyzers = [ImageAnalyzer.remote() for _ in image_list]
results = ray.get([ia.compute_histograms.remote(img_path) for ia, img_path in zip(image_analyzers, image_list)])
print(results)

The computer I use has 64 cores and 128 GB of RAM, so I would expect Ray to process 64 histograms at a time in 64 different processes. Instead, Ray seems to create a worker for each ImageAnalyzer instance and another for each call to compute_histograms. Because I have 4500 images to process, the number of workers gets too high and the program stops.

Any ideas?

@alixleroy The reason is that an actor’s method calls are queued and executed one by one. See this thread for more details.

But in your case, I don’t think we need an actor here. How about just making compute_histograms a remote function?
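For example, a minimal sketch of that task-based version (reusing the histogram logic from your script) could look like:

import glob

import cv2
import numpy as np
import ray

ray.init()

@ray.remote
def compute_histograms(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
    hist_lum, _ = np.histogram(luminance.ravel(), 256, [0, 255])
    return hist_lum

image_list = glob.glob("my_dir/*.png", recursive=True)
# Each call is an independent task; Ray schedules up to num_cpus of them in parallel.
results = ray.get([compute_histograms.remote(p) for p in image_list])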

Thank you for your reply @yic. Indeed, using compute_histograms() as a plain remote function without an actor works like a charm and is as fast as Dask.

However, the given class is a minimal example of my real class, so getting the class (actor) version working would be great. My understanding is that I should use a single async actor, set its max concurrency to my number of CPU cores, and have it process the 4500 histograms. Is this the right direction?

@alixleroy I see. Your proposal will work fine as long as you stay on a single node.

If you’d like to scale to more nodes, you’ll either need to use a single actor (async actor) as a launcher and run remote tasks inside its methods, or shard the traffic across actors, as sketched below.
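For the sharding option, a rough sketch (reusing ImageAnalyzer and image_list from your script, with a hypothetical pool size of 64) could look like:

num_actors = 64  # e.g. one actor per CPU core

# Create a fixed pool of actors and round-robin the images across it;
# each actor still executes its own calls one by one.
analyzers = [ImageAnalyzer.remote() for _ in range(num_actors)]
refs = [
    analyzers[i % num_actors].compute_histograms.remote(path)
    for i, path in enumerate(image_list)
]
results = ray.get(refs)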

@yic I tried the following code, but a single node ends up processing everything sequentially.

import glob

import cv2
import numpy as np
import ray

ray.init()

@ray.remote
class ImageAnalyzer(object):

    def __init__(self) -> None:
        pass

    async def compute_histograms(self, image_path):
        print(image_path)
        image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)

        luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
        hist_lum, bins = np.histogram(luminance.ravel(), 256, [0, 255])

        return hist_lum


# Load a list of 4500 images
image_list = glob.glob("my_dir/*.png", recursive=True)

# A single async actor allowing up to 64 concurrent method calls
image_analyzer = ImageAnalyzer.options(max_concurrency=64).remote()
results = ray.get([image_analyzer.compute_histograms.remote(img_path) for img_path in image_list])
print(results)

I am unsure of the difference between this solution and the first one you suggested in your last message. Any suggestions to make this code use multiple processes?

Regarding sharding the traffic across actors: that looks pretty similar to creating a classical multiprocessing pool, which I thought Ray helped avoid.

Hey @alixleroy, I think the best approach here is to use Datasets. I would generally recommend Datasets whenever you want to do bulk parallelization with Ray, since it’s a much higher-level API. It also supports stateful setup, using actors under the hood and managing a pool of them automatically.

Here’s an adapted example from Transforming Datasets — Ray 1.13.0:

import glob

import ray
from ray.data import ActorPoolStrategy

class ImageAnalyzerCls:
    def __call__(self, img_path):
        ...  # compute the histogram for img_path here

# The actor pool autoscales between min_workers and max_workers actors
min_workers = 3
max_workers = 10
image_list = ray.data.from_items(glob.glob("my_dir/*.png", recursive=True))
result = image_list.map(
    ImageAnalyzerCls,
    compute=ActorPoolStrategy(min_workers, max_workers))
print(result.take_all())

Ray can also scale a multiprocessing pool to multiple nodes with Distributed multiprocessing.Pool — Ray 1.13.0

https://docs.ray.io/en/latest/data/transforming-datasets.html#compute-strategy
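A minimal sketch with that Ray-backed pool (assuming a plain compute_histograms function and the image_list from earlier) would be:

from ray.util.multiprocessing import Pool

# Drop-in replacement for multiprocessing.Pool that distributes
# map() work across the Ray cluster instead of local subprocesses.
pool = Pool()
results = pool.map(compute_histograms, image_list)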

Hi @ericl,
Many thanks for your reply.
It works very well this way. My code works perfectly fine when replacing __call__ with a normal method compute_histograms() and ImageAnalyzerCls with ImageAnalyzer.compute_histograms in the map function. This seems to offer more flexibility and keeps the original class structure. Could this change impact Ray’s performance?

I find that my Ray code is 10-20% slower than my Dask code. I am currently using a single computer with 64 cores. Is there a reason for this? With Dask, I used the dask.delayed() function for this multiprocessing task. Could the generated graph be better optimized than the futures approach? Should I use Ray’s workflows to obtain similar results?

Interesting. If the task-based approach is fast and the actor-based one slower, there could be two reasons: (1) actor launch overhead, which could be responsible if the difference is just a few seconds; (2) the configuration of the actor pool, since unlike tasks you have to specify a min/max pool size. Setting min = max = num_cpus would best emulate the pure task-based version, for example:
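result = image_list.map(
    ImageAnalyzerCls,
    compute=ActorPoolStrategy(64, 64))  # min = max = num_cpus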

@ericl I compared the processing time of the following:

  • Dask delayed function (using the Loky reusable executor)
  • Ray Dataset (with min = max workers = num CPUs)
  • Ray raw remote function

I obtained the following results for 12441 histograms on large images:

  • Dask delayed function: 188 sec
  • Ray Dataset: 226 sec
  • Ray raw remote function: 142 sec

That is a significant difference between these approaches.

How many CPUs are you using? It could be the default parallelism limit of ray.data.from_items (200). It could also be something else. One way to check is to use Datasets without actors (using a pure function in dataset.map()), for example:
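def compute_histograms(img_path):
    ...  # same computation, as a plain function

ds = ray.data.from_items(glob.glob("my_dir/*.png", recursive=True))
# Without a compute= argument, each record is processed by ordinary Ray tasks
result = ds.map(compute_histograms)
print(result.take_all())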

If you can provide a repro script (say, with a dummy computation function), that would be very helpful for diagnosing this.

For your original question, my proposal is something like:

@ray.remote
def compute_histograms_helper(image_path):
    ...

@ray.remote
class ImageAnalyzer:
    ...

    async def compute_histograms(self, image_path):
        # Launch the heavy work as a separate task so it can run
        # on any node, then await its result inside the actor
        return await compute_histograms_helper.remote(image_path)

But as Eric mentioned, these are low-level Ray Core APIs; higher-level libraries like Datasets should simplify the work a lot.