Map Dataset with Rolling Window

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty in completing my task, but I can work around it.

I would like to map a function over a dataset using a rolling window. For example, I can do the following to get a rolling window over a dataset, but I can't apply a function to set each row's value.
I've looked at the map and map_batches methods on Dataset, but I can't tell whether either is the best way to go about this.

```python
import ray
from collections import deque


def rolling_window(ds, window_size):
    """Yield one centered window per row of `ds`, padding both edges by
    repeating the first and last rows."""
    # Round even sizes up so the window can be centered on a row.
    if window_size % 2 == 0:
        window_size += 1

    if window_size <= 1:
        raise ValueError("Window size must be greater than 1")

    pad = window_size // 2

    # A deque with maxlen drops the oldest element automatically.
    window = deque(maxlen=window_size)

    last = None
    for element in ds.iter_rows():
        if last is None:
            # Left edge: repeat the first row `pad` times.
            window.extend([element] * pad)
        window.append(element)
        last = element  # Update last element on each iteration
        # Emit only full windows, i.e. windows centered on a real row.
        if len(window) == window_size:
            yield list(window)

    if last is None:
        return  # Empty dataset: nothing to yield.

    # Right edge: repeat the last row `pad` times, emitting each full window.
    # The length check keeps very short datasets from yielding extra windows.
    for _ in range(pad):
        window.append(last)
        if len(window) == window_size:
            yield list(window)


ds = ray.data.range(10)

for window in rolling_window(ds, 3):
    print(window)
```
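For the sequential case, applying a function mostly means calling it on each window instead of yielding the raw list. A minimal sketch, assuming the input is any iterable of rows (for a Ray Dataset that would be `ds.iter_rows()`); `map_rolling` and its `fn` parameter are hypothetical names, not part of the Ray API:

```python
from collections import deque
from typing import Any, Callable, Deque, Iterable, Iterator, List


def map_rolling(
    rows: Iterable[Any],
    window_size: int,
    fn: Callable[[List[Any]], Any],
) -> Iterator[Any]:
    """Yield fn(window) once per row, where window is centered on that row.

    Both edges are padded by repeating the first/last row.
    """
    if window_size % 2 == 0:
        window_size += 1  # round up so the window has a center row
    if window_size <= 1:
        raise ValueError("Window size must be greater than 1")
    pad = window_size // 2

    window: Deque[Any] = deque(maxlen=window_size)
    started = False
    last = None
    for row in rows:
        if not started:
            window.extend([row] * pad)  # left edge padding
            started = True
        window.append(row)
        last = row
        if len(window) == window_size:
            yield fn(list(window))

    if not started:
        return  # empty input: no windows
    for _ in range(pad):
        window.append(last)  # right edge padding
        if len(window) == window_size:
            yield fn(list(window))


# Usage: a centered rolling mean over plain integers.
means = list(map_rolling(range(10), 3, lambda w: sum(w) / len(w)))
print(means)
```

With a Ray Dataset, `fn` receives rows in whatever form `iter_rows()` yields; in recent Ray versions those are dicts (for `ray.data.range` the column is `"id"`), so a rolling mean would look something like `lambda w: sum(r["id"] for r in w) / len(w)`, and the results could be wrapped back into a Dataset with `ray.data.from_items(...)`. Everything still runs on the driver, so this does not distribute the work.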


Hey @Josiah_Reeves, I don’t think there’s a clean way to achieve this with the Ray Data API. If you’re looking for a distributed solution, you could probably implement something with Ray Core (IIRC there are algorithms for parallel window computation).
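One common way to parallelize a centered rolling window is halo (overlap) partitioning: split the rows into contiguous chunks, give each chunk `pad` extra rows from its neighbors on each side, and let each worker compute its windows independently. The sketch below uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for Ray Core tasks; `rolling_map_parallel`, `_window_chunk`, and `num_chunks` are hypothetical names. It also materializes the whole input on the driver, which a real distributed version would avoid by having each task read only its own chunk and halo.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def _window_chunk(
    chunk: List[Any], window_size: int, fn: Callable[[List[Any]], Any]
) -> List[Any]:
    # `chunk` holds one partition plus `pad` halo rows on each side, so every
    # full-size slice is the window centered on one row of the partition.
    return [
        fn(chunk[j : j + window_size])
        for j in range(len(chunk) - window_size + 1)
    ]


def rolling_map_parallel(
    values: Iterable[Any],
    window_size: int,
    fn: Callable[[List[Any]], Any],
    num_chunks: int = 4,
) -> List[Any]:
    """Apply fn to the centered window around every value, chunk by chunk."""
    if window_size % 2 == 0:
        window_size += 1
    if window_size <= 1:
        raise ValueError("Window size must be greater than 1")
    if num_chunks < 1:
        raise ValueError("num_chunks must be at least 1")

    values = list(values)
    n = len(values)
    if n == 0:
        return []
    pad = window_size // 2

    # Edge-pad once: the window for row j is padded[j : j + window_size].
    padded = [values[0]] * pad + values + [values[-1]] * pad
    chunk_len = -(-n // num_chunks)  # ceiling division

    with ThreadPoolExecutor() as pool:
        futures = []
        for start in range(0, n, chunk_len):
            stop = min(start + chunk_len, n)
            # Rows start..stop-1 need padded[start : stop + 2 * pad].
            halo_chunk = padded[start : stop + 2 * pad]
            futures.append(pool.submit(_window_chunk, halo_chunk, window_size, fn))
        # Collect in submission order so the output stays in row order.
        results = [f.result() for f in futures]

    return [out for chunk_out in results for out in chunk_out]


# Usage: a centered rolling sum, split across 3 chunks.
print(rolling_map_parallel(range(10), 3, sum, num_chunks=3))
# → [1, 3, 6, 9, 12, 15, 18, 21, 24, 26]
```

With Ray Core, the per-chunk function would presumably become a `@ray.remote` task, submitted with `_window_chunk.remote(...)` and collected with `ray.get(...)`; the halo arithmetic stays the same. Because each chunk carries exactly `2 * pad` extra rows, the concatenated output matches the sequential result regardless of how many chunks are used.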