I have a Python file named runner.py with the following code:
import ray
ray.init(address='ray://192.10.14.158:10001', runtime_env = {"working_dir": "./"})
d = ray.data.read_csv("data.csv")
d = d.repartition(10)
print(d.num_blocks())
d.show()
I ran the following command:
python3 runner.py
It failed with the following error:
Traceback (most recent call last):
File "/app/psi_test/test.py", line 10, in <module>
d.show()
File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2189, in show
for row in self.take(limit):
File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2147, in take
for row in self.iter_rows():
File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2789, in iter_rows
dataset_format = self.dataset_format()
File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 4239, in dataset_format
schema = self.schema(fetch_if_missing=True)
File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2247, in schema
return self._plan.schema(fetch_if_missing=fetch_if_missing)
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/plan.py", line 360, in schema
self.execute()
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/plan.py", line 539, in execute
blocks = execute_to_legacy_block_list(
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/legacy_compat.py", line 84, in execute_to_legacy_block_list
bundles = executor.execute(dag, initial_stats=stats)
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/bulk_executor.py", line 82, in execute
return execute_recursive(dag)
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/bulk_executor.py", line 62, in execute_recursive
op.inputs_done()
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/all_to_all_operator.py", line 57, in inputs_done
self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/legacy_compat.py", line 244, in bulk_fn
block_list, stats_dict = fn(
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/stage_impl.py", line 76, in do_fast_repartition
return fast_repartition(blocks, num_blocks)
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/fast_repartition.py", line 34, in fast_repartition
splits = wrapped_ds.split_at_indices(indices)
File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 1366, in split_at_indices
blocks, metadata = _split_at_indices(
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/split.py", line 283, in _split_at_indices
] = _split_all_blocks(
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/split.py", line 208, in _split_all_blocks
trace_deallocation(b, "split._split_all_blocks")
File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/memory_tracing.py", line 48, in trace_deallocation
ray._private.internal_api.free(ref, local_only=False)
File "/usr/local/lib/python3.10/dist-packages/ray/_private/internal_api.py", line 211, in free
worker.check_connected()
File "/usr/local/lib/python3.10/dist-packages/ray/_private/worker.py", line 549, in check_connected
raise RaySystemError(
ray.exceptions.RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.
However, if I modify the code as follows:
import ray
ray.init(address='auto')
d = ray.data.read_csv("data.csv")
d = d.repartition(10)
print(d.num_blocks())
d.show()
And run the command:
ray job submit --address=192.10.14.158:6379 --working-dir ./ -- python3 runner.py
It successfully finished the job.
Could someone help me figure out why the first approach fails? Thanks a lot.
My Python version is 3.10.6 and my Ray version is 2.3.1.