1. Severity of the issue: (select one)
High: Completely blocks me.
2. Environment:
- Ray version: Latest
- Python version: 3.12.9
- OS: Linux
- Cloud/Infrastructure:
- Other libs/tools (if relevant): DeepSpeed
3. What happened vs. what you expected:
- Expected: DeepSpeed ZeRO Stage 3 shards the model weights across the two GPU workers.
- Actual: Each worker appears to hold its own full, identical copy of the model (no weight sharing).
Hi,
I am trying to build my own RL4LLM framework with Ray.
Why not use existing frameworks like OpenRLHF, veRL, or TRL?
- I want to learn by building my own.
- I need very customized functionality that existing frameworks do not support.
Why not use TorchTrainer?
I need customized functionality that TorchTrainer does not support.
Now I am facing an issue initializing the DeepSpeed ZeRO Stage 3 engine with Ray. ZeRO Stage 3 shards the LLM weights across GPUs, so I need to set up inter-GPU communication correctly.
Following is my code to do so:
import os

import deepspeed
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from transformers import AutoModelForCausalLM

@ray.remote(num_gpus=1)
class DeepSpeedWorker:
    def __init__(self, rank, world_size, master_addr, master_port, local_rank):
        self.rank = rank
        self.world_size = world_size
        self.master_addr = master_addr
        self.master_port = master_port
        self.local_rank = local_rank

        # Ray sets CUDA_VISIBLE_DEVICES so each actor sees exactly one GPU,
        # which is why LOCAL_RANK is always "0" inside the actor.
        os.environ["LOCAL_RANK"] = "0"
        os.environ["RANK"] = str(self.rank)
        os.environ["WORLD_SIZE"] = str(self.world_size)
        os.environ["MASTER_ADDR"] = self.master_addr
        os.environ["MASTER_PORT"] = self.master_port

        # Initialize distributed only once per worker
        deepspeed.init_distributed(dist_backend="nccl")

        # Model setup
        model_name = "Qwen/Qwen2.5-0.5B-Instruct"
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

        self.ds_config = {
            "train_micro_batch_size_per_gpu": 32,
            "optimizer": {"type": "Adam", "params": {"lr": 1e-3}},
            "fp16": {"enabled": True},
            "zero_optimization": {"stage": 3},
            "distributed": {
                "enabled": True,
                "backend": "nccl"
            }
        }

        print("init deepspeed")
        # DeepSpeed initialization
        self.model_engine, _, _, _ = deepspeed.initialize(
            model=self.model,
            model_parameters=self.model.parameters(),
            config=self.ds_config
        )
        print(f"Init Successfully {self.rank}")
num_gpus = 2
bundles = [{"GPU": 1, "CPU": 1} for _ in range(num_gpus)]
pg = placement_group(bundles, strategy="PACK")
ray.get(pg.ready())

world_size = 2  # Should match your actual GPU count
master_addr = "localhost"
master_port = "11451"

actor_group = [
    DeepSpeedWorker.options(
        num_cpus=1,
        num_gpus=1,
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=0
        ),
    ).remote(i, world_size, master_addr, master_port, i)
    for i in range(world_size)
]
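(A side note: I assume each actor is supposed to be pinned to its own bundle rather than all of them to bundle index 0; a sketch of that variant, based only on my reading of the placement group docs, not something I have verified:)

# Sketch (assumption): one bundle per rank, so each actor gets its own GPU
# from the placement group.
actor_group = [
    DeepSpeedWorker.options(
        num_cpus=1,
        num_gpus=1,
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=i,  # bundle i for rank i
        ),
    ).remote(i, world_size, master_addr, master_port, i)
    for i in range(world_size)
]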
I launch it with the following script:
ray start --head \
--port=7788 \
--dashboard-host=0.0.0.0 \
--dashboard-port=8030 \
--num-gpus=2 \
--include-dashboard=true \
--disable-usage-stats
ray job submit --address="http://127.0.0.1:8030" \
--runtime-env-json='{
"env_vars": {
"RAY_DEBUG": "legacy",
"NCCL_DEBUG": "INFO",
"NCCL_SOCKET_IFNAME": "lo"
}
}' \
-- python demo/ray_deepspeed.py
Although the above code runs successfully, it seems to create two identical full copies of the model rather than sharding the weights across the GPUs, which is not what I intended.
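To check whether the weights are actually sharded, I would inspect the ZeRO-3 partitioned parameters on each rank. A minimal sketch, assuming (from my reading of DeepSpeed) that ZeRO-3 attaches ds_numel and ds_tensor to each partitioned parameter; report_sharding is a hypothetical extra method on DeepSpeedWorker:

    # Hypothetical extra method on DeepSpeedWorker (sketch, unverified):
    def report_sharding(self):
        params = list(self.model_engine.module.parameters())
        # ds_numel = full (unpartitioned) element count, ds_tensor = local shard
        full = sum(p.ds_numel for p in params if hasattr(p, "ds_numel"))
        local = sum(p.ds_tensor.numel() for p in params if hasattr(p, "ds_tensor"))
        return self.rank, local, full

# Driver side: roughly full / world_size local elements per rank would indicate
# the weights are sharded; `full` on every rank would mean each actor holds its
# own complete copy.
print(ray.get([w.report_sharding.remote() for w in actor_group]))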
Alternatively, I could assign 2 GPUs to a single Ray actor with the following code:
actor_group = DeepSpeedWorker.options(
    num_cpus=2,
    num_gpus=2,
).remote(0, world_size, master_addr, master_port, 0)
But then the code hangs: only one process is spawned while WORLD_SIZE is 2, so (as I understand it) the process-group initialization waits for a second rank that never joins.
I wonder: what is the proper way to manually initialize a DeepSpeed engine with Ray, especially for ZeRO Stage 3?
Best,
Yihong