Best Way to Pipeline a Serve App

How severely does this issue affect your experience of using Ray?

  • Medium: It causes significant difficulty in completing my task, but I can work around it.

I’m creating an app to process images through various models and trying to find the best approach to handle this using Ray. I’m looking for something with the best performance. The basic pipeline looks like:

  1. Read in image
  2. Preprocessing step (CPU)
  3. Run model (GPU)
  4. Post-processing step (CPU)
  5. Write result

The options I see are:

What is the scale and SLA you are working with here?

10–50-frame videos, with several hundred videos processing concurrently. The first priority is processing speed; however, initial startup delays are OK. Second, I want to make sure the GPU is fully saturated, so reading and processing the inputs should be as fast as possible. Finally, some fault tolerance would be ideal.

I have a fast, working pipeline using Datasets, but the map method can be somewhat limiting, and mapping multiple classes adds a noticeable delay to each deployment call (not just an initial startup delay). I did a small test with plain ray.remote functions, which had performance comparable to the Dataset version; however, it seems to run into more crashes (possibly an implementation issue on my end), which is problematic if a few frames of a video don't get processed.

It seems like the new Workflows library is just ray.remote with better fault tolerance?

@Akshay_Malik I put together this comparison. It seems like ray.remote is faster but possibly could have some issues with resources.

import ray
from ray import data
from ray.serve.handle import DeploymentHandle, DeploymentResponse
import numpy as np

import asyncio
from pathlib import Path
from PIL import Image
from time import perf_counter_ns

# Start Ray. The Serve deployment, the Ray Data pipeline and the ray.remote
# tasks defined below all run on this Ray instance.
ray.init()


# =========================== Example Model ===========================

from ray import serve
import torch
import torchvision.transforms.functional as F
import torchvision.models as models


@serve.deployment(
    logging_config=dict(log_level="ERROR"),
    ray_actor_options={"num_gpus": 1.0, "num_cpus": 1.0},
)
class ResnetModel:
    """Serve deployment that runs one image at a time through a pretrained ViT-H/14.

    Despite the class name (kept because callers look the deployment up as
    "ResnetModel"), the model is torchvision's ``vit_h_14`` with its default
    pretrained weights. Each replica reserves one GPU and one CPU.
    """

    def __init__(self):
        self.weights = models.ViT_H_14_Weights.DEFAULT
        # Build the model from the same weights object whose transforms are used
        # below, so preprocessing always matches the loaded checkpoint.
        self.model = models.vit_h_14(weights=self.weights).cuda()
        self.model.eval()

        self.preprocess = self.weights.transforms()

    async def __call__(self, x):
        """Run the model on a single image and return its output as numpy.

        Args:
            x: One image accepted by ``torchvision.transforms.functional.to_tensor``
                (a PIL image or an H x W x C numpy array).

        Returns:
            ``numpy.ndarray`` holding the model output for the image, with a
            leading batch dimension of 1.
        """
        # Enter inference mode inside the coroutine body. Decorating an
        # ``async def`` with ``@torch.inference_mode()`` only wraps creation of
        # the coroutine object, so the forward pass itself ran with autograd on.
        with torch.inference_mode():
            batch = self.preprocess(F.to_tensor(x)).unsqueeze(0).cuda()
            return self.model(batch).cpu().numpy()


# Deploy the model as a Serve application. The returned handle is not used
# further down: the benchmark functions look the deployment up again by name
# with serve.get_deployment_handle.
handle = serve.run(ResnetModel.bind())

# =========================== Dummy Data ===========================


def _cleanup(temp_dir):
    for f in temp_dir.iterdir():
        if f.is_file():
            f.unlink()

        if f.is_dir():
            _cleanup(f)
    temp_dir.rmdir()


# Number and side length of the random RGB frames written as benchmark input.
NUM_FRAMES = 20
FRAME_SIZE = 1024

temp_dir = Path("temp")

# Start from an empty temp/ tree so stale inputs/outputs don't skew a run.
if temp_dir.exists():
    _cleanup(temp_dir)

input_dir = temp_dir / "input"
input_dir.mkdir(parents=True)

output_dir = temp_dir / "output"
output_dir.mkdir(parents=True)

# Generate one frame at a time instead of materialising the whole float64
# (NUM_FRAMES, 3, FRAME_SIZE, FRAME_SIZE) array up front (about 500 MB at the
# default sizes). The legacy global NumPy RNG yields values in the same order
# either way, so the saved frames are identical.
for i in range(NUM_FRAMES):
    x = np.random.rand(3, FRAME_SIZE, FRAME_SIZE)
    # CHW floats in [0, 1) -> HWC uint8, the layout PIL expects for RGB.
    im = Image.fromarray((x * 255).astype(np.uint8).transpose(1, 2, 0))
    im.save(input_dir / f"{i}.png")

# =========================== Dataset ===========================


def _resnet2img(res):
    """Render the leading square of one model-output vector as a grey RGB image.

    The first ``d*d`` values (``d = floor(sqrt(N))`` for an output of length
    ``N``) are laid out as a ``d x d`` grid. They are clipped to [0, 1] and
    scaled to 0..255, and the grid is replicated into three identical channels.

    Args:
        res: Output for a single sample, shape ``(1, N)`` or ``(N,)``. A batch
            with more than one row raises ``ValueError`` from ``reshape``.

    Returns:
        ``PIL.Image.Image`` of size ``d x d`` in RGB mode.
    """
    res = np.atleast_2d(res)
    dim = int(np.sqrt(res.shape[-1]))

    grid = res[:, : dim * dim].reshape(dim, dim)
    # Model outputs (e.g. logits) are not confined to [0, 1]; casting
    # out-of-range floats to uint8 wraps or is undefined, so clip first.
    grid = (np.clip(grid, 0.0, 1.0) * 255).astype(np.uint8)
    rgb = np.repeat(grid[..., np.newaxis], 3, axis=-1)
    return Image.fromarray(rgb)


def _test_dataset(dir, out_dir=None):
    """Run the read -> preprocess -> model -> write pipeline once with Ray Data.

    Each stage is a row-wise ``Dataset.map``. The model stage sends one request
    per row to the "ResnetModel" Serve deployment and blocks on its response.

    Args:
        dir: Directory of input images, passed to ``ray.data.read_images``.
        out_dir: Directory that ``<stem>_pred.png`` files are written to.
            Defaults to the module-level ``output_dir``.
    """
    if out_dir is None:
        out_dir = output_dir

    def get_model_result(row, handle):
        # Blocking call: this map task waits for the Serve replica's response.
        row["pred"] = handle.remote(row["image"]).result()
        return row

    def preprocess_fn(row):
        # Placeholder CPU step. NOTE: if the image dtype is uint8, 255 wraps to 0.
        row["image"] = row["image"] + 1
        return row

    def write_fn(row, dir):
        pred = _resnet2img(row["pred"])
        idx = Path(row["path"]).stem.split(".")[0]
        pred.save(dir / f"{idx}_pred.png")
        return row

    handle = serve.get_deployment_handle("ResnetModel", app_name="default")

    ds = data.read_images(dir, include_paths=True)
    ds = ds.map(preprocess_fn)
    ds = ds.map(get_model_result, fn_kwargs={"handle": handle})
    ds = ds.map(write_fn, fn_kwargs={"dir": out_dir})

    # Ray Data executes lazily; iterating drives the whole pipeline to completion.
    for _ in ds.iter_rows():
        pass


# Benchmark the Ray Data pipeline: average wall-clock time of one full pass.
# The first pass may include one-time start-up costs.
n_runs = 10
start = perf_counter_ns()

for _ in range(n_runs):
    _test_dataset(input_dir)

print(f"Dataset time: {(perf_counter_ns() - start) / n_runs / 1e6:.2f} ms/run")

# =========================== Core ===========================


@ray.remote
def _core_read_image(path):
    """Ray task: load the image file at *path* into a numpy array."""
    # Close the file handle deterministically instead of leaving it to PIL/GC.
    with Image.open(path) as img:
        return np.array(img)


@ray.remote
def _core_preprocess(img):
    """Ray task: placeholder CPU preprocessing step that adds 1 to every pixel.

    NOTE: if *img* is uint8, pixels equal to 255 wrap around to 0.
    """
    return img + 1


@ray.remote
def _core_get_model_result(x, handle):
    """Ray task: send *x* to the Serve deployment behind *handle* and wait.

    Blocks on ``DeploymentResponse.result()`` until the replica responds.
    """
    return handle.remote(x).result()


@ray.remote
def _core_write_img(result, out_dir, path):
    """Ray task: render *result* with ``_resnet2img`` and save it in *out_dir*.

    The file is named ``<stem of path>_pred.png``.

    Returns:
        True if the image was saved, False if saving raised ``OSError`` (the
        error is printed on the worker). Errors from ``_resnet2img`` propagate.
    """
    pred = _resnet2img(result)
    idx = Path(path).stem.split(".")[0]
    try:
        pred.save(out_dir / f"{idx}_pred.png")
    except OSError as e:
        print(f"Failed to save prediction for {path}: {e}")
        return False
    return True


def _test_core(dir, out_dir=None):
    """Run the read -> preprocess -> model -> write pipeline once with Ray tasks.

    Every ``*.png`` in *dir* gets its own chain of four tasks. The tasks are
    linked by ObjectRefs, so each stage can start as soon as its input is ready.

    The ``@ray.remote`` functions are defined once at module level. Nesting
    them here would build new RemoteFunction objects, which Ray must pickle and
    export, on every benchmark iteration.

    Args:
        dir: ``pathlib.Path`` of the directory containing input ``*.png`` files.
        out_dir: Directory for the ``<stem>_pred.png`` outputs. Defaults to the
            module-level ``output_dir``.

    Raises:
        RuntimeError: if any prediction could not be saved.
    """
    if out_dir is None:
        out_dir = output_dir

    handle = serve.get_deployment_handle("ResnetModel", app_name="default")

    write_refs = []
    for img_path in dir.glob("*.png"):
        image_ref = _core_read_image.remote(img_path)
        preprocessed_ref = _core_preprocess.remote(image_ref)
        model_ref = _core_get_model_result.remote(preprocessed_ref, handle)
        write_refs.append(_core_write_img.remote(model_ref, out_dir, img_path))

    # Explicit check rather than `assert`, which is stripped under `python -O`.
    failures = sum(not ok for ok in ray.get(write_refs))
    if failures:
        raise RuntimeError(
            f"{failures} of {len(write_refs)} predictions could not be saved"
        )


# Benchmark the ray.remote task pipeline: average wall-clock time of one full
# pass. The first pass may include one-time start-up costs.
n_runs = 10
start = perf_counter_ns()

for _ in range(n_runs):
    _test_core(input_dir)

print(f"Core time: {(perf_counter_ns() - start) / n_runs / 1e6:.2f} ms/run")

# =========================== Cleanup ===========================

# Shut down Ray now that both benchmarks have finished.
ray.shutdown()

# Delete the temp/ tree holding the generated inputs and the written predictions.
_cleanup(temp_dir)

print("Done")