I am trying tensor parallelism with DeepSpeed on Ray. I constructed a class DeepspeedTPWorker to handle one part of a pre-trained model, and a class DeepspeedPredictor to manage a group of workers that together load one complete model with tensor parallelism. Then I use an ActorPool to manage several DeepspeedPredictor actors.
I ran the following script on a machine with 16 cores and 2 A40s with 46 GB of memory each.
# ---
# jupyter:
# jupytext:
# text_representation:
# extension: .py
# format_name: light
# format_version: '1.5'
# jupytext_version: 1.16.4
# kernelspec:
# display_name: .venv
# language: python
# name: python3
# ---
# +
import ray
ray.init()
ray.available_resources()
# -
import torch
model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
# +
from transformers import AutoTokenizer
class tokenize:
    def __init__(self, model_name=model_name):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.tokenizer.pad_token = self.tokenizer.eos_token

    def __call__(self, text):
        batch_tokens = self.tokenizer(
            text["begginnings"].tolist(),
            return_tensors="pt",
            max_length=128,
            padding="max_length",
            truncation=True,
        )
        return dict(batch_tokens)
# +
import pandas as pd
from ray import data
begginnings_all = [
    "The King is dead. Long live the Queen.",
    "Once there were four children whose names were Peter, Susan, Edmund, and Lucy.",
    "The story so far: in the beginning, the universe was created.",
    "It was a bright cold day in April, and the clocks were striking thirteen.",
    "It is a truth universally acknowledged, that a single man in possession of a good fortune, must be in want of a wife.",
    "The sweat wis lashing oafay Sick Boy; he wis trembling.",
    "124 was spiteful. Full of Baby's venom.",
    "As Gregor Samsa awoke one morning from uneasy dreams he found himself transformed in his bed into a gigantic insect.",
    "I write this sitting in the kitchen sink.",
    "We were somewhere around Barstow on the edge of the desert when the drugs began to take hold.",
] * 10
dataset = data.from_pandas(pd.DataFrame({"begginnings": begginnings_all}))
ray_dataset = dataset.map_batches(tokenize, concurrency=2, batch_size=10)
print(ray_dataset)
# -
from contextlib import closing
import socket
# +
import ray.data
from transformers import AutoModelForCausalLM
import torch
import torch.distributed as dist
from typing import Dict, Any
import os
import pandas as pd
@ray.remote(num_cpus=2, num_gpus=1)
class DeepspeedTPWorker:
    def __init__(
        self,
        model_rank: int,
        local_rank: int,
        local_world_size: int,
        ip_address: str,
        port: int,
        model,
        ds_config=None,
        dtype=torch.float16,
    ):
        print(f"is cuda available: {torch.cuda.is_available()}")
        self.ds_config = ds_config
        self.model_rank = model_rank
        self.local_rank = local_rank
        self.local_world_size = local_world_size
        self.ip_address = ip_address
        self.port = port
        self.model = model
        self.ds_config = ds_config
        self.dtype = dtype
        print(
            f"rank {self.local_rank} assigned {torch.cuda.device_count()} CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES')}"
        )
        dist.init_process_group(
            backend="nccl",
            init_method=f"tcp://{self.ip_address}:{self.port}",
            rank=self.local_rank,
            world_size=self.local_world_size,
        )

    def load_ds_engine(self):
        import deepspeed

        deepspeed.init_distributed("nccl")
        self.model = deepspeed.init_inference(self.model, self.ds_config)
        # with open(f"layers-{self.local_rank}", "w") as f:
        #     for name, param in self.model.named_parameters():
        #         f.write(f"Rank: {self.local_rank}, Parameter: {name}, Mean: {param.mean().item()}\n")

    def _predict_pandas(self, batch: Dict[str, Any]) -> pd.DataFrame:
        with torch.no_grad():
            out_tokens = self.model.generate(**batch)
            out_tokens = out_tokens.to("cpu")
            return pd.DataFrame(
                {
                    "generated_tokens": out_tokens,
                    "decoded_texts": self.tokenizer.batch_decode(
                        out_tokens, skip_special_tokens=True
                    ),
                }
            )
# +
@ray.remote(num_cpus=2)
class DeepspeedPredictor:
    def __init__(self, model, num_workers_per_model, model_rank) -> None:
        self.model = model
        self.num_workers_per_model = num_workers_per_model
        self.ds_config = {
            "tensor_parallel": {
                "enabled": True,
                "tp_size": num_workers_per_model,
            },
        }
        self.comm_port = self._find_free_port()
        self.comm_master_ip = ray.util.get_node_ip_address()
        self.workers = [
            DeepspeedTPWorker.remote(
                model_rank,
                i,
                self.num_workers_per_model,
                self.comm_master_ip,
                self.comm_port,
                model_ref,
                self.ds_config,
            )
            for i in range(self.num_workers_per_model)
        ]
        self.load_model()

    def load_model(self):
        ray.get([p.load_ds_engine.remote() for p in self.workers])

    def _find_free_port(self) -> int:
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(("", 0))
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            return s.getsockname()[1]

    def predict(self, batch: Dict[str, Any]) -> pd.DataFrame:
        predictions = ray.get(
            [worker._predict_pandas.remote(batch) for worker in self.workers]
        )
        combined_output = pd.concat(predictions, ignore_index=True)
        return combined_output
# +
model = AutoModelForCausalLM.from_pretrained(model_name)
model_ref = ray.put(model)
# +
from ray.util.actor_pool import ActorPool
num_parallel_models = 1
num_workers_per_model = 2
predictors = [DeepspeedPredictor.remote(model_ref, num_workers_per_model, i) for i in range(num_parallel_models)]
pool = ActorPool(predictors)
for it in ray_dataset.iter_torch_batches():
    pool.submit(lambda a, v: a.predict.remote(v), it)
while pool.has_next():
    print(pool.get_next())
# -
ray.shutdown()
Then I hit this error:
2024-09-26 11:17:16,721 INFO worker.py:1598 -- Connecting to existing Ray cluster at address: 10.0.8.98:6379...
2024-09-26 11:17:16,733 INFO worker.py:1774 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
MapBatches(tokenize)
+- Dataset(num_rows=100, schema={begginnings: object})
2024-09-26 11:18:27,225 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-09-26_07-38-13_206865_51653/logs/ray-data
2024-09-26 11:18:27,225 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(tokenize)]
✔️ Dataset execution finished in 13.34 seconds: : 100 row [00:13, 7.50 row/s] ]
2024-09-26 11:18:40,567 WARNING actor_pool_map_operator.py:265 -- To ensure full parallelization across an actor pool of size 2, the Dataset should consist of at least 2 distinct blocks. Consider increasing the parallelism when creating the Dataset.
- MapBatches(tokenize): 0 active, 0 queued, [cpu: 1.0, objects: 0.0B], 0 actors [locality off]: : 100 row [00:00, 763 row/s]
(DeepspeedTPWorker pid=268595) is cuda available: True
(DeepspeedTPWorker pid=268595) rank 0 assigned 1 CUDA_VISIBLE_DEVICES: 0
(DeepspeedTPWorker pid=268595) [2024-09-26 11:18:53,653] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
(DeepspeedTPWorker pid=268595) [2024-09-26 11:18:55,169] [INFO] [comm.py:637:init_distributed] cdb=None
(DeepspeedTPWorker pid=268595) [2024-09-26 11:18:55,169] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed info: version=0.14.5, git-hash=unknown, git-branch=unknown
(DeepspeedTPWorker pid=268595) [2024-09-26 11:18:55,171] [INFO] [logging.py:96:log_dist] [Rank 0] quantize_bits = 8 mlp_extra_grouping = False, quantize_groups = 1
(DeepspeedTPWorker pid=268595) AutoTP: [(<class 'transformers.models.llama.modeling_llama.LlamaDecoderLayer'>, ['self_attn.o_proj', 'mlp.down_proj'])]
Traceback (most recent call last):
  File "/home/ubuntu/oms/tutorials/notebooks/inference_NxN_TPDP_ray_deepspeed.py", line 222, in <module>
    print("Prediction output size:", pool.get_next())
  File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/util/actor_pool.py", line 309, in get_next
    return ray.get(future)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/worker.py", line 2661, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RaySystemError): ray::DeepspeedPredictor.predict() (pid=268209, ip=10.0.8.98, actor_id=42678021ed8f98f0802215e423000000, repr=<inference_NxN_TPDP_ray_deepspeed.DeepspeedPredictor object at 0x7fb2c4350dc0>)
At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.
traceback: Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/storage.py", line 381, in _load_from_bytes
    return torch.load(io.BytesIO(b))
  File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 1040, in load
    return _legacy_load(opened_file, map_location, pickle_module, **pickle_load_args)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 1272, in _legacy_load
    result = unpickler.load()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 1205, in persistent_load
    obj = restore_location(obj, location)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 390, in default_restore_location
    result = fn(storage, location)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 265, in _cuda_deserialize
    device = validate_cuda_device(location)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 249, in validate_cuda_device
    raise RuntimeError('Attempting to deserialize object on a CUDA '
RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.
(DeepspeedPredictor pid=268209) Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.
(DeepspeedPredictor pid=268209) Traceback (most recent call last):
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/serialization.py", line 423, in deserialize_objects
(DeepspeedPredictor pid=268209) obj = self._deserialize_object(data, metadata, object_ref)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/serialization.py", line 280, in _deserialize_object
(DeepspeedPredictor pid=268209) return self._deserialize_msgpack_data(data, metadata_fields)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/serialization.py", line 235, in _deserialize_msgpack_data
(DeepspeedPredictor pid=268209) python_objects = self._deserialize_pickle5_data(pickle5_data)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/serialization.py", line 225, in _deserialize_pickle5_data
(DeepspeedPredictor pid=268209) obj = pickle.loads(in_band)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/storage.py", line 381, in _load_from_bytes
(DeepspeedPredictor pid=268209) return torch.load(io.BytesIO(b))
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 1040, in load
(DeepspeedPredictor pid=268209) return _legacy_load(opened_file, map_location, pickle_module, **pickle_load_args)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 1272, in _legacy_load
(DeepspeedPredictor pid=268209) result = unpickler.load()
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 1205, in persistent_load
(DeepspeedPredictor pid=268209) obj = restore_location(obj, location)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 390, in default_restore_location
(DeepspeedPredictor pid=268209) result = fn(storage, location)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 265, in _cuda_deserialize
(DeepspeedPredictor pid=268209) device = validate_cuda_device(location)
(DeepspeedPredictor pid=268209) File "/home/ubuntu/.local/lib/python3.10/site-packages/torch/serialization.py", line 249, in validate_cuda_device
(DeepspeedPredictor pid=268209) raise RuntimeError('Attempting to deserialize object on a CUDA '
(DeepspeedPredictor pid=268209) RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.
(DeepspeedTPWorker pid=268596) is cuda available: True
(DeepspeedTPWorker pid=268596) rank 1 assigned 1 CUDA_VISIBLE_DEVICES: 1
(DeepspeedTPWorker pid=268596) [2024-09-26 11:18:53,656] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
(DeepspeedTPWorker pid=268596) [2024-09-26 11:18:55,198] [INFO] [comm.py:637:init_distributed] cdb=None
(DeepspeedTPWorker pid=268596) AutoTP: [(<class 'transformers.models.llama.modeling_llama.LlamaDecoderLayer'>, ['self_attn.o_proj', 'mlp.down_proj'])]
It seems that the actor can't find CUDA when DeepspeedPredictor.predict fetches the results returned by DeepspeedTPWorker._predict_pandas. But I checked torch.cuda.is_available() inside DeepspeedTPWorker in my code and it returned True. I would like to know what happens when ray.get() runs inside DeepspeedPredictor.predict.
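For reference, here is a minimal sketch (separate from my script, with made-up actor names GpuWorker and CpuCollector) of what I suspect is happening: a CPU-only actor, which Ray gives no GPU and therefore hides CUDA_VISIBLE_DEVICES from, calling ray.get() on a result that contains CUDA tensors produced by a GPU actor. I expect this to raise the same "Attempting to deserialize object on a CUDA device" error, but I am not sure this is the actual cause in my case.

import ray
import torch

ray.init()

@ray.remote(num_gpus=1)
class GpuWorker:
    def make_cuda_tensor(self):
        # The returned tensor lives on the GPU, so Ray serializes a CUDA storage.
        return torch.ones(4, device="cuda")

@ray.remote(num_cpus=1)  # no num_gpus, so CUDA is not visible inside this actor
class CpuCollector:
    def __init__(self, gpu_worker):
        self.gpu_worker = gpu_worker

    def collect(self):
        # ray.get() deserializes the CUDA tensor in a process where
        # torch.cuda.is_available() is False.
        return ray.get(self.gpu_worker.make_cuda_tensor.remote())

gpu_worker = GpuWorker.remote()
collector = CpuCollector.remote(gpu_worker)
print(ray.get(collector.collect.remote()))

This mirrors the relationship between DeepspeedPredictor (no num_gpus) and DeepspeedTPWorker (num_gpus=1) in my script, just with a dummy tensor instead of the model output.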