### Description
With Datasets streaming execution, when processing large-scale … images, the processing speeds of the upstream and downstream operators do not match. The size of the operator queue is not limited and keeps growing; when system memory is insufficient, the queued blocks are spilled to disk, eventually exhausting disk space.
In this execution mode, is it possible to limit the size of the queue and support backpressure?
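For reference, the streaming executor already tracks a resource budget per execution (the `0.0 MiB/512.0 MiB object_store_memory` figure in the log below), yet the inter-operator queue still grows and spills past it. Below is a minimal sketch of where such a cap can be set today, assuming the internal (and therefore unstable) `ExecutionResources` interface of Ray 2.3; what this issue asks for is an analogous bound on each operator's output queue that stalls upstream operators instead of spilling:

```python
import ray

# NOTE: internal module as of Ray 2.3 -- this import path is an assumption
# and may change between releases.
from ray.data._internal.execution.interfaces import ExecutionResources

ctx = ray.data.context.DatasetContext.get_current()
ctx.use_streaming_executor = True
# Cap the object store memory budget of the streaming executor. Ideally a
# similar knob would bound each operator's output queue and block (rather
# than spill) the upstream operator when the queue is full.
ctx.execution_options.resource_limits = ExecutionResources(
    object_store_memory=512 * 1024 * 1024  # 512 MiB
)
```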
### Use case
## Reproduction script
```python
import time
from typing import Any, Dict, List, Optional

import numpy as np

import ray
from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import Datasource, Reader, ReadTask
from ray.data.read_api import read_datasource

assert (
    ray.__version__ >= "2.3.0"
), f"The version of ray must be at least 2.3.0, the current version is {ray.__version__}"

ray.init()
ctx = ray.data.context.DatasetContext.get_current()
ctx.use_streaming_executor = True
ctx.execution_options.preserve_order = False


class SampleDatasourceReader(Reader):
    def __init__(self, sample_count):
        self.sample_count = sample_count

    def estimate_inmemory_data_size(self) -> Optional[int]:
        """Return an estimate of the in-memory data size, or None if unknown.

        Note that the in-memory data size may be larger than the on-disk data size.
        """
        return None

    def _read(self, idx) -> List[Block]:
        # Each block holds a single ~24 MiB float64 ndarray; the sleep
        # simulates a slow data source such as image loading.
        blocks = []
        block = [np.random.randn(3, 1024, 1024)]
        blocks.append(block)
        time.sleep(0.5)
        return blocks

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        read_tasks: List[ReadTask] = []
        for i in range(self.sample_count):
            metadata = BlockMetadata(
                num_rows=1,
                size_bytes=10,  # guess
                schema=None,
                input_files=None,
                exec_stats=None,
            )
            read_task = ReadTask(lambda idx=i: self._read(idx), metadata)
            read_tasks.append(read_task)
        return read_tasks


class SampleDatasource(Datasource):
    def create_reader(self, sample_count, **read_args) -> "Reader":
        return SampleDatasourceReader(sample_count)


def read_sample(
    sample_count: int,
    parallelism: int = -1,
    ray_remote_args: Dict[str, Any] = None,
    **read_args,
):
    return read_datasource(
        SampleDatasource(),
        parallelism=parallelism,
        ray_remote_args=ray_remote_args,
        sample_count=sample_count,
        **read_args,
    )


class TestFast:
    """Upstream operator: 10 actors, 0.1 s per batch (up to 100 batches/s)."""

    def __call__(self, df):
        time.sleep(0.1)
        return df


class TestSlow:
    """Downstream operator: 2 actors, 10 s per batch (~0.2 batches/s total)."""

    def __call__(self, df):
        time.sleep(10)
        return df


ds = read_sample(10000, 2)
print(ds)
pipe = ds.map_batches(
    TestFast,
    batch_size=1,
    num_cpus=0.1,
    compute=ray.data.ActorPoolStrategy(10, 10),
).map_batches(
    TestSlow,
    batch_size=1,
    num_cpus=0.5,
    compute=ray.data.ActorPoolStrategy(2, 2),
)
# Consume the stream; the queue in front of TestSlow grows without bound.
for batch in pipe.iter_batches(batch_size=1):
    ...
```
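Back-of-the-envelope numbers for the script above, assuming the 8-CPU machine from the log and the default of 1 CPU per read task: the read operator outruns `TestSlow` by roughly 280 MiB/s, which lines up with the 194-317 MiB/s spill write throughput the raylet reports below.

```python
# Rough estimate of how fast the queue in front of TestSlow grows
# (assumes 8 CPUs total and 1 CPU per read task).
block_mib = 3 * 1024 * 1024 * 8 / 2**20  # one float64 (3, 1024, 1024) block: 24 MiB
read_cpus = 8 - (10 * 0.1 + 2 * 0.5)     # CPUs left for read tasks: 6
produced_per_s = read_cpus / 0.5         # each read task sleeps 0.5 s: 12 blocks/s
consumed_per_s = 2 / 10                  # 2 TestSlow actors, 10 s per batch: 0.2/s
print((produced_per_s - consumed_per_s) * block_mib)  # ~283 MiB/s of queue growth
```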
## Execution log
The run eventually fails to spill with `OSError: [Errno 28] No space left on device` (progress-bar output trimmed for readability):
```text
2023-03-23 16:38:08,475 INFO worker.py:1550 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8266
Dataset(num_blocks=10000, num_rows=10000, schema=<class 'numpy.ndarray'>)
2023-03-23 16:38:39,222 INFO streaming_executor.py:65 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[read] -> ActorPoolMapOperator[MapBatches(TestFast)] -> ActorPoolMapOperator[MapBatches(TestSlow)]
Resource usage vs limits: 8.0/8.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory
(raylet) Spilled 2356 MiB, 10025 objects, write throughput 194 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
(raylet) Spilled 4420 MiB, 10111 objects, write throughput 217 MiB/s.
(raylet) Spilled 8236 MiB, 10270 objects, write throughput 260 MiB/s.
(raylet) Spilled 16612 MiB, 10619 objects, write throughput 298 MiB/s.
(raylet) Spilled 32812 MiB, 11294 objects, write throughput 317 MiB/s.
(raylet) [2023-03-23 16:41:37,639 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 12532576256; capacity: 250685575168. Object creation will fail if spilling is required.
(raylet) [2023-03-23 16:41:47,645 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 11353731072; capacity: 250685575168. Object creation will fail if spilling is required.
(raylet) [2023-03-23 16:41:57,689 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 9104691200; capacity: 250685575168. Object creation will fail if spilling is required.
(raylet) [2023-03-23 16:42:07,768 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 6550011904; capacity: 250685575168. Object creation will fail if spilling is required.
(raylet) [2023-03-23 16:42:17,818 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 4248203264; capacity: 250685575168. Object creation will fail if spilling is required.
2023-03-23 16:42:18,492 WARNING worker.py:1866 -- WARNING: 46 PYTHON worker processes have been started on node: 4615ec90952904b5c4914bc88c5990fc436b55199e1c7a0c676bbf95 with address: 127.0.0.1. This could be a result of using a large number of actors, or due to tasks blocked in ray.get() calls (see https://github.com/ray-project/ray/issues/3644 for some discussion of workarounds).
(raylet) [2023-03-23 16:42:27,848 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 3362476032; capacity: 250685575168. Object creation will fail if spilling is required.
(raylet) [2023-03-23 16:42:37,915 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 3416354816; capacity: 250685575168. Object creation will fail if spilling is required.
(raylet) [2023-03-23 16:42:47,949 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 2180833280; capacity: 250685575168. Object creation will fail if spilling is required.
(raylet) [2023-03-23 16:42:57,984 E 8163 4074926] (raylet) file_system_monitor.cc:105: /tmp/ray/session_2023-03-23_16-38-02_026919_8133 is over 95% full, available space: 1429213184; capacity: 250685575168. Object creation will fail if spilling is required.
2023-03-23 16:43:04,653 WARNING worker.py:1866 -- Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 1280, in ray._raylet.spill_objects_handler
  File "python/ray/_raylet.pyx", line 1283, in ray._raylet.spill_objects_handler
  File "/Users/leyi/miniconda3/envs/python37/lib/python3.7/site-packages/ray/_private/external_storage.py", line 668, in spill_objects
    return _external_storage.spill_objects(object_refs, owner_addresses)
  File "/Users/leyi/miniconda3/envs/python37/lib/python3.7/site-packages/ray/_private/external_storage.py", line 305, in spill_objects
    return self._write_multiple_objects(f, object_refs, owner_addresses, url)
  File "/Users/leyi/miniconda3/envs/python37/lib/python3.7/site-packages/ray/_private/external_storage.py", line 149, in _write_multiple_objects
    written_bytes = f.write(payload)
OSError: [Errno 28] No space left on device
An unexpected internal error occurred while the IO worker was spilling objects: [Errno 28] No space left on device
```