How severely does this issue affect your experience of using Ray?
High: It blocks me from completing my task.
I am trying to compute histograms for a set of images, and I wrote the following code:
import glob

import cv2
import numpy as np
import ray

ray.init()

@ray.remote
class ImageAnalyzer(object):
    def __init__(self) -> None:
        pass

    def compute_histograms(self, image_path):
        print(image_path)
        image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
        luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
        hist_lum, bins = np.histogram(luminance.ravel(), 256, [0, 255])
        return hist_lum

# Load a list of 4500 images
image_list = glob.glob("my_dir/*.png", recursive=True)

# One actor (and therefore one worker process) per image
image_analyzers = [ImageAnalyzer.remote() for _ in image_list]
results = ray.get([ia.compute_histograms.remote(img_path)
                   for ia, img_path in zip(image_analyzers, image_list)])
print(results)
The computer I use has 64 cores and 128 GB of RAM, so I would expect Ray to process 64 histograms at a time in 64 different processes. Instead, Ray seems to create a worker for each ImageAnalyzer instance and a worker for each call to compute_histograms. Because I have 4500 images to process, the number of workers becomes far too high and the program stops.
Thank you for your reply @yic. Indeed, using compute_histograms() as a plain remote function, without an actor, works like a charm and is as fast as Dask.
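For reference, a minimal sketch of that task-based version (the same histogram code, just as a plain remote function instead of an actor method):

import glob

import cv2
import numpy as np
import ray

ray.init()

@ray.remote
def compute_histograms(image_path):
    # Plain remote task: Ray schedules these across all available cores.
    image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
    hist_lum, _ = np.histogram(luminance.ravel(), 256, [0, 255])
    return hist_lum

image_list = glob.glob("my_dir/*.png", recursive=True)
results = ray.get([compute_histograms.remote(p) for p in image_list])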
However, the class above is a minimal example of my real class, so getting the actor-based version to work would be great. My understanding is that I should use a single AsyncActor, set its maximum concurrency to my number of CPU cores, and have it process the 4500 histograms. Is this the right direction?
@alixleroy I see. Your proposal will work fine if it’s a single node.
If you’d like to scale to more nodes, you’ll need to either use a single actor (an async actor) as a launcher and run remote tasks inside its methods, or shard the traffic across multiple actors.
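To illustrate the launcher idea, here is a rough sketch (the HistogramLauncher name is made up for this example): a single async actor whose method only fans the work out to plain remote tasks, so the CPU-bound work can be scheduled on any node in the cluster.

import asyncio
import glob

import cv2
import numpy as np
import ray

ray.init()

@ray.remote
def compute_histograms(image_path):
    # Plain remote task; Ray can place it on any node.
    image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
    hist_lum, _ = np.histogram(luminance.ravel(), 256, [0, 255])
    return hist_lum

@ray.remote
class HistogramLauncher:
    async def run(self, image_paths):
        # The actor only launches tasks and awaits their results;
        # the heavy work happens in the tasks, not in the actor process.
        refs = [compute_histograms.remote(p) for p in image_paths]
        return await asyncio.gather(*refs)

image_list = glob.glob("my_dir/*.png", recursive=True)
launcher = HistogramLauncher.remote()
results = ray.get(launcher.run.remote(image_list))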
@yic I tried the following code, but it results in everything being processed sequentially on a single node.
import glob

import cv2
import numpy as np
import ray

ray.init()

@ray.remote
class ImageAnalyzer(object):
    def __init__(self) -> None:
        pass

    async def compute_histograms(self, image_path):
        print(image_path)
        image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
        luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
        hist_lum, bins = np.histogram(luminance.ravel(), 256, [0, 255])
        return hist_lum

# Load a list of 4500 images
image_list = glob.glob("my_dir/*.png", recursive=True)

# A single async actor allowing up to 64 concurrent method calls
image_analyzer = ImageAnalyzer.options(max_concurrency=64).remote()
results = ray.get([image_analyzer.compute_histograms.remote(img_path)
                   for img_path in image_list])
print(results)
I am unsure about the difference between this solution and the first one you suggested in your last message. Any suggestions to make this code use multiple processes?
Regarding sharding the traffic across actors, that looks pretty similar to creating a classical multiprocessing pool, which I thought Ray helped to avoid.
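For comparison, the sharding does not have to be managed by hand; here is a short sketch using ray.util.ActorPool, reusing the synchronous ImageAnalyzer actor and image_list from the first post:

from ray.util import ActorPool

# A fixed pool of actors (e.g. one per CPU core); ActorPool shards the
# work across them and yields results as they complete.
pool = ActorPool([ImageAnalyzer.remote() for _ in range(64)])
results = list(pool.map(lambda actor, path: actor.compute_histograms.remote(path),
                        image_list))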
Hey @alixleroy, I think the best approach here is to use Datasets. I would generally recommend using Datasets whenever you want to do bulk parallelization with Ray, since it’s a much higher-level API. It also supports stateful setup, such as using actors under the hood and managing a pool of them automatically.
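As a rough illustration of that approach (the exact Datasets API has changed across Ray versions, and ImageAnalyzerCls is assumed to be a callable-class wrapper around the histogram code):

import glob

import cv2
import numpy as np
import ray

class ImageAnalyzerCls:
    # Callable class: with an actor pool, Datasets reuses one instance per
    # actor, so any expensive setup in __init__ runs once per worker.
    def __call__(self, image_path):
        image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
        luminance = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)[:, :, 0]
        hist_lum, _ = np.histogram(luminance.ravel(), 256, [0, 255])
        return hist_lum

image_list = glob.glob("my_dir/*.png", recursive=True)
ds = ray.data.from_items(image_list)
# Run the map through an actor pool rather than plain tasks (newer Ray
# versions spell this compute=ray.data.ActorPoolStrategy(...)).
results = ds.map(ImageAnalyzerCls, compute="actors").take(len(image_list))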
Hi @ericl,
Many thanks for your reply.
It works very well this way. My code also works perfectly fine when I replace __call__ with a normal method compute_histograms() and ImageAnalyzerCls with ImageAnalyzer.compute_histograms in the map function. This seems to offer more flexibility and keeps the original class structure. Could this change impact Ray’s performance?
I find that my Ray code is 10-20% slower than my Dask code on a single machine with 64 cores. Is there a reason for this? With Dask I used dask.delayed() for this multiprocessing task. Could the generated task graph be better optimized than the futures-based approach? Should I use Ray’s workflows to obtain similar results?
Interesting. If the task-based approach is fast and the actor-based one slower, there could be two reasons: (1) actor launch overhead, which could account for it if the difference is just a few seconds; (2) the configuration of the actor pool, since unlike tasks you have to specify a min/max pool size. Setting min = max = num_cpus would best emulate the pure task-based version.
How many CPUs are you using? It could be the default parallelism limit of dataset.from_items (200). It could also be something else. One way to check is to use Datasets without actors (using a pure function in dataset.map()).
If you can provide a repro script (say, with a dummy computation function), that would be very helpful for diagnosing this.
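A rough sketch of both checks, assuming a plain (non-remote) compute_histograms function and the ImageAnalyzerCls callable class from above, and a Ray version where these Datasets options exist:

# Variant without actors: a plain function in map(), which takes actor-pool
# configuration out of the picture; parallelism raises the default block limit.
ds = ray.data.from_items(image_list, parallelism=256)
results_tasks = ds.map(compute_histograms).take(len(image_list))

# Variant with actors, pinning the pool size to the number of CPU cores
# to emulate the pure task-based version.
results_actors = (
    ray.data.from_items(image_list)
    .map(ImageAnalyzerCls,
         compute=ray.data.ActorPoolStrategy(min_size=64, max_size=64))
    .take(len(image_list))
)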