'Worker' object has no attribute 'core_worker'

What happened + What you expected to happen

I have created my own Ray cluster using docker-compose for local development and testing. I am attempting to use the ray.data.read_parquet method to read data from an Azure Storage account, but I encounter the following error when calling to_pandas on the resulting Ray dataset:


2024-11-11 16:58:13,900 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
File "/Users/.venv/lib/python3.11/site-packages/ray/data/exceptions.py", line 49, in handle_trace
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 423, in execute_to_iterator
bundle_iter = execute_to_legacy_bundle_iterator(executor, self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 51, in execute_to_legacy_bundle_iterator
bundle_iter = executor.execute(dag, initial_stats=stats)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 124, in execute
self._topology, _ = build_streaming_topology(dag, self._options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 382, in build_streaming_topology
setup_state(dag)
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 373, in setup_state
parent_state = setup_state(parent)
^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 379, in setup_state
op.start(options)
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/input_data_buffer.py", line 48, in start
self._input_data = self._input_data_factory(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 85, in get_input_data
cleaned_metadata(read_task, read_task_ref),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 35, in cleaned_metadata
locations = get_local_object_locations([read_task_ref])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/experimental/locations.py", line 75, in get_local_object_locations
core_worker = ray._private.worker.global_worker.core_worker

AttributeError: 'Worker' object has no attribute 'core_worker'
Running 0: 0.00 row [00:00, ? row/s]
ray.data.exceptions.SystemException

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/fme_tms_ldm_ray_cluster/sample_code/src/sample_code/deltalake_test.py", line 39, in <module>
df = ds.to_pandas()
^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/dataset.py", line 4588, in to_pandas
bundles = self.iter_internal_ref_bundles()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/dataset.py", line 4845, in iter_internal_ref_bundles
iter_ref_bundles, _, _ = self._plan.execute_to_iterator()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/.venv/lib/python3.11/site-packages/ray/data/exceptions.py", line 89, in handle_trace
raise e.with_traceback(None) from SystemException()
AttributeError: 'Worker' object has no attribute 'core_worker'

Versions / Dependencies

ray = 2.38.0
python = 3.11.10
OS = macOS

Reproduction script

raytest image

FROM python:3.11.10-slim

RUN pip install --no-cache-dir -U 'ray[default,data]' adlfs fsspec tqdm deltalake deltaray

envs

HOST=localhost
RAY_IMAGE=raytest
REDISPORT=6345
DASHBOARDPORT=8265
HEADNODEPORT=20001
REDISPASSWORD=yourpassword
NUM_WORKERS=4
NUM_CPU_WORKER=1

docker compose

version: "3"

services:
  ray-head:
    image: ${RAY_IMAGE}
    ports:
      - "${REDISPORT}:${REDISPORT}"
      - "${DASHBOARDPORT}:${DASHBOARDPORT}"
      - "${HEADNODEPORT}:${HEADNODEPORT}"
    env_file:
      - .env
    command: bash -c "ray start --head --ray-client-server-port=${HEADNODEPORT} --dashboard-port=${DASHBOARDPORT} --port=${REDISPORT} --dashboard-host=0.0.0.0 --redis-password=${REDISPASSWORD} --block"
    shm_size: 2g
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: '2g'
    networks:
      - ray_net

  ray-worker:
    image: ${RAY_IMAGE}
    depends_on: 
      - ray-head
    env_file:
      - .env
    command: bash -c "ray start --address=ray-head:${REDISPORT} --redis-password=${REDISPASSWORD} --num-cpus=${NUM_CPU_WORKER} --block" 
    shm_size: 2g
    deploy:
      mode: replicated
      replicas: ${NUM_WORKERS} 
      resources:
        limits:
          cpus: ${NUM_CPU_WORKER}
          memory: '2g'
    networks:
      - ray_net
  
networks:
  ray_net:
    ipam:
      driver: default
      config:
        - subnet: 172.63.0.0/16

python script

import ray
from deltalake import DeltaTable
from ray.data import read_parquet
from ray.data.datasource import ParquetMetadataProvider
from fsspec import filesystem

ray.init(address='ray://localhost:20001')

access_key = "mykey"
account_name = "myaccountname"

storage_options = {
    "account_name": account_name,
    "account_key": access_key,
}
fs = filesystem("abfs", **storage_options)

partition_filters = [("customer_id", "in", ["abc"])]

dt = DeltaTable("abfs://mypath", storage_options=storage_options)

ds = read_parquet(
    paths=dt.file_uris(partition_filters=partition_filters),
    filesystem=fs,
    columns=None,
    parallelism=-1,
    ray_remote_args=None,
    tensor_column_schema=None,
    meta_provider=ParquetMetadataProvider(),
)

df = ds.to_pandas()
print(df)

Issue Severity

High: It blocks me from completing my task.

I think this issue stems from how you are initializing Ray with ray.init(address='ray://localhost:20001'). This runs Ray in Client Mode, which has some limitations for various libraries (more on Ray Client Mode here). In Client Mode the driver is not a full Ray worker, so it has no core_worker attribute; the AttributeError: 'Worker' object has no attribute 'core_worker' is raised because get_local_object_locations tries to access ray._private.worker.global_worker.core_worker, which doesn't exist on the client-side Worker object.
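As a quick check, here is a minimal sketch based on the attribute named in the traceback (ray._private.worker is an internal module, so treat this purely as a diagnostic, not a supported API):

import ray

ray.init(address='ray://localhost:20001')

# Under the Ray Client the driver-side Worker object is a thin stub, so it
# lacks the core_worker attribute that ray/experimental/locations.py expects.
worker = ray._private.worker.global_worker
print(hasattr(worker, 'core_worker'))  # expected: False when connected via ray://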

I would try this again without using Client Mode and see if that fixes your problem, for example by running the driver on the cluster itself (or submitting it as a Ray job) instead of connecting through a ray:// address. More on how to call ray.init here.
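For example, one option (a sketch, assuming the dashboard is reachable on localhost:8265, i.e. the DASHBOARDPORT from your .env, and that deltalake_test.py is changed to call ray.init() without the ray:// address) is to keep the docker-compose cluster as-is and submit the script as a Ray job, so the driver runs on the cluster as a regular worker process rather than through the Client:

from ray.job_submission import JobSubmissionClient

# Submit the reproduction script to the cluster's job server (dashboard port).
client = JobSubmissionClient("http://localhost:8265")

job_id = client.submit_job(
    # Runs on the cluster; inside the job, call ray.init() with no address.
    entrypoint="python deltalake_test.py",
    runtime_env={
        "working_dir": "./",  # ship the local script to the cluster
    },
)
print(job_id)

Alternatively, you could docker exec into the ray-head container and run the script there with ray.init(address='auto').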