Why is the Ray scheduler showing this behaviour in my computation?

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task: I’m attempting to understand how the scheduler works so that I can use it for a heavy operational use case where predictability and stability are essential.

Summary

While running tests in which a Ray cluster reads a set of data from a custom datasource (implemented using ray.data.Datasource), the scheduler shows behaviour that’s unexpected to me. I’m trying to understand what’s going on here so that I can move on to using this in a production setting with a strong level of confidence in how it behaves.

What does the computation look like?

import ray


@ray.remote
def execute_my_test(
    block_msg_count: int,
    blocks_per_window: int,
) -> int:
    # MyCustomDatasource returns a Dataset whose number of blocks is determined
    # by block_size, i.e. the number of records per block. Each block is produced
    # by a ReadTask that reads block_size records from my datasource.
    return (
        ray.data.read_datasource(
            datasource=MyCustomDatasource(),
            block_size=block_msg_count,
        )
        .window(blocks_per_window=blocks_per_window)
        .count()
    )

# ray.get() returns the task's result (the record count), not an ObjectRef
record_count = ray.get(
    execute_my_test.remote(
        block_msg_count=8000,
        blocks_per_window=192,
    )
)

What does the cluster look like?

  • 32 worker nodes
  • 3 CPUs per node assigned to Ray
  • 6 GB of memory per node assigned to ray
  • No num_cpus has been set explicitly on any .remote() call, so I expect task parallelism to fall back to Ray’s auto-detection based on the number of CPUs available in the cluster, which would be 96 (see the snippet after this list).
  • The Datasource returns a dataset that uses a Block format based on a list of Python dicts.
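To sanity-check the resources the scheduler actually sees, I run something like the snippet below from the head node. The address="auto" connection is just how my setup happens to be wired up, not anything prescribed by Ray:

import ray

ray.init(address="auto")

# Total resources registered with the cluster; with 32 nodes x 3 CPUs each,
# this should report {"CPU": 96.0, ...} plus the memory/object-store totals.
print(ray.cluster_resources())

# Resources not currently claimed by running tasks; polling this while the job
# runs shows how many CPUs the scheduler has actually handed out.
print(ray.available_resources())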

What is the unexpected behaviour?

1. Computation runs out of memory way sooner than I’d expect.

With an average record size of 5 KB per record, I’d expect to only hit the memory limit at around 512,000 records per block:

  • 5 KB × 512,000 records ≈ 2,500 MB per block.
  • 3 concurrent tasks per worker × 2,500 MB per block = up to 7,500 MB of memory usage per worker.

Since 7,500 MB exceeds the 6,000 MB of heap available per worker, the computation should run out of memory at roughly that block size.

What I actually see is that the computation already hits the OOM limit at 64,000 records per block:

  • 5 KB × 64,000 records ≈ 300 MB per block.
  • 3 concurrent tasks per worker × 300 MB per block = up to 900 MB of memory usage per worker.

That is completely unexpected, because a cap of 900 MB is far below the 6 GB of memory available per worker. Moreover, ray memory shows that Ray is in fact attempting to use more than 6 GB of worker memory.
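For reference, here is the back-of-the-envelope calculation above in code form; the 5 KB record size and the helper name are my own estimates and choices, not anything Ray reports:

RECORD_SIZE_KB = 5          # estimated average record size
CPUS_PER_WORKER = 3         # concurrent tasks I expect per worker node
WORKER_MEMORY_MB = 6_000    # memory assigned to Ray on each node


def per_worker_memory_mb(records_per_block: int) -> float:
    """Expected peak heap usage per worker if each CPU holds one block."""
    block_mb = RECORD_SIZE_KB * records_per_block / 1_000
    return CPUS_PER_WORKER * block_mb


print(per_worker_memory_mb(512_000))  # ~7,700 MB -> above the 6,000 MB limit
print(per_worker_memory_mb(64_000))   # ~960 MB   -> far below the limit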

2. Computation speeds up, even when it’s assigned more blocks per window than the cluster has CPUs available

Since my cluster has a fixed 96 CPUs (32 workers × 3 CPUs each), I’d expect Ray to be able to run at most 96 tasks in parallel. Consistent with that, I see a speed-up when increasing the blocks_per_window argument up to 96.

What’s unexpected is that the computation keeps speeding up as blocks_per_window is increased well past the number of available CPUs in the cluster, up to about 512.
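This is the rough timing harness I use to observe that effect, reusing the execute_my_test task from above; the specific sweep values are just the ones I happened to try around the 96-CPU mark:

import time

for blocks_per_window in (48, 96, 192, 384, 512, 768):
    start = time.perf_counter()
    ray.get(
        execute_my_test.remote(
            block_msg_count=8000,
            blocks_per_window=blocks_per_window,
        )
    )
    elapsed = time.perf_counter() - start
    # Runtime keeps dropping well past 96 (the cluster's CPU count) and only
    # levels off around 512 blocks per window.
    print(f"blocks_per_window={blocks_per_window}: {elapsed:.1f}s")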

Conclusion

Both of these behaviours lead me to believe that Ray is running far more tasks (one per block) in parallel than the 96 concurrent tasks I assumed. What’s going on here? My goal is to figure out how the scheduler decides on task parallelism, so that I can run these computations in production with a predictable level of robustness.
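In case it helps, this is how I’ve been trying to inspect the actual concurrency so far; both are standard Ray tools, and interpreting their output is exactly where I could use guidance:

import ray

ray.init(address="auto")

# Dump a Chrome-tracing timeline of all tasks; opening it in chrome://tracing
# shows how many read tasks were actually running at the same time.
ray.timeline(filename="ray-timeline.json")

# On the command line, `ray memory` lists the object references currently held
# in the object store, which is where I saw usage exceed the 6 GB per worker.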