Debugging Ray Data out of memory errors

I have a csv file of size 7.2 GB (~100 million rows) on which I need to perform a dataset featurization task. I am using a head-node of memory 16 GB and performing a MapBatch->Filter->Write workflow.

I am observing a log like this:

2024-02-08 15:49:23,988 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> TaskPoolMapOperator[MapBatches(RayFeaturizer)->Filter(<lambda>)->Write]
2024-02-08 15:49:23,988 INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-08 15:49:23,988 INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]
Running: 0.0/100.0 CPU, 0.0/0.0 GPU, 0.0 MiB/28.7 GiB object_store_memory:   0%|          | 0/1 [00:00<?, ?it/s]
Running: 1.0/100.0 CPU, 0.0/0.0 GPU, 558.49 MiB/28.7 GiB object_store_memory:   0%|          | 0/1 [00:00<?, ?it/s]

...

Running: 51.0/100.0 CPU, 0.0/0.0 GPU, 687.19 MiB/28.7 GiB object_store_memory:   0%|          | 0/1 [00:07<?, ?it/s]

(MapBatches(RayFeaturizer)->Filter(<lambda>)->Write pid=3474, ip=172.31.33.222)
...
Running: 100.0/100.0 CPU, 0.0/0.0 GPU, 687.19 MiB/28.7 GiB object_store_memory:   0%|          | 0/1 [1:30:07<?, ?it/s]

which continues for more time (~1h 30 min) and then the job fails due to out of memory error. I tried fixing it by increasing the memory of head node to 32 GB but it still fails. The task is a simple featurization task which I think should not take much memory. I am not sure what causing the OOM error. Could the read-csv process be optimized to use less-memory?

When I tested on a dataset of smaller size (10 million records), the featurization process completed successfully. The log observed was:

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]
Running: 0.0/100.0 CPU, 0.0/0.0 GPU, 0.0 MiB/28.7 GiB object_store_memory:   0%|          | 0/1 [00:00<?, ?it/s]
Running: 1.0/100.0 CPU, 0.0/0.0 GPU, 558.49 MiB/28.7 GiB object_store_memory:   0%|          | 0/1 [00:00<?, ?it/s]
...
Running: 164.0/164.0 CPU, 0.0/0.0 GPU, 616.13 MiB/47.32 GiB object_store_memory:   0%|          | 0/1 [07:04<?, ?it/s]

(autoscaler +7m9s) Resized to 164 CPUs.

Running: 164.0/164.0 CPU, 0.0/0.0 GPU, 616.13 MiB/47.32 GiB object_store_memory:   0%|          | 0/1 [07:06<?, ?it/s]

# NOTE the transition here from 0/1 to 0/250

Running: 164.0/164.0 CPU, 0.0/0.0 GPU, 616.13 MiB/47.32 GiB object_store_memory:   0%|          | 0/250 [07:22<?, ?it/s]
]
Running: 164.0/164.0 CPU, 0.0/0.0 GPU, 616.13 MiB/47.32 GiB object_store_memory:   0%|          | 1/250 [07:22<30:34:55, 442.15s/it]
Running: 163.0/164.0 CPU, 0.0/0.0 GPU, 613.6 MiB/47.32 GiB object_store_memory:   0%|          | 1/250 [07:22<30:34:55, 442.15s/it]
...
# Job completes successfully.

The question which I have here is what does the transition at the 7th min from 0/1 to 0/250 as observed in the log implies? Does it mean that read is completed and dataset-write begins?