1. Severity of the issue: (select one)
High: Completely blocks me from using Ray.
2. Environment:
- Ray version: 2.48.0
- Python version: 3.11.11
- OS: Ubuntu 24.04.3 LTS with 127 cores
- GPU Infrastructure:
- 1 NVIDIA GPU with 44 GB of memory
- Driver Version: 535.230.02
- CUDA Version: 12.2
Hi there!
I have a time series pipeline that runs 15 different models for each target using Parallel from joblib. 2 out of the 15 models are pre-trained foundation models (1 GB each). The pipeline can only run 26 targets in parallel, because loading 26 copies of the foundation models (plus the memory needed during prediction) would consume all of the GPU memory (44 GB). My bottleneck is that the other 101 CPU cores remain idle.
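For context, the original joblib setup looked roughly like this (a minimal sketch; `run_all_models_for_target` is a hypothetical stand-in for my per-target routine, not the actual code):

```python
from joblib import Parallel, delayed

# Hypothetical sketch: each of the 26 workers ends up loading its own copy of the
# foundation models, so n_jobs is capped by GPU memory rather than by CPU cores.
results = Parallel(n_jobs=26)(
    delayed(run_all_models_for_target)(target) for target in targets
)
```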
So I decided to use Ray Actors. This way, I could load each foundation model only once and use all of my 127 cores. However, I did not get the time gain I expected: increasing the number of cores by 4.9x only decreased the running time by 2.5x (from 13,343 seconds to 5,287 seconds). I expected a speedup of at least 4.5x, i.e. a total time close to 3,000 seconds.
I tried increasing the number of actors loaded simultaneously and adjusting num_cpus and num_gpus, using either a round-robin strategy or Ray's ActorPool. Increasing the number of actors decreased performance. Changing num_cpus and num_gpus did not have much impact as long as it respected the overall capacity of my machine.
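Concretely, the ActorPool / round-robin variant I tried looked roughly like this (a sketch; `contexts` stands in for my list of per-target inputs, and the actor class and path are the ones defined further below):

```python
from ray.util import ActorPool

# Sketch: several replicas of the Chronos actor share the GPU, and prediction
# requests are distributed over them by ActorPool.
chronos_pool = ActorPool([
    ChronosModelActor.remote(path_chronos, device, torch.bfloat16)
    for _ in range(4)
])

# Each item in `contexts` is a (context_tensor, forecast_horizon) pair.
predictions = list(chronos_pool.map(
    lambda actor, item: actor.predict.remote(item[0], item[1]),
    contexts,
))
```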
I also noticed that Ray decreased the time of the other 13 models, which rely on the CPU (e.g., KNN) or are light to fit on the GPU. Besides that, when running the experiment with 127 cores, all of them were 100% occupied, while the GPU peaked at about 10% utilization and 5% memory usage.
This is how I load the foundation models and create the actors:
```python
import os

import ray
import torch
from chronos import BaseChronosPipeline
from transformers import AutoModelForCausalLM

num_cpus_actor = 0.006
num_gpus_actor = 0.006


@ray.remote(num_gpus=num_gpus_actor, num_cpus=num_cpus_actor)
class ChronosModelActor:
    def __init__(self, model_path: str, device: str, torch_dtype):
        self.device = device if torch.cuda.is_available() else "cpu"
        self.model = BaseChronosPipeline.from_pretrained(
            model_path, device_map=self.device, torch_dtype=torch_dtype
        )

    def predict(self, context_tensor, forecast_horizon: int):
        context_tensor = context_tensor.to(self.device)
        _, mean = self.model.predict_quantiles(
            context=context_tensor, prediction_length=forecast_horizon
        )
        return mean.detach().cpu()


@ray.remote(num_gpus=num_gpus_actor, num_cpus=num_cpus_actor)
class TimeMoEModelActor:
    def __init__(self, model_path: str, device: str, torch_dtype):
        self.device = device if torch.cuda.is_available() else "cpu"
        self.model = AutoModelForCausalLM.from_pretrained(
            pretrained_model_name_or_path=model_path,
            device_map=self.device,
            trust_remote_code=True,
            torch_dtype=torch_dtype,
        )
        self.model = self.model.to(self.device)

    def predict(self, context_tensor, forecast_horizon: int):
        context_tensor = context_tensor.to(self.device)
        output = self.model.generate(context_tensor, max_new_tokens=forecast_horizon)
        forecast = output[:, -forecast_horizon:]
        return forecast.cpu()


# Overall setup of the process
N_CORES = 100
ray.init(num_cpus=N_CORES, num_gpus=1, include_dashboard=True)

path_chronos = os.path.join(BASE_PATH_TRAINED_MODELS, model_params.get("BaseChronosPipeline")["model_name"][0])
path_timemoe = os.path.join(BASE_PATH_TRAINED_MODELS, model_params.get("TimeMoE")["model_name"][0])

chronos_actors = ChronosModelActor.remote(path_chronos, device, torch.bfloat16)
timemoe_actors = TimeMoEModelActor.remote(path_timemoe, device, torch.bfloat16)
```
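For reference, a single direct call on one of these handles looks like this (`context_tensor` and `forecast_horizon` are placeholders for one target's real inputs):

```python
# Quick sanity check of one actor handle before launching the parallel tasks
# (context_tensor / forecast_horizon are placeholders for a real target).
mean = ray.get(chronos_actors.predict.remote(context_tensor, forecast_horizon))
print(mean.shape)
```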
I then pass them to a task function that runs in parallel for each target:
```python
@ray.remote(num_cpus=0.9, num_gpus=0)
@ray_debug
def process_series(
    ray_actors,
    d,
    ...
):
    # A similar process is done for TimeMoE (the other foundation model)
    model = ray_actors.get("chronos")

    # Make the predictions for all ids at once
    mean = ray.get(model.predict.remote(
        context_tensor=context_tensor,
        forecast_horizon=forecast_horizon,
    ))
    # This function does not return results because I save the intermediate results to disk.


futures = []
for target in targets:
    futures.append(
        process_series.remote(
            ray_actors={"chronos": chronos_actors,
                        "timemoe": timemoe_actors},
            target=target,
            ...
        )
    )
results = ray.get(futures)
```
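To put a number on the overhead I am asking about below, the actor round trip inside each task could be timed roughly like this (an illustrative sketch, not part of my actual pipeline):

```python
import time

# Inside process_series: isolate the actor-call round trip (submit + ray.get)
# from the rest of the per-target work, to see how much time it accounts for.
t0 = time.perf_counter()
mean = ray.get(model.predict.remote(
    context_tensor=context_tensor,
    forecast_horizon=forecast_horizon,
))
print(f"Chronos actor round trip: {time.perf_counter() - t0:.3f}s")
```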
Questions
- Will each target be allocated to a specific core?
- Can passing the actors into the task function cause overhead?
- How can I optimize this?
I saw that there is a project to decrease the per-actor overhead that may be related to my case.
My studies:
I got a good overview of how fractional GPU allocation works from my previous question. I went through "Running methods with actors is slower than running normal methods", the Ray Actors crash course, the Actors page of the Ray 2.49.1 docs, and a related GitHub thread, among others, but I could not find a use case similar to mine.