How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I see that Ray implements collective communication, so I tried modifying the test script to run it, but it fails with an error. Is there something I missed? The script and the log are shown below:
import ray
from ray.util.collective.types import Backend, ReduceOp
import ray.util.collective as collective
import numpy as np
import pytest


@ray.remote(num_cpus=1)
class Worker:
    def __init__(self):
        self.buffer = None
        self.list_buffer = None

    def init_tensors(self):
        self.buffer = np.ones((10,), dtype=np.float32)
        self.list_buffer = [np.ones((10,), dtype=np.float32) for _ in range(2)]
        return True

    def init_group(self, world_size, rank, backend=Backend.GLOO, group_name="default"):
        collective.init_collective_group(world_size, rank, backend, group_name)
        return True

    def set_buffer(self, data):
        self.buffer = data
        return self.buffer

    def get_buffer(self):
        return self.buffer

    def set_list_buffer(self, list_of_arrays, copy=False):
        if copy:
            copy_list = []
            for tensor in list_of_arrays:
                if isinstance(tensor, np.ndarray):
                    copy_list.append(tensor.copy())
            self.list_buffer = copy_list
        else:
            self.list_buffer = list_of_arrays
        return self.list_buffer

    def do_allreduce(self, group_name="default", op=ReduceOp.SUM):
        collective.allreduce(self.buffer, group_name, op)
        return self.buffer

    def do_reduce(self, group_name="default", dst_rank=0, op=ReduceOp.SUM):
        collective.reduce(self.buffer, dst_rank, group_name, op)
        return self.buffer

    def do_broadcast(self, group_name="default", src_rank=0):
        collective.broadcast(self.buffer, src_rank, group_name)
        return self.buffer

    def do_allgather(self, group_name="default"):
        collective.allgather(self.list_buffer, self.buffer, group_name)
        return self.list_buffer

    def do_reducescatter(self, group_name="default", op=ReduceOp.SUM):
        collective.reducescatter(self.buffer, self.list_buffer, group_name, op)
        return self.buffer

    def do_send(self, group_name="default", dst_rank=0):
        collective.send(self.buffer, dst_rank, group_name)
        return self.buffer

    def do_recv(self, group_name="default", src_rank=0):
        collective.recv(self.buffer, src_rank, group_name)
        return self.buffer

    def destroy_group(self, group_name="default"):
        collective.destroy_collective_group(group_name)
        return True

    def report_rank(self, group_name="default"):
        rank = collective.get_rank(group_name)
        return rank

    def report_world_size(self, group_name="default"):
        ws = collective.get_collective_group_size(group_name)
        return ws

    def report_nccl_availability(self):
        avail = collective.nccl_available()
        return avail

    def report_gloo_availability(self):
        avail = collective.gloo_available()
        return avail

    def report_is_group_initialized(self, group_name="default"):
        is_init = collective.is_group_initialized(group_name)
        return is_init


def create_collective_workers(num_workers=2, group_name="default", backend="gloo"):
    actors = [None] * num_workers
    for i in range(num_workers):
        actor = Worker.remote()
        ray.get([actor.init_tensors.remote()])
        actors[i] = actor
    world_size = num_workers
    init_results = ray.get(
        [
            actor.init_group.remote(world_size, i, backend, group_name)
            for i, actor in enumerate(actors)
        ]
    )
    return actors, init_results


@pytest.fixture
def ray_start_distributed_2_nodes():
    # The cluster has a setup of 2 nodes.
    # No GPUs!
    ray.init(address='ip:port')
    yield
    ray.shutdown()
@pytest.mark.parametrize("backend", [Backend.GLOO])
@pytest.mark.parametrize("group_name", [ "test"])
@pytest.mark.parametrize("dst_rank", [ 1, 3, 6])
@pytest.mark.parametrize("src_rank", [0, 2, 4, 7])
@pytest.mark.parametrize("array_size", [2 ** 10])
def test_sendrecv(
ray_start_distributed_2_nodes, group_name, array_size, src_rank, dst_rank, backend
):
if src_rank == dst_rank:
return
world_size = 8
actors, _ = create_collective_workers(
num_workers=world_size, group_name=group_name, backend=backend
)
ray.get(
[
a.set_buffer.remote(np.ones(array_size, dtype=np.float32) * (i + 1))
for i, a in enumerate(actors)
]
)
refs = []
for i in range(world_size):
refs.append(actors[i].get_buffer.remote())
refs[src_rank] = actors[src_rank].do_send.remote(group_name, dst_rank)
refs[dst_rank] = actors[dst_rank].do_recv.remote(group_name, src_rank)
results = ray.get(refs)
assert (
results[src_rank] == np.ones(array_size, dtype=np.float32) * (src_rank + 1)
).all()
assert (
results[dst_rank] == np.ones(array_size, dtype=np.float32) * (src_rank + 1)
).all()
ray.get([a.destroy_group.remote(group_name) for a in actors])
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", "-x", __file__]))
Before running the script, I started a two-node cluster, roughly as sketched below.
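A minimal sketch of the cluster setup, assuming the default port 6379 and a placeholder head-node address (the actual IPs and ports in my environment differ):

# On the head node:
ray start --head --port=6379
# On the second node, pointing at the head node's address:
ray start --address='<head_node_ip>:6379'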
====================================================================================================== test session starts ======================================================================================================
platform linux -- Python 3.8.13, pytest-7.1.1, pluggy-1.0.0 -- /home/xzk/anaconda3/envs/ray-build/bin/python
cachedir: .pytest_cache
rootdir: /home/xzk/testfield
collected 12 items
ray_collectivate.py::test_sendrecv[1024-0-1-test-gloo] FAILED [ 8%]
=========================================================================================================== FAILURES ============================================================================================================
_______________________________________________________________________________________________ test_sendrecv[1024-0-1-test-gloo] _______________________________________________________________________________________________
ray_start_distributed_2_nodes = None, group_name = 'test', array_size = 1024, src_rank = 0, dst_rank = 1, backend = 'gloo'
@pytest.mark.parametrize("backend", [Backend.GLOO])
@pytest.mark.parametrize("group_name", [ "test"])
@pytest.mark.parametrize("dst_rank", [ 1, 3, 6])
@pytest.mark.parametrize("src_rank", [0, 2, 4, 7])
@pytest.mark.parametrize("array_size", [2 ** 10])
def test_sendrecv(
ray_start_distributed_2_nodes, group_name, array_size, src_rank, dst_rank, backend
):
if src_rank == dst_rank:
return
world_size = 8
> actors, _ = create_collective_workers(
num_workers=world_size, group_name=group_name, backend=backend
)
ray_collectivate.py:130:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
ray_collectivate.py:103: in create_collective_workers
init_results = ray.get(
../RLEnv/ray/python/ray/_private/client_mode_hook.py:105: in wrapper
return func(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
object_refs = [ObjectRef(e014f98d69f9c6dcdc705ec4686bc2fcf0ca20960300000001000000), ObjectRef(a4c3e9e034b7d22678306db3a1f9cccbdef781...a648a7373298fb55e295bc280630300000001000000), ObjectRef(a5cb425597a5160a512b10132707e6e2c64a59eb0300000001000000), ...]
@PublicAPI
@client_mode_hook(auto_init=True)
def get(
object_refs: Union[ray.ObjectRef, List[ray.ObjectRef]],
*,
timeout: Optional[float] = None,
) -> Union[Any, List[Any]]:
"""Get a remote object or a list of remote objects from the object store.
This method blocks until the object corresponding to the object ref is
available in the local object store. If this object is not in the local
object store, it will be shipped from an object store that has it (once the
object has been created). If object_refs is a list, then the objects
corresponding to each object in the list will be returned.
Ordering for an input list of object refs is preserved for each object
returned. That is, if an object ref to A precedes an object ref to B in the
input list, then A will precede B in the returned list.
This method will issue a warning if it's running inside async context,
you can use ``await object_ref`` instead of ``ray.get(object_ref)``. For
a list of object refs, you can use ``await asyncio.gather(*object_refs)``.
Args:
object_refs: Object ref of the object to get or a list of object refs
to get.
timeout (Optional[float]): The maximum amount of time in seconds to
wait before returning.
Returns:
A Python object or a list of Python objects.
Raises:
GetTimeoutError: A GetTimeoutError is raised if a timeout is set and
the get takes longer than timeout to return.
Exception: An exception is raised if the task that created the object
or that created one of the objects raised an exception.
"""
worker = global_worker
worker.check_connected()
if hasattr(worker, "core_worker") and worker.core_worker.current_actor_is_asyncio():
global blocking_get_inside_async_warned
if not blocking_get_inside_async_warned:
logger.warning(
"Using blocking ray.get inside async actor. "
"This blocks the event loop. Please use `await` "
"on object ref with asyncio.gather if you want to "
"yield execution to the event loop instead."
)
blocking_get_inside_async_warned = True
with profiling.profile("ray.get"):
is_individual_id = isinstance(object_refs, ray.ObjectRef)
if is_individual_id:
object_refs = [object_refs]
if not isinstance(object_refs, list):
raise ValueError(
"'object_refs' must either be an object ref "
"or a list of object refs."
)
# TODO(ujvl): Consider how to allow user to retrieve the ready objects.
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
for i, value in enumerate(values):
if isinstance(value, RayError):
if isinstance(value, ray.exceptions.ObjectLostError):
worker.core_worker.dump_object_store_memory_usage()
if isinstance(value, RayTaskError):
> raise value.as_instanceof_cause()
E ray.exceptions.RayTaskError(ValueError): ray::Worker.init_group() (pid=25363, ip=223.193.8.206, repr=<ray_collectivate.Worker object at 0x7fc4f256b430>)
E File "/home/xzk/testfield/ray_collectivate.py", line 22, in init_group
E collective.init_collective_group(world_size, rank, backend, group_name)
E File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective.py", line 143, in init_collective_group
E _group_mgr.create_collective_group(backend, world_size, rank, group_name)
E File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective.py", line 62, in create_collective_group
E g = GLOOGroup(
E File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective_group/gloo_collective_group.py", line 194, in __init__
E self._rendezvous = Rendezvous(
E File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective_group/gloo_collective_group.py", line 46, in __init__
E (
E ValueError: not enough values to unpack (expected 2, got 1)
../RLEnv/ray/python/ray/worker.py:1809: RayTaskError(ValueError)
==================================================================================================== short test summary info ====================================================================================================
FAILED ray_collectivate.py::test_sendrecv[1024-0-1-test-gloo] - ray.exceptions.RayTaskError(ValueError): ray::Worker.init_group() (pid=25363, ip=223.193.8.206, repr=<ray_collectivate.Worker object at 0x7fc4f256b430>)
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
====================================================================================================== 1 failed in 19.02s =======================================================================================================