Correctly sizing preprocessing actors in Ray Data

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Description of the issue.
Hello, I’m currently trying to set up a training pipeline using Ray Data and Ray Train.
The setup is the following:

  • The preprocessing part is developed using Torch and OnnxScript
  • The model training part is done in Torch
  • We use read_parquet followed by map_batches to run the preprocessing
  • We use iter_torch_batches to feed the model training loop

My preprocessing is done in two parts, one in Torch and one in ONNX. For the latter, I need to instantiate an onnxruntime session on the Ray workers. I first tried specifying a function in map_batches, but that runs as Ray tasks and re-creates the ONNX session for every batch, making the computation slow (creating an onnxruntime session is costly).
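Roughly, that function-based attempt looked like the sketch below (simplified, with placeholder paths; the real preprocessing has more steps):

import numpy as np
import onnxruntime as ort
import ray

ONNX_PATH = "model.onnx"       # placeholder path
DATASET_PATH = "data.parquet"  # placeholder path


def preprocess(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    # The session is built inside the function, so every map_batches task
    # pays the InferenceSession construction cost again.
    sess = ort.InferenceSession(ONNX_PATH)
    outputs = sess.run(None, batch)
    # Map the output list back to a dict keyed by output name.
    return dict(zip([o.name for o in sess.get_outputs()], outputs))


ds = ray.data.read_parquet(DATASET_PATH).map_batches(preprocess)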

I then switched to specifying a class in map_batches in order to use Ray actors, but I can’t manage to even match the performance of the Ray tasks. With this approach I need to specify the concurrency and number of CPUs manually, and it looks like I can’t find an adequate setup.
But since the actor instantiates the session only once at startup, we should be able to reach better computation times this way (and the documentation states that Ray actors should be used for stateful computations).

Are there any guidelines on how to size Ray actors when using them in map_batches?
How are tasks sized when using tasks in map_batches? It looks like Ray automatically sizes the workers used for the tasks, and that works pretty well. I saw in the documentation that it starts one worker per available CPU, but trying that setup with actors didn’t work well.

Hi @thenri , could you share the high-level Ray Data + Ray Train code you are using? Also, which versions of Ray, Torch, ONNX, etc. are you on?

how to size Ray actors when using them in map_batches?

Could you clarify what you mean by “sizing Ray actors” here? Do you mean setting num_gpus in the map_batches() call?

Hello @sjl , thanks a lot for your answer.

I’m using the following versions:

  • ray==2.23.0
  • torch==2.3.1
  • onnx==1.16.1
  • onnxruntime==1.18.0
  • onnxscript==0.1.0.dev20240624

Now for the high level code:

  • I have a preprocessing class that looks like the following:
import numpy as np
import onnx
import onnxruntime as ort


class PreprocessingPipeline:
    def __init__(self, onnx_path: str):
        self.model_proto = onnx.load(onnx_path)
        self.sess_options = ort.SessionOptions()
        self.sess_options.intra_op_num_threads = 4
        self.sess_options.inter_op_num_threads = 1
        self.sess_options.execution_mode = ort.ExecutionMode.ORT_PARALLEL
        self.sess = ort.InferenceSession(
            onnx_path, self.sess_options
        )

    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        outputs = self.sess.run(None, batch)
        # Map the output list back to a dict keyed by output name, as map_batches expects.
        return dict(zip([o.name for o in self.sess.get_outputs()], outputs))

    def __setstate__(self, values):
        # Rebuild the onnxruntime session on deserialization; sessions are not picklable.
        self.model_proto = values["model_proto"]
        self.sess_options = ort.SessionOptions()
        self.sess_options.intra_op_num_threads = 1
        self.sess_options.inter_op_num_threads = 4
        self.sess_options.execution_mode = ort.ExecutionMode.ORT_PARALLEL
        self.sess = ort.InferenceSession(
            self.model_proto.SerializeToString(), self.sess_options
        )

    def __getstate__(self):
        # Only the model proto is pickled; the session is rebuilt in __setstate__.
        return {
            "model_proto": self.model_proto,
        }

Note that this code is not complete; it’s just to give you an idea of what we are doing. We are using ONNX Runtime to run some computation on our data.

Now the issue comes from how to use this code in Ray Data. Here is what I tried:

  • Preprocessing using tasks
from ray.data import read_parquet

pipeline = PreprocessingPipeline(onnx_path)
dataset = read_parquet(dataset_path)
dataset = dataset.map_batches(pipeline)

This implementation will use tasks for the preprocessing, meaning that it will pickle my PreprocessingPipeline instance and deserialize it for each task, which is why I needed the __getstate__ and __setstate__ methods (ONNX Runtime sessions are not serializable). The downside of this is that the ONNX Runtime session will be initialized once per task, and this is costly, making the preprocessing quite slow.

  • Preprocessing using actors
dataset = read_parquet(dataset_path)
dataset = dataset.map_batches(PreprocessingPipeline, fn_constructor_args=(onnx_path,), concurrency=2)

Note that this is high-level code and won’t work as is, since onnx_path won’t exist on the Ray workers.
This implementation will use actors, meaning that the PreprocessingPipeline will be instantiated once at startup and reused across batches. On paper it looks better, as the ONNX Runtime session will be instantiated only once, and not for each task. But I need to specify the concurrency here, and the performance I observe is worse than with the task implementation. I played a lot with the different parameters (concurrency, num_cpus…), and I can’t manage to even match the previous performance.
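For reference, the kinds of variations I played with look roughly like this sketch (placeholder paths and values, and assuming the model file is reachable from the workers):

import ray

ds = ray.data.read_parquet("data.parquet")  # placeholder path

# Variant 1 -- fixed-size actor pool: 4 actors, each reserving 4 CPUs
# (roughly matching intra_op_num_threads in the session options).
preprocessed_a = ds.map_batches(
    PreprocessingPipeline,
    fn_constructor_args=("model.onnx",),  # placeholder path
    concurrency=4,
    num_cpus=4,
    batch_size=1024,
)

# Variant 2 -- autoscaling actor pool: Ray scales between 2 and 8 actors.
preprocessed_b = ds.map_batches(
    PreprocessingPipeline,
    fn_constructor_args=("model.onnx",),
    concurrency=(2, 8),
    num_cpus=2,
    batch_size=1024,
)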

Now, to use it in Ray Train, I use PyTorch Lightning. I have something like:

import pytorch_lightning as pl
from ray.train.lightning import prepare_trainer

trainer = pl.Trainer(...)
trainer = prepare_trainer(trainer)
trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader)
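where the dataloaders come from iter_torch_batches, roughly like the sketch below (the batch size is a placeholder, and train_dataset / val_dataset stand for the preprocessed Ray datasets from the map_batches step above):

# Sketch: the Ray Datasets are consumed via iter_torch_batches
# (placeholder batch size; train_dataset / val_dataset are the
# preprocessed datasets from the map_batches step above).
train_ds_loader = train_dataset.iter_torch_batches(batch_size=256)
val_ds_loader = val_dataset.iter_torch_batches(batch_size=256)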

My main questions would be:

  • Do you have an idea why I don’t manage to get the same performance with actors as with tasks, even though it looks better on paper?
  • Do you have general guidelines on how to optimize Ray Data jobs when using actors?

Thanks for sharing, the high-level Ray Data logic with the actor init makes sense to me.

Tagging colleagues from the Ray Train team - @justinvyu or @matthewdeng , any best practices on how to best initialize + use ONNX with Ray Train + Ray Data here?