Error loading data in a ray.remote function using an external cluster

I’m evaluating Ray to see what it offers and, if it suits our case, to deploy a Ray cluster on Kubernetes. For now I’m testing it with Docker Compose before moving to KubeRay.

The problem is that I sometimes get “ray.exceptions.OwnerDiedError: Failed to retrieve object”, and it follows an alternating pattern: the first run works fine, the next run of the same code throws this error, and then the pattern repeats.
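
Roughly how the alternating behavior shows up (a sketch of the repeated invocation only; the script path /src/playground/test.py is taken from the traceback below):

# Hypothetical repro loop: run the same test script back-to-back.
# Module path assumed from the traceback further down.
import subprocess

for attempt in range(4):
    result = subprocess.run(["python", "-m", "playground.test"], cwd="/src")
    print(f"attempt {attempt}: exit code {result.returncode}")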

Dockerfile

FROM scratch as files

COPY --from=res ./README.md ./pyproject.toml /code/
COPY --from=res ./src /code/src



FROM python:3.10-slim as base

RUN --mount=type=cache,target=/var/lib/apt/lists \
    --mount=type=cache,target=/var/cache/apt \
    rm /etc/apt/apt.conf.d/docker-clean \
    && sed -i 's/deb.debian.org/mirror.arvancloud.ir/g' /etc/apt/sources.list.d/debian.sources \
    && apt-get update \
    && apt-get install python3-venv tini curl apt-transport-https python-is-python3 -y \
    && useradd -m -U --shell /bin/bash user



FROM base as venv

RUN python3 -m venv /opt/venv \
    && chown -R user:user /opt/venv


COPY --from=files --link /code /home/user/code

COPY --from=res ./requirements.txt /home/user/code/

ENV VIRTUAL_ENV="/opt/venv" \
    PATH="/opt/venv/bin:$PATH" \
    PIP_CACHE_DIR=/var/cache/pip


RUN --mount=type=cache,target=/var/cache/pip \
    pip install -r /home/user/code/requirements.txt \
    && pip install -e /home/user/code --no-deps


FROM base

COPY --link --from=files --chown=user:user --chmod=755 /code /home/user/code
COPY --link --from=venv --chown=user:user --chmod=755 /opt/venv /opt/venv
COPY --chmod="0555" --chown=root:root ./entrypoint.sh /usr/bin/entrypoint.sh

USER user

ENV VIRTUAL_ENV="/opt/venv" \
    PATH="/opt/venv/bin:$PATH" 

ENTRYPOINT [ "/usr/bin/tini", "--", "entrypoint.sh" ]

Docker Compose file

name: ray-distributed

x-ray-local: &x-ray-local
  image: ray-local
  build:
    context: ./
    dockerfile: Dockerfile
    additional_contexts:
      res: ../
  restart: unless-stopped
  environment:
    RAY_record_ref_creation_sites: 1
    RAY_BACKEND_LOG_LEVEL: debug
    RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING: 1 
  volumes:
    - /dev/shm:/dev/shm
  develop:
    watch:
      - action: sync+restart
        path: ../src
        target: /home/user/code/src
        ignore:
          - "**/__pycache__"
  networks:
    - ray

x-ray-worker: &x-ray-worker
  <<: *x-ray-local
  command: ray start --address=ray-head:6379 --block
  mem_limit: 2048m
  cpu_count: 2
  # shm_size: 2048m
  depends_on:
    ray-head:
      condition: service_healthy

services:
  # bunch of other containers
  ray-head:
    <<: *x-ray-local
    container_name: ray-head
    ports:
      - 8265:8265
    command: |
      bash -c 'ray start --head \
                --port=6379 \
                --node-ip-address=0.0.0.0 \
                --ray-client-server-port=10001 \
                --dashboard-host=0.0.0.0 \
                --dashboard-port=8265 \
                --block'
    healthcheck:
      test: curl -f http://localhost:8265 || exit 1
      interval: 15s
      retries: 3
      start_period: 5s
      timeout: 5s

  ray-worker-1:
    <<: *x-ray-worker
    container_name: ray-worker-1

  ray-worker-2:
    <<: *x-ray-worker
    container_name: ray-worker-2

  ray-client:
    <<: *x-ray-local
    container_name: ray-client
    tty: true
    stdin_open: true
    entrypoint: sleep infinity
    working_dir: /tmp
    volumes:
      - ../src:/src
      - ./test_client.sh:/tmp/test_client.sh

networks:
  ray:
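
To rule out basic connectivity, this is the kind of minimal check that can be run from the ray-client container (a sketch, using the service name and client port configured above):

# Connect over Ray Client to the head node and print what the cluster sees.
import ray

ray.init("ray://ray-head:10001")
print(ray.cluster_resources())  # should list the CPUs of the head and both workers
ray.shutdown()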

The code that runs in the ray-client container

import ray
import pandas as pd
ray.init("ray://ray-head:10001")

@ray.remote(max_retries=3)
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10, 'b': [5,6] * 10})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

print(ray.get(ray_data_task.remote()))
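
For comparison, here is a driver-side sketch of the same operations without the ray.remote wrapper, in case the nesting (Ray Data inside a remote task, over Ray Client) matters for object ownership:

# Same Dataset operations, created and consumed directly by the driver, so the
# intermediate block objects should be owned by the driver rather than by the
# worker process that runs ray_data_task.
import ray
import pandas as pd

ray.init("ray://ray-head:10001")

p1 = pd.DataFrame({"a": [3, 4] * 10, "b": [5, 6] * 10})
ds = ray.data.from_pandas(p1)
print(ds.repartition(4).to_pandas())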

Full error output:

(ray_data_task pid=3206) Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
(ray_data_task pid=3206) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(ray_data_task pid=3206) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(ray_data_task pid=3206) [dataset]: Run `pip install tqdm` to enable progress reporting.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/src/playground/test.py", line 17, in <module>
    print(ray.get(ray_data_task.remote()))
  File "/opt/venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 102, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/opt/venv/lib/python3.10/site-packages/ray/util/client/worker.py", line 434, in get
    res = self._get(to_get, op_timeout)
  File "/opt/venv/lib/python3.10/site-packages/ray/util/client/worker.py", line 462, in _get
    raise err
types.RayTaskError(OwnerDiedError): ray::ray_data_task() (pid=3206, ip=0.0.0.0)
  File "/src/playground/test.py", line 15, in ray_data_task
  File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 4436, in to_pandas
    count = self.count()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 2606, in count
    [get_num_rows.remote(block) for block in self.get_internal_block_refs()]
  File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 4779, in get_internal_block_refs
    blocks = self._plan.execute().get_blocks()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 628, in execute
    blocks = execute_to_legacy_block_list(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 126, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 411, in _bundles_to_block_list
    for ref_bundle in bundles:
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 145, in get_next
    item = self._outer._output_node.get_output_blocking(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 320, in get_output_blocking
    raise self._exception
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 212, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 299, in _scheduling_loop_step
    update_operator_states(topology)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 528, in update_operator_states
    op.all_inputs_done()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/operators/base_physical_operator.py", line 93, in all_inputs_done
    self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/planner/repartition.py", line 70, in split_repartition_fn
    return scheduler.execute(refs, num_outputs, ctx)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py", line 68, in execute
    split_return = _split_at_indices(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/split.py", line 283, in _split_at_indices
    ] = _split_all_blocks(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/split.py", line 196, in _split_all_blocks
    per_block_split_metadata = ray.get(per_block_split_metadata_futures)
ray.exceptions.RayTaskError(OwnerDiedError): ray::_split_single_block() (pid=483, ip=172.22.0.4)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.OwnerDiedError: Failed to retrieve object 002e96ffd8909c19ffffffffffffffffffffffff0c00000002e1f505. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init()`.

The object's owner has exited. This is the Python worker that first created the ObjectRef via `.remote()` or `ray.put()`. Check cluster logs (`/tmp/ray/session_latest/logs/*51d71de8d044b3224b430ed92f10eed2b627e83bb35cbb2ae95c5a75*` at IP address 0.0.0.0) for more information about the Python worker failure.
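
The message suggests setting RAY_record_ref_creation_sites=1 during both ray start and ray.init(). It is already set for the containers through the compose environment above; on the client side the equivalent would look like this (a sketch):

# Set the flag in the client process before initializing Ray, as the error
# message suggests. The head and worker containers already receive it via the
# compose environment.
import os
os.environ["RAY_record_ref_creation_sites"] = "1"

import ray
ray.init("ray://ray-head:10001")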

Any help would be much appreciated.