I’m trying out Ray to see what it offers and, if it suits our case, to deploy a Ray cluster on Kubernetes. For now I’m testing with Docker Compose before moving to KubeRay.
The problem is that I sometimes get “ray.exceptions.OwnerDiedError: Failed to retrieve object”.
It follows an alternating pattern: the first run works fine, the next run of the exact same code throws this error, and the pattern keeps repeating.
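To make the pattern concrete, I trigger it from the ray-client container roughly like this (hypothetical invocation, the exact command may differ; the traceback at the bottom comes from running the script as a module under /src):

docker compose exec ray-client bash -c 'cd /src && python -m playground.test'   # 1st run: prints the dataframe
docker compose exec ray-client bash -c 'cd /src && python -m playground.test'   # 2nd run: OwnerDiedError
# ...and it keeps alternating like this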
Dockerfile:
FROM scratch as files
COPY --from=res ./README.md ./pyproject.toml /code/
COPY --from=res ./src /code/src

FROM python:3.10-slim as base
RUN --mount=type=cache,target=/var/lib/apt/lists \
    --mount=type=cache,target=/var/cache/apt \
    rm /etc/apt/apt.conf.d/docker-clean \
    && sed -i 's/deb.debian.org/mirror.arvancloud.ir/g' /etc/apt/sources.list.d/debian.sources \
    && apt-get update \
    && apt-get install python3-venv tini curl apt-transport-https python-is-python3 -y \
    && useradd -m -U --shell /bin/bash user

FROM base as venv
RUN python3 -m venv /opt/venv \
    && chown -R user:user /opt/venv
COPY --from=files --link /code /home/user/code
COPY --from=res ./requirements.txt /home/user/code/
ENV VIRTUAL_ENV="/opt/venv" \
    PATH="/opt/venv/bin:$PATH" \
    PIP_CACHE_DIR=/var/cache/pip
RUN --mount=type=cache,target=/var/cache/pip \
    pip install -r /home/user/code/requirements.txt \
    && pip install -e /home/user/code --no-deps

FROM base
COPY --link --from=files --chown=user:user --chmod=755 /code /home/user/code
COPY --link --from=venv --chown=user:user --chmod=755 /opt/venv /opt/venv
COPY --chmod="0555" --chown=root:root ./entrypoint.sh /usr/bin/entrypoint.sh
USER user
ENV VIRTUAL_ENV="/opt/venv" \
    PATH="/opt/venv/bin:$PATH"
ENTRYPOINT [ "/usr/bin/tini", "--", "entrypoint.sh" ]
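Note that the Dockerfile pulls README.md, pyproject.toml, src and requirements.txt from an extra build context named res (the repository root). Outside of Compose, a roughly equivalent Buildx invocation (hypothetical, matching the additional_contexts entry in the compose file below) would be:

# supply the "res" context by hand when building without Compose
docker buildx build --build-context res=.. -t ray-local .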
Docker Compose file:
name: ray-distributed

x-ray-local: &x-ray-local
  image: ray-local
  build:
    context: ./
    dockerfile: Dockerfile
    additional_contexts:
      res: ../
  restart: unless-stopped
  environment:
    RAY_record_ref_creation_sites: 1
    RAY_BACKEND_LOG_LEVEL: debug
    RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING: 1
  volumes:
    - /dev/shm:/dev/shm
  develop:
    watch:
      - action: sync+restart
        path: ../src
        target: /home/user/code/src
        ignore:
          - "**/__pycache__"
  networks:
    - ray

x-ray-worker: &x-ray-worker
  <<: *x-ray-local
  command: ray start --address=ray-head:6379 --block
  mem_limit: 2048m
  cpu_count: 2
  # shm_size: 2048m
  depends_on:
    ray-head:
      condition: service_healthy

services:
  # bunch of other containers
  ray-head:
    <<: *x-ray-local
    container_name: ray-head
    ports:
      - 8265:8265
    command: |
      bash -c 'ray start --head \
        --port=6379 \
        --node-ip-address=0.0.0.0 \
        --ray-client-server-port=10001 \
        --dashboard-host=0.0.0.0 \
        --dashboard-port=8265 \
        --block'
    healthcheck:
      test: curl -f http://localhost:8265 || exit 1
      interval: 15s
      retries: 3
      start_period: 5s
      timeout: 5s
  ray-worker-1:
    <<: *x-ray-worker
    container_name: ray-worker-1
  ray-worker-2:
    <<: *x-ray-worker
    container_name: ray-worker-2
  ray-client:
    <<: *x-ray-local
    container_name: ray-client
    tty: true
    stdin_open: true
    entrypoint: sleep infinity
    working_dir: /tmp
    volumes:
      - ../src:/src
      - ./test_client.sh:/tmp/test_client.sh

networks:
  ray:
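To reproduce, I bring the stack up with a plain Compose build/up (flags may vary); the workers only join once the head’s dashboard healthcheck passes:

docker compose up -d --build
# optional sanity check: the head should report itself plus ray-worker-1 and ray-worker-2
docker compose exec ray-head ray status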
The code that runs in the ray-client container:
import ray
import pandas as pd

# Connect through the Ray Client server exposed on the head node.
ray.init("ray://ray-head:10001")

@ray.remote(max_retries=3)
def ray_data_task():
    # Build a tiny Ray Dataset inside the remote task, repartition it, and pull it back as pandas.
    p1 = pd.DataFrame({'a': [3, 4] * 10, 'b': [5, 6] * 10})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

print(ray.get(ray_data_task.remote()))
Full error output:
(ray_data_task pid=3206) Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
(ray_data_task pid=3206) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(ray_data_task pid=3206) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(ray_data_task pid=3206) [dataset]: Run `pip install tqdm` to enable progress reporting.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/src/playground/test.py", line 17, in <module>
print(ray.get(ray_data_task.remote()))
File "/opt/venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
return fn(*args, **kwargs)
File "/opt/venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 102, in wrapper
return getattr(ray, func.__name__)(*args, **kwargs)
File "/opt/venv/lib/python3.10/site-packages/ray/util/client/api.py", line 42, in get
return self.worker.get(vals, timeout=timeout)
File "/opt/venv/lib/python3.10/site-packages/ray/util/client/worker.py", line 434, in get
res = self._get(to_get, op_timeout)
File "/opt/venv/lib/python3.10/site-packages/ray/util/client/worker.py", line 462, in _get
raise err
types.RayTaskError(OwnerDiedError): ray::ray_data_task() (pid=3206, ip=0.0.0.0)
File "/src/playground/test.py", line 15, in ray_data_task
File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 4436, in to_pandas
count = self.count()
File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 2606, in count
[get_num_rows.remote(block) for block in self.get_internal_block_refs()]
File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 4779, in get_internal_block_refs
blocks = self._plan.execute().get_blocks()
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 628, in execute
blocks = execute_to_legacy_block_list(
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 126, in execute_to_legacy_block_list
block_list = _bundles_to_block_list(bundles)
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 411, in _bundles_to_block_list
for ref_bundle in bundles:
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 145, in get_next
item = self._outer._output_node.get_output_blocking(
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 320, in get_output_blocking
raise self._exception
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 212, in run
while self._scheduling_loop_step(self._topology) and not self._shutdown:
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 299, in _scheduling_loop_step
update_operator_states(topology)
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 528, in update_operator_states
op.all_inputs_done()
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/operators/base_physical_operator.py", line 93, in all_inputs_done
self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/planner/repartition.py", line 70, in split_repartition_fn
return scheduler.execute(refs, num_outputs, ctx)
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py", line 68, in execute
split_return = _split_at_indices(
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/split.py", line 283, in _split_at_indices
] = _split_all_blocks(
File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/split.py", line 196, in _split_all_blocks
per_block_split_metadata = ray.get(per_block_split_metadata_futures)
ray.exceptions.RayTaskError(OwnerDiedError): ray::_split_single_block() (pid=483, ip=172.22.0.4)
At least one of the input arguments for this task could not be computed:
ray.exceptions.OwnerDiedError: Failed to retrieve object 002e96ffd8909c19ffffffffffffffffffffffff0c00000002e1f505. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init()`.
The object's owner has exited. This is the Python worker that first created the ObjectRef via `.remote()` or `ray.put()`. Check cluster logs (`/tmp/ray/session_latest/logs/*51d71de8d044b3224b430ed92f10eed2b627e83bb35cbb2ae95c5a75*` at IP address 0.0.0.0) for more information about the Python worker failure.
Any help would be much appreciated.