How severely does this issue affect your experience of using Ray?
- Medium: It makes completing my task significantly more difficult, but I can work around it.
I’m creating an app to process images through various models and trying to find the best approach to handle this using Ray. I’m looking for something with the best performance. The basic pipeline looks like:
- Read in image
- Preprocessing step (cpu)
- Run Model (gpu)
- Post Processing step (cpu)
- Write result
The options I see are:
What is the scale and SLA you are working with here?
Each video has 10-50 frames, with several hundred videos processing concurrently. The priority is processing speed; initial startup delays are OK. Second, I want to make sure the GPU is fully saturated, so reading and processing the inputs should be as fast as possible. Finally, some fault tolerance would be ideal.
I have a fast, working pipeline using Datasets, but the `map` method can be somewhat limiting, and `map`ping multiple classes adds a noticeable delay to each `deployment` call (not just an initial startup delay). I did a small test with just `ray.remote` functions, which had comparable performance to the Dataset pipeline; however, it seems to run into more crashes (possibly an implementation issue on my end), which is problematic if a few frames don’t get processed in a video.
It seems like the new Workflows API is just `ray.remote` but with better fault tolerance?
@Akshay_Malik I put together this comparison. It seems like `ray.remote` is faster, but it could possibly have some issues with resources.
import ray
from ray import data
from ray.serve.handle import DeploymentHandle, DeploymentResponse
import numpy as np
import asyncio
from pathlib import Path
from PIL import Image
from time import perf_counter_ns
# Start Ray with default settings. This runs before the Serve deployment and
# the Ray Data / ray.remote benchmarks below, all of which rely on it.
ray.init()
# =========================== Example Model ===========================
from ray import serve
import torch
import torchvision.transforms.functional as F
import torchvision.models as models
@serve.deployment(
    logging_config=dict(log_level="ERROR"),
    ray_actor_options={"num_gpus": 1.0, "num_cpus": 1.0},
)
class ResnetModel:
    """Ray Serve deployment wrapping torchvision's ViT-H/14 on one GPU.

    The class is still called ``ResnetModel`` because the benchmarks in this
    script look the deployment up by that name; the network it actually loads
    is ``models.vit_h_14`` with its default weights.
    """

    def __init__(self):
        # Resolve the weights enum once so the model and its preprocessing
        # transform are guaranteed to come from the same weight set.
        self.weights = models.ViT_H_14_Weights.DEFAULT
        self.model = models.vit_h_14(weights=self.weights).cuda()
        self.model.eval()
        self.preprocess = self.weights.transforms()

    async def __call__(self, x):
        """Run a single image through the model.

        Args:
            x: One image in a form ``torchvision``'s ``to_tensor`` accepts
                (the callers in this script pass an image as a NumPy array).

        Returns:
            The model output as a NumPy array on the host, with a leading
            batch dimension of 1.
        """
        # inference_mode is applied as a context manager inside the coroutine
        # body. As a decorator on an ``async def`` it would only wrap the call
        # that *creates* the coroutine and would already have exited by the
        # time the forward pass runs, leaving autograd tracking enabled.
        with torch.inference_mode():
            batch = self.preprocess(F.to_tensor(x)).unsqueeze(0).cuda()
            return self.model(batch).detach().cpu().numpy()
# Deploy the model. The benchmark functions below do not use this module-level
# handle directly; each one re-fetches a handle by deployment name
# ("ResnetModel", app_name="default").
handle = serve.run(ResnetModel.bind())
# =========================== Dummy Data ===========================
def _cleanup(temp_dir):
for f in temp_dir.iterdir():
if f.is_file():
f.unlink()
if f.is_dir():
_cleanup(f)
temp_dir.rmdir()
# Synthetic input: 20 random channel-first frames (3 x 1024 x 1024).
dummy_data = np.random.rand(20, 3, 1024, 1024)

# Always start from an empty scratch tree: wipe any leftover from a previous
# run, then create the input and output folders.
temp_dir = Path("temp")
if temp_dir.exists():
    _cleanup(temp_dir)
input_dir = temp_dir / "input"
output_dir = temp_dir / "output"
input_dir.mkdir(parents=True)
output_dir.mkdir(parents=True)

# Write each frame to disk as an 8-bit PNG named after its index.
for i, frame in enumerate(dummy_data):
    pixels = (frame * 255).astype(np.uint8)
    # PIL expects channel-last layout, so move the channel axis to the end.
    png = Image.fromarray(pixels.transpose(1, 2, 0))
    png.save(input_dir / f"{i}.png")
# =========================== Dataset ===========================
def _resnet2img(res):
    """Render a 2-D model output array as a square, three-channel PIL image.

    The last axis is truncated to the largest perfect square that fits and
    folded into a ``side x side`` grid, which is scaled by 255, cast to
    ``uint8`` and replicated across three channels.

    Args:
        res: NumPy array indexed as ``res[:, :k]``. The reshape to
            ``(side, side)`` only succeeds when the leading dimension is 1.

    Returns:
        A ``PIL.Image`` built from the ``(side, side, 3)`` array.
    """
    side = int(np.sqrt(res.shape[-1]))
    # Keep only as many values as fill a side x side square.
    grid = res[:, : side * side].reshape(side, side)
    grid = (grid * 255).astype(np.uint8)
    rgb = np.stack((grid, grid, grid), axis=-1)
    return Image.fromarray(rgb[..., :3])
def _test_dataset(dir):
    """Push every image under ``dir`` through the model with a Ray Data pipeline.

    Stages, each applied with ``Dataset.map``: read images (with their
    paths), a placeholder preprocessing step, a blocking call to the
    ``ResnetModel`` Serve deployment, and a write step that saves the
    rendered prediction into the module-level ``output_dir``.

    Args:
        dir: Location of the input images, passed to ``data.read_images``.
    """

    def preprocess_fn(row):
        # Placeholder for CPU preprocessing: add one to every pixel value.
        row["image"] = row["image"] + 1
        return row

    def get_model_result(row, handle):
        # Wait for the Serve response so the prediction travels with the row.
        row["pred"] = handle.remote(row["image"]).result()
        return row

    def write_fn(row, dir):
        pred = _resnet2img(row["pred"])
        # Name the output after the input file's stem (up to the first dot).
        idx = Path(row["path"]).stem.split(".")[0]
        pred.save(dir / f"{idx}_pred.png")
        return row

    model_handle = serve.get_deployment_handle("ResnetModel", app_name="default")
    pipeline = (
        data.read_images(dir, include_paths=True)
        .map(preprocess_fn)
        .map(get_model_result, fn_kwargs={"handle": model_handle})
        .map(write_fn, fn_kwargs={"dir": output_dir})
    )
    # Iterating the rows is what drives the pipeline; the rows themselves are
    # discarded because write_fn has already saved the results.
    for _ in pipeline.iter_rows():
        pass
# Benchmark the Ray Data pipeline: run it ten times back to back and print the
# mean wall-clock time per run in milliseconds (ns / 10 runs / 1e6).
start = perf_counter_ns()
for _ in range(10):
    _test_dataset(input_dir)
print(f"Time: {(perf_counter_ns() - start) / 10 / 1e6:.2f}")
# =========================== Core ===========================
def _test_core(dir):
    """Push every ``*.png`` under ``dir`` through the model with plain Ray tasks.

    For each image a four-task chain is submitted (read, preprocess, model
    call, write); each task receives the previous task's object reference as
    its input. All chains are submitted before any result is awaited.
    Predictions are saved into the module-level ``output_dir``.

    Args:
        dir: ``pathlib.Path`` of the directory holding the input PNG files.

    Raises:
        AssertionError: If any write task reported failure.
    """

    @ray.remote
    def read_image(path):
        return np.array(Image.open(path))

    @ray.remote
    def preprocess(img):
        # Placeholder for CPU preprocessing: add one to every pixel value.
        return img + 1

    @ray.remote
    def get_model_result(x, handle):
        # Block inside the task until the Serve deployment has answered.
        return handle.remote(x).result()

    @ray.remote
    def write_img(result, dir, path):
        pred = _resnet2img(result)
        idx = Path(path).stem.split(".")[0]
        try:
            pred.save(dir / f"{idx}_pred.png")
        except Exception as e:
            # Report the failure to the caller as False instead of raising.
            print(e)
            return False
        return True

    handle = serve.get_deployment_handle("ResnetModel", app_name="default")

    def submit(img_path):
        # Build one read -> preprocess -> model -> write chain for this image.
        raw = read_image.remote(img_path)
        prepared = preprocess.remote(raw)
        predicted = get_model_result.remote(prepared, handle)
        return write_img.remote(predicted, output_dir, img_path)

    pending = [submit(img_path) for img_path in dir.glob("*.png")]
    outcomes = ray.get(pending)
    # Every write task must have reported success.
    assert all(outcomes)
# Benchmark the ray.remote task pipeline: run it ten times back to back and
# print the mean wall-clock time per run in milliseconds (ns / 10 runs / 1e6).
start = perf_counter_ns()
for _ in range(10):
    _test_core(input_dir)
print(f"Time: {(perf_counter_ns() - start) / 10 / 1e6:.2f}")
# =========================== Cleanup ===========================
# Shut Ray down first, then delete the scratch directory ("temp") together
# with the generated inputs and the predictions written by the benchmarks.
ray.shutdown()
_cleanup(temp_dir)
print("Done")