Ray Data streaming not streaming smoothly

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hey Team,
Would be great to get some feedback on our use case!

We are currently trying to use Ray Data streaming while training our ML models. We can definitely see the streaming behavior; however, it is still very slow and does not achieve high CPU or GPU utilization.

We are currently using Ray 2.4 and training with TensorFlow 2.7, converting the dataset with iter_tf_batches.

We have experimented with the DatasetContext.get_current().execution_options.resource_limits.object_store_memory setting across a range of values (1MB, 2GB, 4GB, 10GB, and 20GB), but no matter what, we are not getting the expected “out of the box” streaming behavior.
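For reference, this is roughly how we set that limit (a minimal sketch; the 2GB value is just one of the settings we tried):

```python
from ray.data.context import DatasetContext

# Cap how much object store memory the streaming executor may use.
ctx = DatasetContext.get_current()
ctx.execution_options.resource_limits.object_store_memory = 2 * 1024**3  # 2 GB
```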

Some things to note about our dataset: it is made up of Parquet files with JPEG images inside them. There are about 200,000 files, with an average size of 60MB, each containing between 100 and 400 samples. The preprocessing is computationally heavy and significantly increases each sample's size.
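For context, the pipeline is shaped roughly like the sketch below; the path, the preprocessing body, and the batch size are placeholders rather than our actual code:

```python
import ray

def preprocess_batch(batch):
    # Placeholder: the real preprocessor decodes the JPEGs and adds several
    # large tensors per sample, heavily inflating each sample's size.
    return batch

# ~200,000 Parquet files, ~60MB each, 100-400 samples per file.
ds = ray.data.read_parquet("s3://our-bucket/training-data/")  # placeholder path
ds = ds.map_batches(preprocess_batch)

# Feed batches into the TensorFlow training loop; this is where we expect
# smooth streaming behavior.
for batch in ds.iter_tf_batches(batch_size=64):
    pass  # train_step(batch)
```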

The first batch of data fed into training has the following stats:

Stage 1 ReadParquet: 66/66 blocks executed in 534.99s
* Remote wall time: 1.62s min, 5.27s max, 2.95s mean, 195.01s total
* Remote cpu time: 1.02s min, 2.03s max, 1.41s mean, 93.19s total
* Peak heap memory usage (MiB): 5460.5 min, 39400.53 max, 22678 mean
* Output num rows: 2424 min, 4325 max, 3750 mean, 247521 total
* Output size bytes: 350239343 min, 607671869 max, 551557776 mean, 36402813262 total
* Tasks per node: 66 min, 66 max, 66 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 36402813262, 'obj_store_mem_freed': 245057435863, 'obj_store_mem_peak': 245057435863}

Stage 2 MapBatches(preprocessor): 14/14 blocks executed in 534.99s
* Remote wall time: 13.22s min, 26.6s max, 22.72s mean, 318.08s total
* Remote cpu time: 17.13s min, 40.11s max, 30.9s mean, 432.67s total
* Peak heap memory usage (MiB): 1221.54 min, 8867.44 max, 5274 mean
* Output num rows: 149 min, 290 max, 279 mean, 3919 total
* Output size bytes: 280362572 min, 545672120 max, 526721438 mean, 7374100132 total
* Tasks per node: 14 min, 14 max, 14 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 7374100132, 'obj_store_mem_freed': 577999233, 'obj_store_mem_peak': 7930077107}

Dataset iterator time breakdown:
* Total time user code is blocked: 939.52ms
* Num blocks local: 0
* Num blocks remote: 0
* Num blocks unknown location: 0
* Batch iteration time breakdown (summed across prefetch threads):
    * In ray.get(): 10.47ms min, 12.82ms max, 11.67ms avg, 46.66ms total
    * In batch creation: 870.04ms min, 870.04ms max, 870.04ms avg, 870.04ms total
    * In batch formatting: 15.55ms min, 15.55ms max, 15.55ms avg, 15.55ms total

The CPU utilization has been very low:

Note that the green line is the GPU worker, which only reaches a maximum of 4% utilization.

As for the GPU utilization, it looks as follows:

Thanks!

@Boruch_Chalk Thanks for sharing this post. It would help us if we could see the code you are using with the latest 2.4 streaming APIs.

@ericl @chengsu any comments on this?

Thanks for sharing this! We’re very interested in solving any streaming issues you have. A couple of asks from my side:

  • Could you provide some sample code (could be using dummy/synthetic data for privacy)?
  • Some screenshots of the progress bar (with RAY_DATA_VERBOSE_PROGRESS=1) would also be helpful. This shows which resource is the bottleneck for the streaming workload.

We’ve also addressed a very similar streaming bug in master due to incorrect resource tracking, so it is probably worth also trying out the nightly wheels.

Hey,

I have updated the Ray version to use the nightly, and have noticed some great improvements! It is streaming nicely now.

I have noticed, though, that it is very sensitive to the object store memory size, separate from the DatasetContext.get_current().execution_options.resource_limits.object_store_memory setting: when the object store is larger, i.e. set to 50GB instead of 20GB, it is more likely to fill up and OOM.

Also, when the Parquet files are very small, i.e. 10 samples per file, it OOMs as well, as hundreds of preprocessing tasks are spun up. [4GB stream size, 20GB object store]

Aside from this, we have written our own Datasource with a custom metadata provider, and we see the non-streaming-like behavior when using it. However, when we revert to using ray.data.read_parquet, there are no issues. I assume it might be related to how the metadata provider creates blocks.

It will be quite difficult to create synthetic data and mask the preprocessing, but if I get a chance I will try to put something together.

One thing to watch out for, which your symptoms may match, is making sure the custom datasource is populating the size_bytes field of the block metadata correctly. The streaming backend relies on the size reported in the block metadata to manage memory; setting it too low could result in “non-streaming-like” behavior as it degrades to bulk execution.
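For example, here is a minimal sketch of what that looks like, assuming the Ray 2.x ReadTask/BlockMetadata interfaces (the helper names are illustrative, not from your code):

```python
from ray.data.block import BlockMetadata
from ray.data.datasource import ReadTask

def read_blocks(path):
    # Placeholder for the datasource's actual block-producing logic,
    # e.g. reading the Parquet file and yielding pyarrow Tables.
    ...

def make_read_task(path, num_rows, estimated_in_memory_bytes):
    metadata = BlockMetadata(
        num_rows=num_rows,
        # The streaming executor budgets object store memory from this value,
        # so report the decoded in-memory size, not the compressed file size.
        size_bytes=estimated_in_memory_bytes,
        schema=None,
        input_files=[path],
        exec_stats=None,
    )
    return ReadTask(lambda: read_blocks(path), metadata)
```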

Thanks for that!

I have adjusted the size_bytes used in the ReadTask, multiplying it heavily by a compression factor to account for each sample growing in size during preprocessing. The growth comes from several factors: the images are stored with JPEG compression inside the Parquet files (on top of the Parquet compression itself), and the heavy preprocessing adds many more large tensors to each sample.
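Roughly speaking, the estimate looks like the sketch below; the factor names and values are illustrative placeholders, not our exact numbers:

```python
# Illustrative only: the real values depend on image dimensions and on the
# preprocessing pipeline.
JPEG_DECODE_FACTOR = 8      # compressed JPEG bytes -> decoded pixel bytes
PREPROCESS_EXPANSION = 3    # preprocessing adds several large tensors per sample

def estimate_in_memory_bytes(parquet_file_size_bytes: int) -> int:
    # Inflate the on-disk Parquet size so the block metadata reflects the
    # post-decode, post-preprocessing memory footprint.
    return int(parquet_file_size_bytes * JPEG_DECODE_FACTOR * PREPROCESS_EXPANSION)
```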

With this in place, I have achieved somewhat stable training, running for a few hours with OK GPU utilization, averaging about 60%.

However, after about two hours the training still fails due to OOM kills; this happened right after Ray spun up quite a few new MapBatches(preprocessor) tasks.

Related to this, how does the streaming executor decide to spin up more actors? Is it just using size_bytes, or does it also measure memory at each stage in the pipeline?

Would you have any other suggestions for achieving an even smoother stream?

Thanks!

By OOM, do you mean an actual worker crash and not just disk spilling? In that case, I would increase the resource allocation per actor (e.g., set num_cpus=4 or 8, so that no matter how many actors are launched, there isn’t an OOM, since each has adequate heap memory). The streaming system mainly manages the object store memory to avoid slowdowns like spilling.

For launching more actors, this is done when there is a backlog of work for the operator, measured at each stage. If this is fluctuating undesirably during a run and you don’t need autoscaling, it would probably be better to fix the actor pool size so that no new actors are launched during the run.
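As a concrete sketch (the pool size, num_cpus, the path, and the Preprocessor class are placeholders to tune for your workload):

```python
import ray
from ray.data import ActorPoolStrategy

class Preprocessor:
    def __call__(self, batch):
        # Placeholder for the heavy preprocessing logic.
        return batch

ds = ray.data.read_parquet("s3://our-bucket/training-data/")  # placeholder path
ds = ds.map_batches(
    Preprocessor,
    batch_size=64,
    # min_size == max_size fixes the pool, so no new actors launch mid-run.
    compute=ActorPoolStrategy(min_size=8, max_size=8),
    # More CPUs per actor means fewer concurrent actors per node and more heap
    # headroom for each, which helps avoid worker OOM kills.
    num_cpus=4,
)
```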

Hey,

Following up on this.

By OOM I had meant that workers were crashing, followed by the Trainer crashing.

I have created an actor pool and aligned the num_cpus requests for the reading and preprocessing so that the stages merge. This merge creates a much more stable flow, with better-managed memory.

However, it is now streaming very slowly, so while memory is better managed, I still can’t get good GPU utilization. I have tried adding prefetching, but it does not appear to help and results in large amounts of spilling, a lot more than it should. I have requested a prefetch size of 200 with a batch size of 64 and a stream size of 4GB, so it should not be spilling that much.

(raylet, ip=10.250.174.57) Spilled 3164 MiB, 28000 objects, write throughput 1867 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
(raylet, ip=10.250.174.57) Spilled 4269 MiB, 44000 objects, write throughput 1781 MiB/s.
(raylet, ip=10.250.174.57) Spilled 9009 MiB, 88000 objects, write throughput 1344 MiB/s.

A follow-up question on the streaming flow: does it wait for the entire allocated object store to fill up before passing the next batch to iter_batches, or does that happen as soon as there are enough rows to fulfill a batch? If it passes the batch on as soon as there is enough data, does it do this even when the condition is fulfilled halfway through a block? In other words, does setting a small batch size on map_batches help?

However, it is now streaming very slowly, so while memory is better managed, I still can’t get good GPU utilization.

How’s the CPU and network utilization? I would guess there’s some resource bottleneck now, such as not enough tasks reading data in parallel. Merging the read and preprocessing could cause this if the read needed more parallelism than the preprocessing. Increasing the prefetch also wouldn’t help in this case.

A follow-up question on the streaming flow: does it wait for the entire allocated object store to fill up before passing the next batch to iter_batches, or does that happen as soon as there are enough rows to fulfill a batch?

It should be the latter.

If it passes the batch on as soon as there is enough data, does it do this even when the condition is fulfilled halfway through a block? In other words, does setting a small batch size on map_batches help?

It has to fetch at least one block before data can be returned. So say you set prefetch_batches=10, the batch size is 64, and each block is 1000 rows; then one block would be prefetched at a time. If the blocks are very small, then a lot more would get prefetched.
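As a rough illustration on the consumption side (assuming a Ray version where iter_tf_batches exposes prefetch_batches; the path and numbers are just for the example):

```python
import ray

ds = ray.data.read_parquet("s3://our-bucket/training-data/")  # placeholder path

# With batch_size=64 and ~1000-row blocks, prefetch_batches=10 asks for about
# 640 rows of lookahead, which is less than one block, so roughly one block is
# prefetched at a time. With tiny blocks (e.g. 10 rows), the same setting would
# pull in many blocks at once.
for batch in ds.iter_tf_batches(batch_size=64, prefetch_batches=10):
    pass  # train_step(batch)
```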