Repartition error

I have a python file named runner with the following codes.

import ray

ray.init(address='ray://192.10.14.158:10001', runtime_env = {"working_dir": "./"})

d = ray.data.read_csv("data.csv")

d = d.repartition(10)

print(d.num_blocks())
d.show()

I ran the following command

python3 runner.py

And then got the error:

Traceback (most recent call last):
  File "/app/psi_test/test.py", line 10, in <module>
    d.show()
  File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2189, in show
    for row in self.take(limit):
  File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2147, in take
    for row in self.iter_rows():
  File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2789, in iter_rows
    dataset_format = self.dataset_format()
  File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 4239, in dataset_format
    schema = self.schema(fetch_if_missing=True)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 2247, in schema
    return self._plan.schema(fetch_if_missing=fetch_if_missing)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/plan.py", line 360, in schema
    self.execute()
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/plan.py", line 539, in execute
    blocks = execute_to_legacy_block_list(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/legacy_compat.py", line 84, in execute_to_legacy_block_list
    bundles = executor.execute(dag, initial_stats=stats)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/bulk_executor.py", line 82, in execute
    return execute_recursive(dag)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/bulk_executor.py", line 62, in execute_recursive
    op.inputs_done()
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/all_to_all_operator.py", line 57, in inputs_done
    self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/legacy_compat.py", line 244, in bulk_fn
    block_list, stats_dict = fn(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/stage_impl.py", line 76, in do_fast_repartition
    return fast_repartition(blocks, num_blocks)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/fast_repartition.py", line 34, in fast_repartition
    splits = wrapped_ds.split_at_indices(indices)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/dataset.py", line 1366, in split_at_indices
    blocks, metadata = _split_at_indices(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/split.py", line 283, in _split_at_indices
    ] = _split_all_blocks(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/split.py", line 208, in _split_all_blocks
    trace_deallocation(b, "split._split_all_blocks")
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/memory_tracing.py", line 48, in trace_deallocation
    ray._private.internal_api.free(ref, local_only=False)
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/internal_api.py", line 211, in free
    worker.check_connected()
  File "/usr/local/lib/python3.10/dist-packages/ray/_private/worker.py", line 549, in check_connected
    raise RaySystemError(
ray.exceptions.RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.

But if I modify the code as below:

import ray

ray.init(address='auto')

d = ray.data.read_csv("data.csv")

d = d.repartition(10)

print(d.num_blocks())
d.show()

And run the command:

ray job submit --address=192.10.14.158:6379 --working-dir ./ – python3 runner.py

It successfully finished the job.

If someone can help me figure out the problem? Thanks a lot.

My python version is 3.10.6 and ray version is 2.3.1.

@veryhannibal

Can you set the environment variable to export RAY_ADDRESS=192.10.14.158:10001

ray.init(address='auto')

d = ray.data.read_csv("data.csv")

d = d.repartition(10)

print(d.num_blocks())
d.show()
ray job submit  --working-dir  ./ python3./ runner.py

@veryhannibal Also, might want to check if with netstat -an | grep LISTEN on the headnode.

Thanks for your feedback.
I have tried your suggestion, the same error occuried.

My project do not allow us to run program by using command "ray job submit ".
We write code in jupyter-notebook, so we must use ray.init(), and specify the head node address when we call ray.init().
Every time I call repartition directly after read_csv, this error occurs, but if I run select_columns before the repartition, it runs successfully.
For example, the following code can run successfully if I add select_columns before repartition.

import ray

ray.init(address="auto")

data = ray.data.read_csv("/data/raydata/leader.csv")
print(data)
data._lazy = False
data = data.select_columns(cols=["example_id", "col0", "col1"])
data = data.repartition(64)

data.show()


Hi, I modified my code, I did not use ray.data.read_csv. This time it ran successfully.
New code is here:

import ray
import pandas as pd

@ray.remote
def f():
    df = pd.read_csv("/data/raydata/leader.csv")
    return df

data = ray.data.from_pandas_refs([f.remote()])
data._lazy = False
data = data.repartition(64)

data.show()

So I guess if there is something wrong with the return value of ray.data.read_csv().