Ray DatasetPipeline streaming is timing out while trying to get the next window

This has been painful to diagnose, but I think I have finally narrowed down the issue:

My TorchTrainer reliably crashes at the same training step, and that step lines up with the window boundaries of the DatasetPipeline I am using to stream a large dataset.
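
For context, the streaming side of the job is essentially this pattern (a simplified sketch, not my exact code; the path and window size are placeholders):

```python
import ray

# Placeholder path -- the real job reads roughly 1.8 TiB of training data.
ds = ray.data.read_parquet("s3://my-bucket/training-data/")

# Stream the data through the object store in windows instead of loading it
# all at once; this is what produces the "Created DatasetPipeline with 10
# windows" output shown further down.
pipe = ds.window(bytes_per_window=180 * 1024**3)
```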

I get errors like this after crunching through the first window:

Warning: reader on shard 18 of the pipeline has been blocked more than 2560.0s waiting for other readers to catch up. All pipeline shards must be read from concurrently.

and later:

 [Rank 23] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=2006, OpType=BROADCAST, Timeout(ms)=5400000) ran for 5400048 milliseconds before timing out
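
The NCCL timeout looks like a downstream symptom to me: once one rank stalls waiting for its pipeline shard, the other ranks sit in the broadcast until the collective times out (the 5400 s in the log). Bumping the timeout via the trainer's torch config only delays the failure; here is a sketch of where that knob lives, with a stubbed-out train loop and a guessed worker count of 24 (rank 23 appears in the logs):

```python
from ray.air.config import ScalingConfig
from ray.train.torch import TorchConfig, TorchTrainer

def train_loop_per_worker(config):
    ...  # actual training loop omitted here

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    # 5400 s matches the Timeout(ms)=5400000 in the watchdog error above.
    torch_config=TorchConfig(timeout_s=5400),
    scaling_config=ScalingConfig(num_workers=24, use_gpu=True),
    # dataset wiring omitted; see the consumption sketch at the end of the post
)
```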

I have inferred this from the output below, which shows the pipeline has 10 windows: 10 windows * (batch_size * steps per window) roughly equals the size of my training dataset, so the crash lines up with the start of a new window.

(TorchTrainer pid=158165, ip=10.0.0.8) Created DatasetPipeline with 10 windows: 134.91GiB min, 184.61GiB max, 179.56GiB mean

(TorchTrainer pid=158165, ip=10.0.0.8) Blocks per window: 588 min, 738 max, 715 mean

(TorchTrainer pid=158165, ip=10.0.0.8) ✔️ This pipeline's per-window parallelism is high enough to fully utilize the cluster.

(TorchTrainer pid=158165, ip=10.0.0.8) ✔️ This pipeline's windows likely fit in object store memory without spilling.

How can I use Ray to ensure that all workers keep reading their shard of the data concurrently, so the pipeline readers don't block each other?
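
For reference, this is my understanding of the consumption pattern the warning is asking for: every shard produced by the split has to be iterated concurrently and to completion. A minimal standalone sketch of that pattern (not my actual trainer code; the path, window size, batch size, and worker count are placeholders or guesses):

```python
import ray

NUM_WORKERS = 24  # rank 23 in the logs implies at least 24 training workers

ds = ray.data.read_parquet("s3://my-bucket/training-data/")  # placeholder path
pipe = ds.window(bytes_per_window=180 * 1024**3)

# One pipeline shard per worker. Every shard must be read concurrently:
# if any single consumer stops pulling batches (early break, rank-0-only
# validation, a worker that finishes its loop sooner, ...), the remaining
# readers block, which matches the "waiting for other readers to catch up"
# warning and, eventually, the NCCL collective timeout.
shards = pipe.split(NUM_WORKERS, equal=True)

@ray.remote
def consume(shard):
    for batch in shard.iter_batches(batch_size=1024):
        pass  # forward/backward pass would go here
    return "done"

ray.get([consume.remote(shard) for shard in shards])
```

I assume the equivalent inside TorchTrainer is making sure every worker fully iterates the result of session.get_dataset_shard("train") on every epoch, but I may be holding it wrong, so any pointer to the right configuration would be appreciated.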