What happened + What you expected to happen
I am using ray.data.read_parquet to read data from an Azure Storage account, on a Ray cluster that I created myself with docker-compose for local development and testing. I expected to_pandas() on the resulting Dataset to return a pandas DataFrame, but the call fails with the error below.
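In short, the failing calls are the ones below (the Delta Lake lookup that produces the individual parquet file URIs is omitted here; the full reproduction, including the docker-compose setup, follows at the bottom of this report):

import ray
from fsspec import filesystem
from ray.data import read_parquet

# connect through the Ray Client port exposed by the docker-compose head node
ray.init(address="ray://localhost:20001")

# adlfs/abfs filesystem for the storage account (placeholder credentials)
fs = filesystem("abfs", account_name="myaccountname", account_key="mykey")

# "abfs://mypath" stands in for the parquet file URIs read in the full script
ds = read_parquet(paths=["abfs://mypath"], filesystem=fs)
df = ds.to_pandas()  # AttributeError: 'Worker' object has no attribute 'core_worker'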
2024-11-11 16:58:13,900 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 423, in execute_to_iterator
    bundle_iter = execute_to_legacy_bundle_iterator(executor, self)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 51, in execute_to_legacy_bundle_iterator
    bundle_iter = executor.execute(dag, initial_stats=stats)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 124, in execute
    self._topology, _ = build_streaming_topology(dag, self._options)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 382, in build_streaming_topology
    setup_state(dag)
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 373, in setup_state
    parent_state = setup_state(parent)
                   ^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 379, in setup_state
    op.start(options)
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/input_data_buffer.py", line 48, in start
    self._input_data = self._input_data_factory(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 85, in get_input_data
    cleaned_metadata(read_task, read_task_ref),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 35, in cleaned_metadata
    locations = get_local_object_locations([read_task_ref])
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/experimental/locations.py", line 75, in get_local_object_locations
    core_worker = ray._private.worker.global_worker.core_worker
AttributeError: 'Worker' object has no attribute 'core_worker'
Running 0: 0.00 row [00:00, ? row/s]
ray.data.exceptions.SystemException

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/fme_tms_ldm_ray_cluster/sample_code/src/sample_code/deltalake_test.py", line 39, in <module>
    df = ds.to_pandas()
         ^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/dataset.py", line 4588, in to_pandas
    bundles = self.iter_internal_ref_bundles()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/dataset.py", line 4845, in iter_internal_ref_bundles
    iter_ref_bundles, _, _ = self._plan.execute_to_iterator()
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/.venv/lib/python3.11/site-packages/ray/data/exceptions.py", line 89, in handle_trace
    raise e.with_traceback(None) from SystemException()
AttributeError: 'Worker' object has no attribute 'core_worker'
Versions / Dependencies
ray = 2.38.0
python = 3.11.10
OS = macOS
Reproduction script
raytest image (Dockerfile)
FROM python:3.11.10-slim
RUN pip install --no-cache-dir -U 'ray[default,data]' adlfs fsspec tqdm deltalake deltaray
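The image is built from this Dockerfile and tagged to match the RAY_IMAGE variable below, e.g.:
docker build -t raytest .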
envs (.env)
HOST=localhost
RAY_IMAGE=raytest
REDISPORT=6345
DASHBOARDPORT=8265
HEADNODEPORT=20001
REDISPASSWORD=yourpassword
NUM_WORKERS=4
NUM_CPU_WORKER=1
docker-compose.yml
version: "3"

services:
  ray-head:
    image: ${RAY_IMAGE}
    ports:
      - "${REDISPORT}:${REDISPORT}"
      - "${DASHBOARDPORT}:${DASHBOARDPORT}"
      - "${HEADNODEPORT}:${HEADNODEPORT}"
    env_file:
      - .env
    command: bash -c "ray start --head --ray-client-server-port=${HEADNODEPORT} --dashboard-port=${DASHBOARDPORT} --port=${REDISPORT} --dashboard-host=0.0.0.0 --redis-password=${REDISPASSWORD} --block"
    shm_size: 2g
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: '2g'
    networks:
      - ray_net

  ray-worker:
    image: ${RAY_IMAGE}
    depends_on:
      - ray-head
    env_file:
      - .env
    command: bash -c "ray start --address=ray-head:${REDISPORT} --redis-password=${REDISPASSWORD} --num-cpus=${NUM_CPU_WORKER} --block"
    shm_size: 2g
    deploy:
      mode: replicated
      replicas: ${NUM_WORKERS}
      resources:
        limits:
          cpus: ${NUM_CPU_WORKER}
          memory: '2g'
    networks:
      - ray_net

networks:
  ray_net:
    ipam:
      driver: default
      config:
        - subnet: 172.63.0.0/16
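With the .env file above sitting next to the compose file (docker compose reads it automatically for the variable substitutions), the cluster is started with docker compose up -d, which brings up one head node and NUM_WORKERS worker replicas.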
Python script (deltalake_test.py)
import ray
from deltalake import DeltaTable
from ray.data import read_parquet
from ray.data.datasource import ParquetMetadataProvider
from fsspec import filesystem

# connect to the docker-compose cluster through the Ray Client port
ray.init(address='ray://localhost:20001')

# placeholder credentials for the Azure Storage account
access_key = "mykey"
account_name = "myaccountname"
storage_options = {
    "account_name": account_name,
    "account_key": access_key,
}

# adlfs/abfs filesystem shared by deltalake and ray.data
fs = filesystem("abfs", **storage_options)

partition_filters = [("customer_id", "in", ["abc"])]

# resolve the parquet file URIs of the Delta table (placeholder path)
dt = DeltaTable("abfs://mypath", storage_options=storage_options)

ds = read_parquet(
    paths=dt.file_uris(partition_filters=partition_filters),
    filesystem=fs,
    columns=None,
    parallelism=-1,
    ray_remote_args=None,
    tensor_column_schema=None,
    meta_provider=ParquetMetadataProvider(),
)

df = ds.to_pandas()  # fails with the AttributeError above
print(df)
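The script runs on the host (macOS) and connects to the cluster through the Ray Client port 20001 exposed by the ray-head service; the error is raised as soon as to_pandas() triggers execution of the dataset.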
Issue Severity
High: It blocks me from completing my task.