get() blocks forever for worker node actor (if returning more than 100 KB)

  • Medium: It contributes significant difficulty to completing my task, but I can work around it.
  • High: It blocks me from completing my task.

Hi!

I have a head node with 2 GPUs on one physical server (in a VM), and a worker node with 1 GPU on another (in a VM clone). They are connected over a network bridge. No problems so far with connecting and getting such a Ray mini-cluster up and running manually: 2 physical hosts, 2 identical VMs (except for network settings), 3 identical GPUs. Ray 2.6.1, Python 3.10.12 (Ubuntu 22.04), grpcio 1.56.2 (it pops up in the backtrace, see below).

Now the thing is that for a Ray actor running on the worker node, get() blocks indefinitely. The actor does its job (it logs everything) and puts the result into the object store (visible via “ray status” as a ~20 MB object, which roughly corresponds to the size of the numpy array being returned). However, get() hangs forever. There are no problems with actors running on the head node with the same data.

If I change the actor's return value to something else (e.g., None, a smaller array, or ray.put(array)), then it works. I have found by trial and error that 102155 bytes is the largest numpy array that can be returned from the worker node without problems. I guess numpy adds a small header (about 128 bytes), and there might be some pickling overhead too.
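
A quick way to check that guess is to measure the serialized size directly. This is just a sketch: it assumes ray.cloudpickle (the pickler bundled with Ray) produces the same payload Ray sends back to the caller, and the exact overhead may differ between versions.

import numpy as np
from ray import cloudpickle

# The largest array size that still comes back fine from the worker node.
arr = np.zeros(102155, dtype=np.uint8)

# Serialized payload size versus the raw array size. If the guess about
# header/pickling overhead is right, the pickled size should land just at or
# under 100 * 1024 = 102400 bytes.
payload = cloudpickle.dumps(arr)
print(arr.nbytes, len(payload))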

I don't know where to start looking to solve this problem. Why does get() hang with slightly more than 100 KB?

UPD: It might be related to the following Ray optimization. So only inline results are returned correctly from the worker node, while larger results somehow get stuck in the distributed object store.

Anti-pattern: Returning ray.put() ObjectRefs from a task harms performance and fault tolerance — Ray 2.8.0
Returning ray.put() ObjectRefs are considered anti-patterns for the following reasons:

It disallows inlining small return values: Ray has a performance optimization to return small (<= 100KB) values inline directly to the caller, avoiding going through the distributed object store. On the other hand, ray.put() will unconditionally store the value to the object store which makes the optimization for small return values impossible.
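
If that is what is happening, one way to test it (a sketch, not verified on this cluster) is to move the inline threshold and check whether the ~100 KB hang threshold moves with it. max_direct_call_object_size is an internal, undocumented Ray system config (default 100 * 1024 bytes), and _system_config can only be passed when the cluster is started by this ray.init() call on the head node, not over the ray:// client connection:

import ray

# Assumption: starting a fresh cluster on the head node with a 10 KiB inline
# threshold. If the largest array that can be returned from the worker node
# shrinks accordingly, the problem is in the object store / transfer path
# rather than in the data itself.
ray.init(_system_config={"max_direct_call_object_size": 10 * 1024})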

####################################################################
“ray status” output while get() is stuck.

Node status
---------------------------------------------------------------
Healthy:
 1 node_95172e2f81ad61f51af14d4b6fb4668c3ce1c5959b6eb186449376b7
 1 node_97ea9fa1c91a41417c863898f62a6f204f4c35e0a3b994359e30e3c6
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 3.0/32.0 CPU
 3.0/3.0 GPU
 0B/29.18GiB memory
 19.14MiB/13.82GiB object_store_memory

Demands:
 (no resource demands)


####################################################################

Stack trace when get() is interrupted:

File "/usr/local/lib/python3.10/dist-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/client_mode_hook.py", line 102, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/ray/util/client/worker.py", line 434, in get
    res = self._get(to_get, op_timeout)
  File "/usr/local/lib/python3.10/dist-packages/ray/util/client/worker.py", line 455, in _get
    for chunk in resp:
  File "/usr/local/lib/python3.10/dist-packages/ray/util/client/worker.py", line 324, in _get_object_iterator
    for chunk in self.server.GetObject(req, *args, **kwargs):
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 475, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 872, in _next
    _common.wait(self._state.condition.wait, _response_ready)
  File "/usr/local/lib/python3.10/dist-packages/grpc/_common.py", line 150, in wait
    _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
  File "/usr/local/lib/python3.10/dist-packages/grpc/_common.py", line 112, in _wait_once
    wait_fn(timeout=timeout)
  File "/usr/lib/python3.10/threading.py", line 324, in wait
    gotit = waiter.acquire(True, timeout)

import numpy as np
import ray

@ray.remote(num_gpus=1)
class GPUActor:
    def getsize(self, size):
        retdata = np.zeros(size, dtype=np.uint8)
        print("Return data size", retdata.nbytes)
        return retdata

if __name__ == '__main__':
    rayinit = ray.init(address="ray://192.168.255.5:10001")
    actors = []
    gpus_available = int(ray.available_resources().get("GPU", 0))
    
    for x in range(gpus_available):
        actors.append(GPUActor.remote())

    # third actor is on the worker node
    # two others on head node are Ok
    actor = actors[2]
    for k in range(96,128):
        result = ray.get(actor.getsize.remote(k*1024)) # <<<<----- stuck here if k=100
        print(k, result.nbytes)
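
To keep the repro from blocking the driver forever, the loop above can be swapped for a bounded get() so the hang surfaces as an exception instead (the 30-second value is just an assumption, far more than a healthy ~100 KB transfer should need):

# Drop-in replacement for the loop above, using the same actor as before.
from ray.exceptions import GetTimeoutError

for k in range(96, 128):
    try:
        result = ray.get(actor.getsize.remote(k * 1024), timeout=30)
        print(k, result.nbytes)
    except GetTimeoutError:
        print(k, "timed out waiting for the result from the worker node")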

If the numpy array is returned indirectly as an object reference (using ray.put()), then it works:

import numpy as np
import ray

@ray.remote(num_gpus=1)
class GPUActor:
    def getsize(self, size):
        retdata = np.zeros(size, dtype=np.uint8)
        print("Return data size", retdata.nbytes)
        return ray.put(retdata)
        #return retdata

if __name__ == '__main__':
    rayinit = ray.init(address="ray://192.168.255.5:10001")
    actors = []
    gpus_available = int(ray.available_resources().get("GPU", 0))
    
    for x in range(gpus_available):
        actors.append(GPUActor.remote())

    actor = actors[2]
    for k in range(96,128):
        result = ray.get(actor.getsize.remote(k*1024)) 
        result = ray.get(result)
        print(k, result.nbytes)

However, when using an explicit objref = ray.put(value) in the actor and value = ray.get(objref) on the caller side, I quite often get the following exception. It seems there really is a bug in Ray.

Local object store memory usage:

(global lru) capacity: 9918860083
(global lru) used: 0.202183%
(global lru) num objects: 1
(global lru) num evictions: 0
(global lru) bytes evicted: 0

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/client_mode_hook.py", line 102, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/ray/util/client/worker.py", line 434, in get
    res = self._get(to_get, op_timeout)
  File "/usr/local/lib/python3.10/dist-packages/ray/util/client/worker.py", line 462, in _get
    raise err
ray.exceptions.ReferenceCountingAssertionError: Failed to retrieve object 008fdf60a96cd022c3e272d405d7a25c919b23235800000002e1f505. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init()`.

The object has already been deleted by the reference counting protocol. This should not happen.

The problem seems to be in the network topology. Despite explicitly setting the node IP addresses (e.g., --node-ip-address=192.168.255.5), Ray seems to use the other existing network interfaces too. With a single network interface there are no more such errors/exceptions. However, this is not secure: with only one network interface, the Ray ports end up exposed on it.
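
One way to see which addresses Ray actually registered is to list the nodes from the driver. This is a minimal sketch over the same Ray Client connection; the fields printed come from ray.nodes():

import ray

ray.init(address="ray://192.168.255.5:10001")

# List every node the GCS knows about together with the address it registered.
# If a node registered an address on an unintended interface, object transfers
# between nodes can stall even though the cluster looks healthy in "ray status".
for node in ray.nodes():
    print(node["NodeID"], node["NodeManagerAddress"], "alive:", node["Alive"])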
