[Datasets] Create custom dataset by grouping/merging existing blocks

My use case is the following:

  1. Load a dataset where each file is a block.
  2. Based on the contents of each block, create a new dataset where a new block may contain one or many old blocks (the grouping strategy for new blocks is determined by the data contained in the old blocks); hence new blocks may have overlapping data (which is OK for me).
  3. Produce the final dataset by applying custom_function to each new block in the new dataset.

Re 1: All clear, use a custom datasource.
Re 2: The plan here is to use map_batches to gather the data needed for repartitioning from the input dataset, use this data to group existing block object refs, merge blocks based on the grouping strategy derived above, and create a new dataset from these blocks.
Re 3: Similar to 1, use map_batches.

The questions are mostly about the behavior of map_batches and the specific Dataset API to use in 2, specifically:

  1. If I use map_batches without specifying batch_size, is it guaranteed that batch == block? If not, what should I do to make sure that blocks are kept intact and no repartitioning takes place? Do I have to write a custom task for this and apply it to each ObjectRef[Block] in the dataset?

  2. How do I merge two blocks given two ObjectRef[Block] refs? Is there an efficient way to do this without explicitly merging the Block objects and creating a new ref to the result, so I can avoid copying data? Or is there a way to tell Ray to treat two ObjectRef[Block] refs as a single ref? How do I create a dataset solely from a list of existing ObjectRef refs? For clarity, my blocks are pandas DataFrames.


Re 1: Yes, if batch_size is None, each block will be a batch.
Re 2: Dataset doesn’t expose an API for merging ObjectRefs of blocks. Can you reframe the requirements in terms of Dataset APIs? If you just want to create a Dataset from pandas DataFrames, that’s supported with ds = ray.data.from_pandas(pd_df_list).

@jianxiao, for more context: my dataset is timestamp-sorted timeseries data, hence each block contains the next period of data.

My goal is to apply a map function to each block. The trick is that this function may need to access blocks preceding the current block, depending on the current block’s content/metadata (i.e. a batch sliding window: per block + window).

If I load the data with a window in advance, I will have duplicated data stored in each block. Ideally, I want to have one block per file and somehow tell the map function that it also depends on the previous block(s) in this dataset. Can you suggest any ideas on how to do this?
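One way to express "per block + window" without duplicating data is to keep one block per file and compute, for each block, the indices of the earlier blocks it depends on; the map step then fetches only those. A minimal pure-Python sketch, assuming each block exposes a hypothetical `lookback` value (how many preceding blocks it needs) derived from its content/metadata:

```python
from typing import Dict, List


def dependency_plan(lookbacks: List[int]) -> Dict[int, List[int]]:
    """Map each block index to the earlier block indices it needs.

    `lookbacks[i]` is a hypothetical per-block value: how many
    preceding blocks block i must see. Blocks are assumed to be
    timestamp-sorted, so "previous" means a lower index.
    """
    plan = {}
    for i, lookback in enumerate(lookbacks):
        start = max(0, i - lookback)  # clamp at the first block
        plan[i] = list(range(start, i))
    return plan


# Block 0 needs nothing; block 3 asks for 5 blocks but only 3 exist before it.
plan = dependency_plan([0, 1, 2, 5])
print(plan)
```

With a list of block refs in hand (e.g. from `ds.to_pandas_refs()`), the task for block i would then receive `refs[i]` plus `[refs[k] for k in plan[i]]`, so each block is stored once and only referenced by the tasks that need it.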

Another question is how can I pipeline this transformation? My data is larger than memory and I can’t load all of it at once.

Thank you @dirtyValera for providing more context.

Just for me to make sure I get the requirements (please correct if not accurate):

  • A data collection: N files, each file corresponding to a time period
  • The data collection is large and needs to be distributed and also pipelined
  • Processing one file may need to depend on previous file(s) (similar to an autoregressive model?)

Without the second requirement, you could read all files into one DataFrame, and the processing would be similar to Simple AutoML for time series with Ray Core — Ray 3.0.0.dev0

Without the third requirement, one way to model this in Datasets would be ds.groupby(date).map_groups(UDF) (the group key is the time period), which makes sure each time period is in one group and the UDF is applied to exactly that group.

However, meeting all of these requirements needs more flexibility than Ray Datasets currently offers.

If my understanding of the requirements above is correct, one way to meet them is to use a Ray DatasetPipeline to load the data and then use Ray Core (i.e. the ray.remote API) to run the time series analysis.
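For intuition, the per-group semantics described above can be mimicked locally with plain pandas. This is a single-machine analogue, not the Ray Datasets API; the `date`/`value` columns and the `udf` are hypothetical. Each time period becomes one group, and the UDF sees exactly that group's rows:

```python
import pandas as pd

# Toy time-series data; "date" plays the role of the time period key.
df = pd.DataFrame({
    "date": ["2009-01-01", "2009-01-01", "2009-01-02", "2009-01-02", "2009-01-02"],
    "value": [1.0, 3.0, 2.0, 4.0, 6.0],
})


def udf(group: pd.DataFrame) -> pd.DataFrame:
    # Receives every row of exactly one time period.
    return pd.DataFrame({
        "date": [group["date"].iloc[0]],
        "mean_value": [group["value"].mean()],
        "rows": [len(group)],
    })


# Apply the UDF once per group and stitch the results back together.
out = pd.concat(
    [udf(group) for _, group in df.groupby("date", sort=True)],
    ignore_index=True,
)
print(out)
```

The limitation mentioned in the thread shows up here too: the UDF for one date cannot see rows from earlier dates, which is exactly what the autoregressive requirement needs.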

High-level idea:

import ray

pipe = ray.data.read_parquet(
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_01_data.parquet"
).repartition(30).window(blocks_per_window=5)

max_parallel_tasks = 4

# The order of the model (like an autoregressive model).
# That is, X[i] = sum(c[k] * X[i - k]) for k = 1, 2, ..., p
# So to process block X[i], we need to read up to the previous
# p blocks for each task (except the first p tasks).
p = 5

# Keep max_parallel_tasks + p blocks in memory in order
# to make sure each in-flight task's dependencies are available.
n = max_parallel_tasks + p

@ray.remote
def time_series_task(block_window, i, j):
    # Placeholder: block_window is a list of block refs; a real task
    # would ray.get() the blocks it needs and run the model on them.
    return [len(block_window), i, j]

block_window = []
task_window = []

# Process a rolling window of time series tasks.
for i, ds in enumerate(pipe.iter_datasets()):
    for j, block_ref in enumerate(ds.to_pandas_refs()):
        if len(task_window) >= max_parallel_tasks:
            # Wait for the earliest task to finish and drop it from the window.
            result = ray.get(task_window.pop(0))
            # Consume the results:
            print("Result: ", result)

        # Release the earliest block if it's no longer needed,
        # then add the current block.
        if len(block_window) >= n:
            block_window.pop(0)
        block_window.append(block_ref)

        # Launch the next task.
        task_window.append(time_series_task.remote(block_window, i, j))

# Drain the remaining in-flight tasks.
while len(task_window) > 0:
    result = ray.get(task_window.pop(0))
    print("Result: ", result)


Result:  [1, 0, 0]
Result:  [2, 0, 1]
Result:  [3, 0, 2]
Result:  [4, 0, 3]
Result:  [5, 0, 4]
Result:  [6, 1, 0]
Result:  [7, 1, 1]
Result:  [8, 1, 2]
Result:  [9, 1, 3]
Result:  [9, 1, 4]
Result:  [9, 2, 0]
Result:  [9, 2, 1]
Result:  [9, 2, 2]
Result:  [9, 2, 3]
Result:  [9, 2, 4]
Result:  [9, 3, 0]
Result:  [9, 3, 1]
Result:  [9, 3, 2]
Result:  [9, 3, 3]
Result:  [9, 3, 4]
Result:  [9, 4, 0]
Result:  [9, 4, 1]
Result:  [9, 4, 2]
Result:  [9, 4, 3]
Result:  [9, 4, 4]
Result:  [9, 5, 0]
Result:  [9, 5, 1]
Result:  [9, 5, 2]
Result:  [9, 5, 3]
Result:  [9, 5, 4]
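The block bookkeeping in the loop can be checked without a cluster. A small pure-Python simulation, with strings standing in for block refs and the window/block counts mirroring the example (30 blocks, 5 per window), reproduces the first column of the output: the window grows to n = max_parallel_tasks + p = 9 and then stays there. It also shows that `collections.deque(maxlen=n)` is a tidy alternative to the manual pop:

```python
from collections import deque

num_windows = 6        # repartition(30) split into windows of 5 blocks
blocks_per_window = 5
max_parallel_tasks = 4
p = 5
n = max_parallel_tasks + p

# deque(maxlen=n) drops the oldest entry automatically on append,
# replacing the explicit "if len(block_window) >= n: pop(0)" step.
block_window = deque(maxlen=n)
observed = []

for i in range(num_windows):
    for j in range(blocks_per_window):
        block_window.append(f"block-{i}-{j}")  # stand-in for an ObjectRef
        # A real launch would pass list(block_window) to the remote task.
        observed.append([len(block_window), i, j])

print(observed[:3], observed[-1])
```

Converting to a list before passing it to `.remote()` keeps the argument a plain list of refs, as in the original loop.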

@jianxiao thanks, this looks like a possible direction.

Can you please explain how pipe.iter_datasets() and ds.to_pandas_refs() work internally in terms of data loading? Is it executed lazily, i.e. only when a specific block’s data needs to be referenced in time_series_task, or in some other way?

Can you please also outline at a high level how Ray would manage memory for this pipeline, and suggest how to calculate the desired/expected cluster memory (for example, depending on the pipeline’s throughput, to understand the processing-time/throughput tradeoff) so that it fits both the data reading tasks (from the dataset) and time_series_task, considering that in my case time_series_task may require extra heap memory (which I can estimate based on block size) for additional processing? What about extending this pipeline with extra steps, e.g. streaming the pipeline’s output into Ray Trainer/Ray Tune?

Thank you very much!

Looks like a lot of questions :)

For the first part of questions:

  • DatasetPipeline essentially produces a stream of datasets, each of which is a window; that’s what pipe.iter_datasets() provides
  • ds.to_pandas_refs() just returns references to the underlying blocks of the dataset (of one window) in pandas format
  • The refs are cheap to pass to remote tasks like time_series_task. It’s more accurate to call them references (i.e. not values) than to call them lazy in this case. That said, the execution of DatasetPipeline is lazy.
  • I encourage you to take a look at the documentation about DatasetPipeline: Key Concepts — Ray 3.0.0.dev0

For the second part of the questions:

  • You need to provision enough memory to hold all the in-flight windows of the pipeline, as well as the blocks held in block_window for the in-flight tasks
  • If the remote tasks need heap memory, the provisioning also needs to account for that (note that the memory for blocks/windows above is object store memory, not heap memory)
  • If your data processing output is a Dataset, it should be possible to use it as input to Ray Train/Tune. But I think this needs a closer look at the use case.
  • For the memory part, I encourage you to take a look at this relevant documentation: Scheduling, Execution, and Memory Management — Ray 3.0.0.dev0
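The provisioning rules above can be turned into a back-of-the-envelope estimate. This is a hypothetical helper, not a Ray API: it assumes a uniform block size, a user-chosen number of in-flight pipeline windows, and a known per-task heap footprint. It treats pipeline windows and block_window as disjoint, so it is an upper bound when they share blocks:

```python
def estimate_memory_gib(
    block_gib: float,
    blocks_per_window: int,
    inflight_windows: int,
    block_window_len: int,
    max_parallel_tasks: int,
    heap_per_task_gib: float,
) -> dict:
    """Rough provisioning estimate (hypothetical helper, not a Ray API).

    Object store: blocks of every in-flight pipeline window plus the
    blocks pinned by block_window (counted separately, so an upper bound).
    Heap: working memory for each in-flight task.
    """
    object_store = block_gib * (blocks_per_window * inflight_windows + block_window_len)
    heap = heap_per_task_gib * max_parallel_tasks
    return {
        "object_store_gib": object_store,
        "heap_gib": heap,
        "total_gib": object_store + heap,
    }


# Example: 0.5 GiB blocks, 5 blocks/window, 2 windows in flight,
# block_window of 9 blocks, 4 parallel tasks needing 2 GiB heap each.
est = estimate_memory_gib(
    block_gib=0.5,
    blocks_per_window=5,
    inflight_windows=2,
    block_window_len=9,
    max_parallel_tasks=4,
    heap_per_task_gib=2.0,
)
print(est)
```

Raising max_parallel_tasks increases throughput but grows both terms, which is the processing-time/memory tradeoff asked about above.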

@jianxiao thanks a lot for your time and well-thought-out answers! I have checked the links you provided, but I still have a few questions.

  1. How does the data loading happen (in terms of block prioritization under memory constraints)? What do you mean by the following?

You need to provision memory to support all the in-flight windows of the pipeline

My understanding is that we are talking about 3 types of windows here: the DatasetPipeline window (ds_pipe_window), block_window and task_window. ds_pipe_window and block_window are only logically separate and share blocks; is this correct? If so, does it mean I need to provision enough object_store_memory for max(len(ds_pipe_window), len(block_window)), plus heap memory for len(task_window), plus whatever I need for downstream pipeline operations (i.e. to run Trainers/Tuners, which, correct me if I’m wrong, should only require heap memory for execution and extra object_store_memory for output)?

  2. From what I understand, each node’s memory is split between heap_memory and object_store_memory with a 70/30 ratio, and object_store_shared_memory is essentially the sum of all nodes’ object_store_memory; is this correct? If so, is it possible to change the 70/30 ratio? Is that done at cluster init, or can it be done dynamically?

  3. Is there built-in Ray functionality to prioritize tasks (Dataset loading as well as custom tasks + Trainer/Tuner) if the provisioned memory is not enough? If not, could you please share an idea of how to implement prioritization? A simple use case is running different types of dependent tasks (i.e. a DAG), each DAG operating on a specific set of input data (input blocks). I want to finish a certain DAG and pass its output to the Trainer, while other DAGs execute on leftover capacity. What is the general guideline for this case? How can I isolate capacity for the Trainer and prioritize capacity for Dataset loading tasks + DAG execution? If Ray doesn’t have built-in functionality for this, can you please point me to a possible implementation using Ray Core?

Thank you very much.

@jianxiao, here is more context on the higher level goal:

We have multiple timeseries (t1, t2, etc.) stored in S3, each as a set of files (i.e. no file has data for two timeseries). Each file covers a certain range of a given timeseries, and files are timestamp-sorted.

We also have a Featurizer actor/task which, based on user configuration, takes one or multiple timeseries (files/blocks) and produces an output timeseries representing the feature defined by this Featurizer (this is what was described as time_series_task in your code above). At a lower level, it operates on ts blocks, produces similar ts blocks, and can be data parallel. One Featurizer can depend on another, meaning that if we want to calculate Featurizers f1 and f2, and f2 depends on f1, we don’t want to recalculate f1 for a given block.

The goal is to have a customizable pipeline where a user defines a set of timeseries they want to operate on, their (shared) range, and a set of Featurizers for the given timeseries, to produce a feature matrix that is synced (i.e. via a Point-in-Time join/merge) and streamed into a Trainer to produce a validated model with all necessary metrics.
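The "compute f1 once even though f2 depends on it" requirement is a topological evaluation with caching. A local pure-Python sketch using the standard library's `graphlib` (Python 3.9+); the featurizers, the `DEPENDS_ON` map and `run_featurizers` are hypothetical, and a block is just a list of floats. In Ray Core the same ordering would typically be expressed by passing f1's ObjectRef as an argument to f2's task, so f1's output is computed once and shared:

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List

calls: List[str] = []  # records which featurizers actually ran


def f1(block: List[float], deps: Dict[str, List[float]]) -> List[float]:
    calls.append("f1")
    return [x * 2 for x in block]


def f2(block: List[float], deps: Dict[str, List[float]]) -> List[float]:
    calls.append("f2")
    # f2 consumes f1's output instead of recomputing it.
    return [a + b for a, b in zip(block, deps["f1"])]


FEATURIZERS: Dict[str, Callable] = {"f1": f1, "f2": f2}
DEPENDS_ON: Dict[str, set] = {"f1": set(), "f2": {"f1"}}


def run_featurizers(block, wanted, featurizers, depends_on):
    # Collect the wanted featurizers plus everything they transitively need.
    needed, stack = set(), list(wanted)
    while stack:
        name = stack.pop()
        if name not in needed:
            needed.add(name)
            stack.extend(depends_on[name])
    graph = {name: depends_on[name] for name in needed}
    results = {}
    # static_order() yields each node after all of its predecessors,
    # so every needed featurizer runs exactly once per block.
    for name in TopologicalSorter(graph).static_order():
        deps = {d: results[d] for d in depends_on[name]}
        results[name] = featurizers[name](block, deps)
    return results


results = run_featurizers([1.0, 2.0], ["f2"], FEATURIZERS, DEPENDS_ON)
print(results, calls)
```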

So the high-level pipeline should have the following steps:

  • stream S3 files to cluster memory
  • perform all Featurizer calculations (which can be represented by a DAG or a set of DAGs)
  • sync each Featurizer’s output based on common timestamps to produce a streaming feature matrix (Point-in-Time join)
  • stream the feature matrix to Trainer
  • run Trainer as a part of pipeline without blocking previous stages
  • collect model and metrics once all input data is processed
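Within a single streamed chunk, the Point-in-Time join step can be sketched with pandas' `merge_asof`. This is a local sketch: the column names `ts`, `f1` and `f2` are hypothetical, both inputs must be sorted on the key, and rows at the start of a block would additionally need the last value from the previous block to join correctly across block boundaries:

```python
import pandas as pd

# Two hypothetical featurizer outputs with different timestamps.
f1_out = pd.DataFrame({
    "ts": pd.to_datetime(["2009-01-01 00:00", "2009-01-01 00:05", "2009-01-01 00:10"]),
    "f1": [1.0, 2.0, 3.0],
})
f2_out = pd.DataFrame({
    "ts": pd.to_datetime(["2009-01-01 00:03", "2009-01-01 00:09"]),
    "f2": [10.0, 20.0],
})

# For each f1 row, take the latest f2 value at or before that timestamp
# (direction="backward"), so no future information leaks into a row.
features = pd.merge_asof(f1_out, f2_out, on="ts", direction="backward")
print(features)
```

The first row gets NaN for f2 because no f2 value exists at or before 00:00, which is the expected Point-in-Time behavior.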

The pipeline’s throughput should also be configurable in terms of CPU/memory/other resource usage, given that we can provide a rough estimate of the resource usage of each of the pipeline’s tasks (data loading, featurizing, training, etc.) on a per-block basis. Ideally, we want to give users the ability to choose the tradeoff between:

  • input data size (i.e. number of features/Featurizers, timeseries training range)
  • total resource usage (which maps to $ budget)
  • total time to execute pipeline

We have already tested KubeRay, which, in conjunction with the KubeRay autoscaler and Ray Core, gives amazing flexibility similar to Kubernetes-native Actors: it can spawn Ray nodes as pods and gives us application-level scheduling, which is a huge plus. In conjunction with the Kubernetes Cluster Autoscaler, it can make the pipeline described above elastic within a user-defined resource range.

Any pointers on above implementation are highly appreciated.

It looks like “window” is overloaded here :)

What I mean here is the window of blocks (so task_window is not relevant, as it’s just a bunch of references, which are very cheap to hold). And you’re right: the object store should be large enough to hold all blocks referenced by block_window and by the pipeline windows (blocks_per_window, as used in window(blocks_per_window=...)).

You can configure the object store memory when you call ray.init (it’s an argument; see Ray Core API — Ray 3.0.0.dev0).

There is a concept called a Placement Group (Key Concepts — Ray 3.0.0.dev0) in Ray, with which you can reserve resources and later use them for the tasks you want. For example, when Datasets runs with Ray Tune, we use this arrangement: Scheduling, Execution, and Memory Management — Ray 3.0.0.dev0.