1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
High: Completely blocks me.
2. Environment:
- Ray version: 2.48.0
- Python version: 3.11.11
- OS: Ubuntu 24.04.3 LTS
- Cloud/Infrastructure: Ubuntu Server
- Other libs/tools (if relevant): TimeMoE and Chronos
3. What happened vs. what you expected:
- Expected:
  - I am migrating my pipeline to Ray. Before, I used joblib `Parallel`.
  - With `Parallel` I was able to run my code; however, since the processes start independently, every foundation model (TimeMoE and Chronos) had to be loaded in each process.
  - With Ray actors I want to load these large models only once and then pass them to every process, so that each process only runs the prediction. A minimal sketch of the pattern I am aiming for is shown right after this list.
- Wishes:
  - I want to load both models on a single GPU and make the predictions with them. Each takes 1 GB of memory, and my machine has 4 GPU cards with 44 GB of memory each.
  - I want to be able to run with `local_mode = [True, False]`, because sometimes I will need to debug.
- Actual:
  - I get the following error:
    `2025-08-19 18:18:06,363 ERROR serialization.py:533 -- Attempting to deserialize object on CUDA device 0 but torch.cuda.device_count() is 0. Please use torch.load with map_location to map your storages to an existing device.`
  - I tried to fix it with `torch.load()`, but I received another error from torch (details at the end of this report).
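For reference, this is the load-once, call-many actor pattern I am trying to reach. A minimal self-contained sketch with a dummy model (the names and sizes here are illustrative, not my real pipeline):

```python
import ray
import torch

ray.init(num_cpus=4)

@ray.remote
class ModelActor:
    """Loads a (dummy) model once; all callers share this single copy."""
    def __init__(self):
        self.model = torch.nn.Linear(8, 1)  # stand-in for a large foundation model

    def predict(self, x):
        with torch.no_grad():
            return self.model(x).cpu()

@ray.remote
def worker(actor, x):
    # Each task only calls predict; no model loading happens here.
    return ray.get(actor.predict.remote(x))

actor = ModelActor.remote()  # the model is loaded exactly once
futures = [worker.remote(actor, torch.randn(1, 8)) for _ in range(10)]
print(ray.get(futures))
```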
4. My pipeline code:
```python
import os

import torch
import ray
from chronos import BaseChronosPipeline
from transformers import AutoModelForCausalLM

from core_forecast.model_forecast import PreTrainedModels
from model_hyper_parameters import model_params
from event_based_multi_modelos import process_series

os.environ["CUDA_VISIBLE_DEVICES"] = "3"

if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available")
else:
    print(f"CUDA version in PyTorch: {torch.version.cuda}")
device = "cuda"


# --- Model Actor for Chronos ---
@ray.remote(num_gpus=0.25)
class ChronosModelActor:
    def __init__(self, model_path: str, device: str, torch_dtype):
        self.device = device if torch.cuda.is_available() else "cpu"
        self.model = BaseChronosPipeline.from_pretrained(
            model_path, device_map=self.device, torch_dtype=torch_dtype
        )

    def predict(self, context_tensor, forecast_horizon: int):
        context_tensor = context_tensor.to(self.device)
        with torch.no_grad():
            _, mean = self.model.predict_quantiles(
                context=context_tensor, prediction_length=forecast_horizon
            )
        return mean.detach().cpu()


# --- Model Actor for TimeMoE ---
@ray.remote(num_gpus=0.25)
class TimeMoEModelActor:
    def __init__(self, model_path: str, device: str, torch_dtype):
        self.device = device if torch.cuda.is_available() else "cpu"
        self.model = AutoModelForCausalLM.from_pretrained(
            pretrained_model_name_or_path=model_path,
            device_map=self.device,  # was "cpu"
            trust_remote_code=True,
            torch_dtype=torch_dtype,
        )
        self.model = self.model.to(self.device)

    def predict(self, context_tensor, forecast_horizon: int):
        context_tensor = context_tensor.to(self.device)
        # You can adjust this method based on TimeMoE inference specifics
        with torch.no_grad():  # no gradients are computed here
            output = self.model.generate(context_tensor, max_new_tokens=forecast_horizon)
        forecast = output[:, -forecast_horizon:]
        return forecast.cpu()


# --- Main Parallel Execution ---
@ray.remote
def parallel_process_series(
    ray_actors, directories, path, yml_input, externas_categoricas,
    externas_numericas, scaler_y, kill_model_in_train, variaveis_usuario_nao_serao_usadas,
):
    # Dispatch parallel tasks
    futures = []
    for directory in directories:
        futures.append(
            process_series(
                ray_actors, directory, path, yml_input, externas_categoricas,
                externas_numericas, scaler_y, kill_model_in_train,
                variaveis_usuario_nao_serao_usadas,
            )
        )
    results = ray.get(futures)
    return results


N_CORES = 10
ray.init(num_cpus=N_CORES, num_gpus=1, include_dashboard=False, local_mode=True)

# Load models as Ray Actors
chronos_actor = ChronosModelActor.remote(
    os.path.join(BASE_PATH_TRAINED_MODELS, model_params.get("BaseChronosPipeline")["model_name"][0]),
    device,
    torch.bfloat16,
)
timemoe_actor = TimeMoEModelActor.remote(
    os.path.join(BASE_PATH_TRAINED_MODELS, model_params.get("TimeMoE")["model_name"][0]),
    device,
    torch.bfloat16,
)

# Example usage:
results_ref = [parallel_process_series.remote(
    ray_actors={"chronos": chronos_actor, "timemoe": timemoe_actor},
    directories=directories,
    path=path,
    yml_input=yml_input,
    externas_categoricas=VARIAVEIS_EXTERNAS_CATEGORICAS,
    externas_numericas=VARIAVEIS_EXTERNAS_NUMERICAS,
    scaler_y=SCALER_Y,
    kill_model_in_train=KILL_MODEL_FIT_IN_TRAIN,
    variaveis_usuario_nao_serao_usadas=VARIAVEIS_USUARIO_NAO_SERAO_USADAS,
)]
# This method is fail-first: if an error happens, it stops immediately. To keep
# it running past failures the code has to be adapted; changing an argument is
# not enough. So I left it fixed.
ray.get(results_ref)
```
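One check I still plan to run is printing what each worker process actually sees. A small sketch (the probe class and its labels are mine; `ray.get_gpu_ids()` is, as I understand it, how Ray reports which GPUs it assigned):

```python
import os

import ray
import torch

ray.init(num_gpus=1, include_dashboard=False)

@ray.remote(num_gpus=0.25)
class GpuProbe:
    def report(self):
        return {
            # GPU IDs Ray assigned to this actor
            "ray_gpu_ids": ray.get_gpu_ids(),
            # what PyTorch inside the actor will actually see
            "cuda_visible_devices": os.environ.get("CUDA_VISIBLE_DEVICES"),
            "torch_device_count": torch.cuda.device_count(),
        }

probe = GpuProbe.remote()
print(ray.get(probe.report.remote()))
```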
5. A piece of my `process_series` function:
```python
def process_series(
    ray_actors,
    d,
    path,
    yml_input,
    externas_categoricas,
    externas_numericas,
    scaler_y,
    kill_model_in_train,
    variaveis_usuario_nao_serao_usadas,
):
    # ...... code .... #


class PreTrainedModels:
    # Reproducibility
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
    os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
    os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":16:8"
    os.environ["TF_CUDNN_DETERMINISTIC"] = "true"  # included for TimeMoE
    os.environ["TF_DETERMINISTIC_OPS"] = "true"  # included for TimeMoE
    random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    os.environ["PYTHONHASHSEED"] = str(42)
    # Fix seeds in torch
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(42)
    # transformers.set_seed(42, deterministic=True)

    # torch_dtype = torch.float32  # more precision
    torch_dtype = torch.bfloat16
    chunk_size = 600  # how many IDs to pass at once to any pretrained model for prediction

    model = ray_actors.get("chronos")
    _, mean_ref = ray.get(model.predict.remote(
        context_tensor=context_tensor,
        forecast_horizon=forecast_horizon,
    ))
    mean = ray.get(mean_ref)
```
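Since the error message is about deserializing a CUDA tensor, my understanding is that everything crossing the actor boundary is serialized, so one variant I am considering is keeping both the argument and the result on CPU. A sketch against my own actor above (whose `predict` already returns `mean.detach().cpu()`, so a single `ray.get` should be enough):

```python
# Variant: keep tensors on CPU on both sides of the actor boundary.
# predict() moves the context to the actor's GPU internally and returns
# a CPU tensor, so only CPU tensors are ever serialized.
model = ray_actors.get("chronos")
mean = ray.get(model.predict.remote(
    context_tensor=context_tensor.detach().cpu(),
    forecast_horizon=forecast_horizon,
))
```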
I tried to add:

```python
torch.load(os.path.join(BASE_PATH_TRAINED_MODELS, model_params.get("BaseChronosPipeline")["model_name"][0]), map_location=device)
torch.load(os.path.join(BASE_PATH_TRAINED_MODELS, model_params.get("TimeMoE")["model_name"][0]), map_location=device)
```

But I got:

```
IsADirectoryError: [Errno 21] Is a directory: '/mnt/data/gui/hub/models--amazon--chronos-bolt-base/snapshots/6f8ced46a499ae1dfd399981f551152d756cf4f6'
```
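My guess about that last error is that `torch.load()` wants a single checkpoint file, while `from_pretrained()` takes the whole snapshot directory, so pointing `torch.load()` at the directory cannot work. A sketch of how I would inspect the snapshot (the path is the one from the error above; the checkpoint file name is a placeholder):

```python
import os
import torch

snapshot_dir = "/mnt/data/gui/hub/models--amazon--chronos-bolt-base/snapshots/6f8ced46a499ae1dfd399981f551152d756cf4f6"

# torch.load() expects a file, not the snapshot directory that
# from_pretrained() consumes; list what actually lives in there first.
for name in os.listdir(snapshot_dir):
    print(name)

# e.g. torch.load(os.path.join(snapshot_dir, "<checkpoint-file>"), map_location="cuda")
```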
Any clue what may be missing in my code?