Ray cannot handle running out of disk space even when other nodes have free disk space

I provisioned a cluster of 20 machines with enough combined disk space to hold ~200 GB of spilled objects.

My dataset is ~150 GB.

After about two-thirds of the dataset has been loaded, I get the following error:

  File "cluster_scripts/video_tar2ray.py", line 49, in <module>
    total_items = items.count()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/dataset.py", line 1881, in count
    [get_num_rows.remote(block) for block in self.get_internal_block_refs()]
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/dataset.py", line 3388, in get_internal_block_refs
    return self._plan.execute().get_blocks()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/_internal/lazy_block_list.py", line 278, in get_blocks
    blocks, _ = self._get_blocks_with_metadata()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/_internal/lazy_block_list.py", line 324, in _get_blocks_with_metadata
    metadata = read_progress_bar.fetch_until_complete(list(unique_meta_refs))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/_internal/progress_bar.py", line 75, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/_private/worker.py", line 2280, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError: ray::_execute_read_task() (pid=234, ip=
ray.exceptions.OutOfDiskError: Local disk is full

While this may be true for that one node, the error is surprising because other nodes in the cluster still have free disk space. Can Ray not rearrange the objects to use the remaining disk space?

Partly a follow-up to: Can Ray handle no space being left on a device?

From @Stephanie_Wang:

I don’t think we do any disk-specific load balancing, so the amount of disk used on a node depends on how many tasks get scheduled to that node based on CPU and other resources. We could possibly extend the default scheduling policy to be disk-aware, but I’m not sure it would help in this case.
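To illustrate why CPU-driven placement alone can fill one node's disk while others sit mostly empty, here is a toy model in plain Python. It is a minimal sketch under stated assumptions and does not model Ray's actual scheduler: tasks are assigned round-robin over CPU slots (the hypothetical `place_cpu_only`), or to the node with the most free disk (the hypothetical `place_disk_aware`), and every task writes a fixed amount of output to the disk of the node it ran on. The node names and sizes are made up.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Node:
    # Hypothetical node description: CPU slots and local disk capacity in GB.
    name: str
    cpus: int
    disk_gb: float
    used_gb: float = 0.0

    @property
    def free_gb(self) -> float:
        return self.disk_gb - self.used_gb


def place_cpu_only(nodes: List[Node], num_tasks: int, output_gb: float) -> Dict[str, float]:
    """Assign tasks round-robin over CPU slots, ignoring disk (toy model)."""
    slots = [n for n in nodes for _ in range(n.cpus)]
    for i in range(num_tasks):
        node = slots[i % len(slots)]
        if node.free_gb < output_gb:
            # Analogous to the OutOfDiskError: this node is full even if others are not.
            raise RuntimeError(f"out of disk on {node.name} at task {i}")
        node.used_gb += output_gb
    return {n.name: n.used_gb for n in nodes}


def place_disk_aware(nodes: List[Node], num_tasks: int, output_gb: float) -> Dict[str, float]:
    """Assign each task to the node with the most free disk (toy model)."""
    for i in range(num_tasks):
        node = max(nodes, key=lambda n: n.free_gb)
        if node.free_gb < output_gb:
            raise RuntimeError(f"cluster out of disk at task {i}")
        node.used_gb += output_gb
    return {n.name: n.used_gb for n in nodes}


def make_cluster() -> List[Node]:
    # CPU-heavy node with a small disk, plus a CPU-light node with a large disk.
    return [Node("big-cpu", cpus=4, disk_gb=10.0), Node("small-cpu", cpus=1, disk_gb=40.0)]


if __name__ == "__main__":
    try:
        print(place_cpu_only(make_cluster(), num_tasks=20, output_gb=1.0))
    except RuntimeError as err:
        print("cpu-only:", err)
    print("disk-aware:", place_disk_aware(make_cluster(), num_tasks=20, output_gb=1.0))
```

With 20 tasks of 1 GB each, the cluster has 50 GB in total, yet CPU-only placement sends four of every five tasks to the 10 GB node and fails there, while the disk-aware variant fits everything. Whether a disk-aware policy would help the real workload still depends on how many tasks there are and how large their outputs are.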

We first need to get an idea of what the tasks look like (how many there are and how much data each one returns) and how they were scheduled in this case, so that we can compare what the scheduling policy should be doing with what it currently does.
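One way to organize the requested information is to reduce per-task records to a per-node summary of task count and total output size. The sketch below assumes you already have `(node, output_bytes)` pairs for each task; how those records are collected is left open, and `summarize_by_node` is a hypothetical helper, not part of Ray.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def summarize_by_node(records: Iterable[Tuple[str, int]]) -> Dict[str, Tuple[int, int]]:
    """Map each node to (number of tasks, total output bytes) from per-task records."""
    counts: Dict[str, int] = defaultdict(int)
    totals: Dict[str, int] = defaultdict(int)
    for node, nbytes in records:
        counts[node] += 1
        totals[node] += nbytes
    return {node: (counts[node], totals[node]) for node in counts}


if __name__ == "__main__":
    # Made-up records purely for illustration.
    print(summarize_by_node([("node-a", 10), ("node-b", 5), ("node-a", 7)]))
```

A skew in the byte totals that tracks the task counts would be consistent with the explanation above, namely that disk usage follows CPU-based placement.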

Could you share more about the workload?