Ray collective communication test error

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I see that Ray implements collective communication, so I tried to modify the test script and run it, but it fails with an error. Is there something I missed? The script and the log are shown below:

import ray
from ray.util.collective.types import Backend, ReduceOp
import ray.util.collective as collective
import numpy as np
import pytest




@ray.remote(num_cpus=1)
class Worker:
    """Ray actor holding NumPy buffers and exposing collective calls on them.

    ``buffer`` is a single array and ``list_buffer`` is a list of arrays.
    Every ``do_*`` method forwards those buffers to the matching
    ``ray.util.collective`` function and then returns one of them.
    """

    def __init__(self):
        # Nothing is allocated here; init_tensors() or the setters fill these.
        self.list_buffer = None
        self.buffer = None

    def init_tensors(self):
        """Allocate float32 arrays of ten ones for both buffers; return True."""
        shape, dtype = (10,), np.float32
        self.buffer = np.ones(shape, dtype=dtype)
        # The list buffer holds two independent arrays of the same shape.
        self.list_buffer = [np.ones(shape, dtype=dtype) for _ in range(2)]
        return True

    def init_group(self, world_size, rank, backend=Backend.GLOO, group_name="default"):
        """Initialize the collective group ``group_name`` in this actor; return True."""
        collective.init_collective_group(world_size, rank, backend, group_name)
        return True

    def set_buffer(self, data):
        """Replace ``buffer`` with ``data`` and return the stored value."""
        self.buffer = data
        return self.buffer

    def get_buffer(self):
        """Return the current ``buffer``."""
        return self.buffer

    def set_list_buffer(self, list_of_arrays, copy=False):
        """Replace ``list_buffer`` and return the stored list.

        With ``copy=False`` the given list is stored as-is.  With
        ``copy=True`` a new list is stored that contains a copy of every
        ``np.ndarray`` element; elements of any other type are left out.
        """
        if not copy:
            self.list_buffer = list_of_arrays
        else:
            self.list_buffer = [
                item.copy() for item in list_of_arrays if isinstance(item, np.ndarray)
            ]
        return self.list_buffer

    def do_allreduce(self, group_name="default", op=ReduceOp.SUM):
        """Call ``collective.allreduce`` on ``buffer`` and return ``buffer``."""
        collective.allreduce(self.buffer, group_name, op)
        return self.buffer

    def do_reduce(self, group_name="default", dst_rank=0, op=ReduceOp.SUM):
        """Call ``collective.reduce`` towards ``dst_rank`` and return ``buffer``."""
        collective.reduce(self.buffer, dst_rank, group_name, op)
        return self.buffer

    def do_broadcast(self, group_name="default", src_rank=0):
        """Call ``collective.broadcast`` from ``src_rank`` and return ``buffer``."""
        collective.broadcast(self.buffer, src_rank, group_name)
        return self.buffer

    def do_allgather(self, group_name="default"):
        """Call ``collective.allgather`` with both buffers; return ``list_buffer``."""
        collective.allgather(self.list_buffer, self.buffer, group_name)
        return self.list_buffer

    def do_reducescatter(self, group_name="default", op=ReduceOp.SUM):
        """Call ``collective.reducescatter`` with both buffers; return ``buffer``."""
        collective.reducescatter(self.buffer, self.list_buffer, group_name, op)
        return self.buffer

    def do_send(self, group_name="default", dst_rank=0):
        """Call ``collective.send`` with ``buffer`` to ``dst_rank``; return ``buffer``."""
        collective.send(self.buffer, dst_rank, group_name)
        return self.buffer

    def do_recv(self, group_name="default", src_rank=0):
        """Call ``collective.recv`` with ``buffer`` from ``src_rank``; return ``buffer``."""
        collective.recv(self.buffer, src_rank, group_name)
        return self.buffer

    def destroy_group(self, group_name="default"):
        """Destroy the collective group ``group_name`` in this actor; return True."""
        collective.destroy_collective_group(group_name)
        return True

    def report_rank(self, group_name="default"):
        """Return what ``collective.get_rank`` reports for ``group_name``."""
        return collective.get_rank(group_name)

    def report_world_size(self, group_name="default"):
        """Return what ``collective.get_collective_group_size`` reports."""
        return collective.get_collective_group_size(group_name)

    def report_nccl_availability(self):
        """Return the result of ``collective.nccl_available()`` in this actor."""
        return collective.nccl_available()

    def report_gloo_availability(self):
        """Return the result of ``collective.gloo_available()`` in this actor."""
        return collective.gloo_available()

    def report_is_group_initialized(self, group_name="default"):
        """Return the result of ``collective.is_group_initialized(group_name)``."""
        return collective.is_group_initialized(group_name)


def create_collective_workers(num_workers=2, group_name="default", backend="gloo"):
    """Start ``num_workers`` Worker actors and put them in one collective group.

    Args:
        num_workers: Number of actors to create; also used as the world size.
        group_name: Name of the collective group every actor joins.
        backend: Backend passed through to ``Worker.init_group``.

    Returns:
        A tuple ``(actors, init_results)``: the actor handles in rank order
        and the values returned by each actor's ``init_group`` call.
    """
    actors = []
    for _ in range(num_workers):
        worker = Worker.remote()
        # Wait for this actor's tensors before creating the next actor.
        ray.get([worker.init_tensors.remote()])
        actors.append(worker)

    world_size = num_workers
    # An actor's position in the list is its rank. All init_group calls are
    # submitted before any result is awaited.
    pending = [
        worker.init_group.remote(world_size, rank, backend, group_name)
        for rank, worker in enumerate(actors)
    ]
    init_results = ray.get(pending)
    return actors, init_results

@pytest.fixture
def ray_start_distributed_2_nodes():
    """Connect to an existing Ray cluster for one test, then shut Ray down.

    The cluster is expected to have 2 nodes and no GPUs.
    """
    # NOTE(review): 'ip:port' looks like a placeholder for the real cluster
    # address -- confirm before running.
    cluster_address = 'ip:port'
    ray.init(address=cluster_address)
    yield
    ray.shutdown()

@pytest.mark.parametrize("backend", [Backend.GLOO])
@pytest.mark.parametrize("group_name", ["test"])
@pytest.mark.parametrize("dst_rank", [1, 3, 6])
@pytest.mark.parametrize("src_rank", [0, 2, 4, 7])
@pytest.mark.parametrize("array_size", [2 ** 10])
def test_sendrecv(
    ray_start_distributed_2_nodes, group_name, array_size, src_rank, dst_rank, backend
):
    """Send one buffer from ``src_rank`` to ``dst_rank`` and check both ends.

    Eight workers are created and rank ``i`` is given a buffer filled with
    ``i + 1``.  After the send/recv pair, the buffers returned by the sender
    and by the receiver must both equal the sender's original values.
    """
    # A rank cannot send to itself; treat that combination as a no-op.
    if src_rank == dst_rank:
        return

    world_size = 8
    actors, _ = create_collective_workers(
        num_workers=world_size, group_name=group_name, backend=backend
    )

    # Submit every set_buffer call first, then wait for all of them.
    fill_refs = []
    for rank, actor in enumerate(actors):
        payload = np.ones(array_size, dtype=np.float32) * (rank + 1)
        fill_refs.append(actor.set_buffer.remote(payload))
    ray.get(fill_refs)

    # Default to a plain buffer read per rank, then swap in the send and
    # recv calls for the two ranks that take part in the transfer.
    refs = [actor.get_buffer.remote() for actor in actors]
    refs[src_rank] = actors[src_rank].do_send.remote(group_name, dst_rank)
    refs[dst_rank] = actors[dst_rank].do_recv.remote(group_name, src_rank)
    results = ray.get(refs)

    expected = np.ones(array_size, dtype=np.float32) * (src_rank + 1)
    for rank in (src_rank, dst_rank):
        assert (results[rank] == expected).all()

    ray.get([actor.destroy_group.remote(group_name) for actor in actors])


if __name__ == "__main__":
    import sys

    import pytest

    # Run only this file, verbosely, and stop at the first failing test;
    # pytest's return value becomes the process exit status.
    pytest_args = ["-v", "-x", __file__]
    sys.exit(pytest.main(pytest_args))

Before running the script, I started a two-node cluster.

====================================================================================================== test session starts ======================================================================================================
platform linux -- Python 3.8.13, pytest-7.1.1, pluggy-1.0.0 -- /home/xzk/anaconda3/envs/ray-build/bin/python
cachedir: .pytest_cache
rootdir: /home/xzk/testfield
collected 12 items                                                                                                                                                                                                              

ray_collectivate.py::test_sendrecv[1024-0-1-test-gloo] FAILED                                                                                                                                                             [  8%]

=========================================================================================================== FAILURES ============================================================================================================
_______________________________________________________________________________________________ test_sendrecv[1024-0-1-test-gloo] _______________________________________________________________________________________________

ray_start_distributed_2_nodes = None, group_name = 'test', array_size = 1024, src_rank = 0, dst_rank = 1, backend = 'gloo'

    @pytest.mark.parametrize("backend", [Backend.GLOO])
    @pytest.mark.parametrize("group_name", [ "test"])
    @pytest.mark.parametrize("dst_rank", [ 1, 3, 6])
    @pytest.mark.parametrize("src_rank", [0, 2, 4, 7])
    @pytest.mark.parametrize("array_size", [2 ** 10])
    def test_sendrecv(
        ray_start_distributed_2_nodes, group_name, array_size, src_rank, dst_rank, backend
    ):
        if src_rank == dst_rank:
            return
        world_size = 8
>       actors, _ = create_collective_workers(
            num_workers=world_size, group_name=group_name, backend=backend
        )

ray_collectivate.py:130: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
ray_collectivate.py:103: in create_collective_workers
    init_results = ray.get(
../RLEnv/ray/python/ray/_private/client_mode_hook.py:105: in wrapper
    return func(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

object_refs = [ObjectRef(e014f98d69f9c6dcdc705ec4686bc2fcf0ca20960300000001000000), ObjectRef(a4c3e9e034b7d22678306db3a1f9cccbdef781...a648a7373298fb55e295bc280630300000001000000), ObjectRef(a5cb425597a5160a512b10132707e6e2c64a59eb0300000001000000), ...]

    @PublicAPI
    @client_mode_hook(auto_init=True)
    def get(
        object_refs: Union[ray.ObjectRef, List[ray.ObjectRef]],
        *,
        timeout: Optional[float] = None,
    ) -> Union[Any, List[Any]]:
        """Get a remote object or a list of remote objects from the object store.
    
        This method blocks until the object corresponding to the object ref is
        available in the local object store. If this object is not in the local
        object store, it will be shipped from an object store that has it (once the
        object has been created). If object_refs is a list, then the objects
        corresponding to each object in the list will be returned.
    
        Ordering for an input list of object refs is preserved for each object
        returned. That is, if an object ref to A precedes an object ref to B in the
        input list, then A will precede B in the returned list.
    
        This method will issue a warning if it's running inside async context,
        you can use ``await object_ref`` instead of ``ray.get(object_ref)``. For
        a list of object refs, you can use ``await asyncio.gather(*object_refs)``.
    
        Args:
            object_refs: Object ref of the object to get or a list of object refs
                to get.
            timeout (Optional[float]): The maximum amount of time in seconds to
                wait before returning.
    
        Returns:
            A Python object or a list of Python objects.
    
        Raises:
            GetTimeoutError: A GetTimeoutError is raised if a timeout is set and
                the get takes longer than timeout to return.
            Exception: An exception is raised if the task that created the object
                or that created one of the objects raised an exception.
        """
        worker = global_worker
        worker.check_connected()
    
        if hasattr(worker, "core_worker") and worker.core_worker.current_actor_is_asyncio():
            global blocking_get_inside_async_warned
            if not blocking_get_inside_async_warned:
                logger.warning(
                    "Using blocking ray.get inside async actor. "
                    "This blocks the event loop. Please use `await` "
                    "on object ref with asyncio.gather if you want to "
                    "yield execution to the event loop instead."
                )
                blocking_get_inside_async_warned = True
    
        with profiling.profile("ray.get"):
            is_individual_id = isinstance(object_refs, ray.ObjectRef)
            if is_individual_id:
                object_refs = [object_refs]
    
            if not isinstance(object_refs, list):
                raise ValueError(
                    "'object_refs' must either be an object ref "
                    "or a list of object refs."
                )
    
            # TODO(ujvl): Consider how to allow user to retrieve the ready objects.
            values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
            for i, value in enumerate(values):
                if isinstance(value, RayError):
                    if isinstance(value, ray.exceptions.ObjectLostError):
                        worker.core_worker.dump_object_store_memory_usage()
                    if isinstance(value, RayTaskError):
>                       raise value.as_instanceof_cause()
E                       ray.exceptions.RayTaskError(ValueError): ray::Worker.init_group() (pid=25363, ip=223.193.8.206, repr=<ray_collectivate.Worker object at 0x7fc4f256b430>)
E                         File "/home/xzk/testfield/ray_collectivate.py", line 22, in init_group
E                           collective.init_collective_group(world_size, rank, backend, group_name)
E                         File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective.py", line 143, in init_collective_group
E                           _group_mgr.create_collective_group(backend, world_size, rank, group_name)
E                         File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective.py", line 62, in create_collective_group
E                           g = GLOOGroup(
E                         File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective_group/gloo_collective_group.py", line 194, in __init__
E                           self._rendezvous = Rendezvous(
E                         File "/home/xzk/RLEnv/ray/python/ray/util/collective/collective_group/gloo_collective_group.py", line 46, in __init__
E                           (
E                       ValueError: not enough values to unpack (expected 2, got 1)

../RLEnv/ray/python/ray/worker.py:1809: RayTaskError(ValueError)
==================================================================================================== short test summary info ====================================================================================================
FAILED ray_collectivate.py::test_sendrecv[1024-0-1-test-gloo] - ray.exceptions.RayTaskError(ValueError): ray::Worker.init_group() (pid=25363, ip=223.193.8.206, repr=<ray_collectivate.Worker object at 0x7fc4f256b430>)
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
====================================================================================================== 1 failed in 19.02s =======================================================================================================

Hi @xyzyx, we are aware that as of Ray 1.12.1, some collective communication examples are out of date. There are efforts underway to improve the docs.

1 Like