Ray crash when using a complex function

How severely does this issue affect your experience of using Ray?

  • None: Just asking a question out of curiosity
  • Low: It annoys or frustrates me for a moment.
  • Medium: It makes completing my task significantly more difficult, but I can work around it.
  • High: It blocks me from completing my task.

Ubuntu Server 22.04
Ray 2.36.0

I have 2 nodes (1 head, 1 worker), each with 48 CPU cores, so I get:
Usage:

0.0/96.0 CPU

0.0/1.0 GPU

0B/157.14GiB memory

0B/71.34GiB object_store_memory

Demands:

(no resource demands)

When I run 48 tasks, everything works normally; with more than 48, I get a crash:

ray.exceptions.RayTaskError(OwnerDiedError): ray::task_calc_delay_dopf_corr_v() (pid=164479, ip=192.168.68.202)
At least one of the input arguments for this task could not be computed:
ray.exceptions.OwnerDiedError: Failed to retrieve object 00ffffffffffffffffffffffffffffffffffffff1700000001e1f505. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during ray start and ray.init().

The object’s owner has exited. This is the Python worker that first created the ObjectRef via .remote() or ray.put(). Check cluster logs (/tmp/ray/session_latest/logs/*17000000ffffffffffffffffffffffffffffffffffffffffffffffff* at IP address 0.0.0.0) for more information about the Python worker failure.

Please help!

Can you share a reproduction script?

Sorry for taking so long.

I wrote similar code, but this code cannot use the cluster resources now; it only runs on the local machine. Checking the dashboard, it's not running on the cluster.


import numpy as np
import ray
from tqdm import tqdm 

ray.init(address='ray://192.168.68.224:10001', runtime_env={"working_dir": "."})


def test_func(values, poly, sr):
    # Shift each sample by a polynomial time delay, then fill the gaps
    # left by the shift with the average of the neighbouring samples.
    values_len = len(values)
    idxs = np.arange(0, values_len)
    t = (idxs+0)/sr
    ddd = np.polyval(poly, t)
    
    ddd_i = np.round(ddd*sr).astype('int')
    add_len = int(np.ceil(ddd[-1]*sr))
    new_values_len = add_len+values_len
    new_values = np.zeros(new_values_len, dtype=np.complex64)
    left_padlen = np.min(ddd_i)
    nidxs = idxs+ddd_i
    new_values[nidxs] = values[idxs]
    anidxs = np.ones(new_values_len)*-1
    anidxs[nidxs] = idxs
    inter_posi = np.argwhere(anidxs == -1)
    inter_posi = inter_posi[inter_posi > left_padlen]
    for posi in inter_posi:
        li = posi-1
        ri = posi+1
        if ri > new_values_len-1:
            continue
        new_values[posi] = (new_values[li]+new_values[ri])/2
    return new_values

@ray.remote(num_cpus=1)
def wrap_test_func(values, poly_ays, i, sr):
    poly_ay = poly_ays[i]
    return test_func(values, poly_ay, sr[0])

def ray_try_it():
    # task
    sr = np.array([5000000])
    value = np.random.randn(100000)
    poly_ays = np.array([[-7.588960646222505e-06, 0.001910581848744421]])
    poly_ays = np.tile(poly_ays, [200, 1])
    value_ray = ray.put(value)
    poly_ays_ray = ray.put(poly_ays)
    sr_ray = ray.put(sr)
    
    result_cnt = []
    for _ in tqdm(range(1000), desc='a'):
        objs = []
        for i in range(120):
            res = wrap_test_func.remote(value_ray, poly_ays_ray, i, sr_ray)
            objs.append(res)
        
        # Block until every submitted task has finished.
        still_running = list(objs)
        while still_running:
            _, still_running = ray.wait(still_running)
        results = ray.get(objs)
        result_cnt.append(len(results))
    print(result_cnt)
    return result_cnt

data_ns = ray_try_it()
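As an aside on the kernel itself: the per-gap Python loop in `test_func` averages each gap's immediate neighbours one position at a time. If linear interpolation across gaps is acceptable, the same fill can be vectorized with `np.interp` on the real and imaginary parts. This is a sketch under that assumption (the function name is mine, and it is not a drop-in replacement for the loop's exact sequential semantics on multi-sample gaps):

```python
import numpy as np

def fill_gaps_interp(values, filled_mask):
    """Linearly interpolate complex samples at positions where filled_mask is False."""
    idx = np.arange(len(values))
    known = np.flatnonzero(filled_mask)  # indices of samples that hold real data
    out = values.copy()
    # np.interp only handles real data, so interpolate real and imag parts separately.
    out.real = np.interp(idx, known, values.real[known])
    out.imag = np.interp(idx, known, values.imag[known])
    return out

v = np.array([1 + 1j, 0, 3 + 3j], dtype=np.complex64)
mask = np.array([True, False, True])
print(fill_gaps_interp(v, mask))  # middle sample becomes (2+2j)
```

This also removes the Python-level loop over `inter_posi`, which can dominate runtime for long arrays.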

Clarifying: so the repro script works when you initiate a Ray client session against a local cluster, but fails when you try the same against the remote cluster? If so, with what error (the original one you posted)?