Ray inference not happening in a streaming way

I am using Ray for inference.

scale = ActorPoolStrategy(size=100)
scale_gpu = ActorPoolStrategy(size=4)
preprocessed_ds = ds.map_batches(CPUPreprocess, compute=scale, num_gpus=0, batch_size=192, num_cpus=1)
predictions = preprocessed_ds.map_batches(GPUInference, compute=scale_gpu, num_gpus=1, batch_size=1)

The above is part of my code.

Preprocessing is CPU intensive and inference is GPU intensive.

Issue: Since Ray Data is streaming, I would expect a batch to be available for inference as soon as it has been preprocessed. Inference and preprocessing should then run in parallel, with both CPU and GPU in use almost all the time.

But this is not what ends up happening for me.

Inference seems to wait for preprocessing to finish. Because of this, the object store memory fills up and the job fails.
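To make the expected behavior concrete, here is a minimal plain-Python sketch of two overlapping stages connected by a bounded buffer. It is only an illustration of the streaming idea, not how Ray Data is implemented; `preprocess`, `infer`, `run_streaming`, and `max_buffered` are hypothetical names chosen for this example.

```python
import queue
import threading


def preprocess(batch):
    # Stand-in for the CPU-heavy step (hypothetical).
    return [x * 2 for x in batch]


def infer(batch):
    # Stand-in for the GPU-heavy step (hypothetical).
    return sum(batch)


def run_streaming(batches, max_buffered=2):
    """Run preprocess and infer concurrently with a bounded buffer between them."""
    # Bounded queue: the producer blocks once max_buffered batches are waiting,
    # so preprocessed data cannot pile up without limit.
    buffer = queue.Queue(maxsize=max_buffered)
    sentinel = object()
    results = []

    def producer():
        for b in batches:
            buffer.put(preprocess(b))  # blocks if the consumer falls behind
        buffer.put(sentinel)  # signal that no more batches are coming

    t = threading.Thread(target=producer)
    t.start()
    while True:
        item = buffer.get()
        if item is sentinel:
            break
        # Inference starts as soon as the first batch is ready.
        results.append(infer(item))
    t.join()
    return results


results = run_streaming([[1, 2], [3, 4], [5, 6]])
print(results)
```

In this picture, what I am seeing corresponds to the consumer never starting while the producer keeps filling an unbounded buffer.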

Active Tasks By Name only has CpuPreprocess in the graph.

Any advice?

The image below shows this (the GPU is never used).

@humblyInsane Thanks for trying map_batches() for inference. Can you try this with a smaller batch size and see what output you get?

scale = ActorPoolStrategy(size=4)
scale_gpu = ActorPoolStrategy(size=4)
preprocessed_ds = ds.map_batches(CPUPreprocess, compute=scale, batch_size=4)
predictions = preprocessed_ds.map_batches(GPUInference, compute=scale_gpu, num_gpus=1, batch_size=4)

Can you include the code for both CPUPreprocess and GPUInference?
Also are you getting any

cc @chengsu

Here is the code, @Jules_Damji. I'm trying a smaller batch size right now too.

from typing import Dict

import numpy as np
import torch
from transformers import AutoTokenizer

class CpuPreprocess:
    def __init__(self):
        TEXT_MODEL = "distilbert-base-multilingual-cased"
        self.tokenizer = AutoTokenizer.from_pretrained(TEXT_MODEL, use_fast=True)

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # <SOME list operations to get text>
        # (elided; assumed to define `text` and `all_feats`)

        tokens = self.tokenizer(
            text, return_tensors="np", padding="longest", truncation=True, max_length=200
        )

        # Wrap each array in a list so the whole tokenized batch travels as one row.
        all_feats["input_ids"] = [tokens["input_ids"]]
        all_feats["attention_mask"] = [tokens["attention_mask"]]

        return all_feats

class Inference:
    def __init__(self):
        TEXT_MODEL = "distilbert-base-multilingual-cased"
        # Loading `model` from TEXT_MODEL is not shown in the original post.
        model = model.to("cuda")
        model = torch.nn.DataParallel(model)
        self.model = model

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # start_time = time.time()
        feats = {}
        # Each row holds one pre-tokenized batch, so take element 0 (batch_size=1).
        feats["input_ids"] = torch.from_numpy(batch["input_ids"][0]).to("cuda")
        feats["attention_mask"] = torch.from_numpy(batch["attention_mask"][0]).to("cuda")
        with torch.inference_mode():
            predictions = self.model(feats)
        return {
            "predictions": predictions
        }

If I use

class PreprocessAndInference:
    def __init__(self):
        self.preprocess = CpuPreprocess()
        self.inference = Inference()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        start_time = time.perf_counter()
        batch = self.preprocess(batch)
        infer = self.inference(batch)
        return {'infer': infer}

as my actor, then it works fine, meaning a batch is passed from preprocessing to inference immediately.

But this defeats the whole purpose of using Ray.

Since preprocessing is expensive on the CPU, I want to spread it across multiple CPUs and run inference on the GPUs.

I want to avoid a smaller batch size because I want to tokenize multiple rows at once in the CPU preprocessing step so that I can fully utilize the GPU. Within a given batch, all token sequences need to have the same length for GPU inference, so I cannot tokenize each row one by one and then batch the results for GPU inference.
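To illustrate the equal-length constraint, here is a small plain-Python sketch of what padding to the longest row in a batch amounts to conceptually. It is not the tokenizer's actual implementation; `pad_to_longest`, the `pad_id=0` default, and the token ids are hypothetical values for illustration.

```python
def pad_to_longest(rows, pad_id=0):
    """Right-pad variable-length token id lists to the longest row in the batch."""
    longest = max(len(r) for r in rows)
    # Every row is extended with pad_id so that all rows share one length.
    input_ids = [r + [pad_id] * (longest - len(r)) for r in rows]
    # The mask marks real tokens with 1 and padding with 0.
    attention_mask = [[1] * len(r) + [0] * (longest - len(r)) for r in rows]
    return input_ids, attention_mask


# Hypothetical token ids for three rows of different lengths.
rows = [[101, 7, 102], [101, 7, 8, 9, 102], [101, 102]]
input_ids, attention_mask = pad_to_longest(rows)
print(input_ids)
print(attention_mask)
```

Because the padded length depends on the longest row in that particular batch, rows tokenized in different batches can end up with different lengths and cannot simply be regrouped afterwards.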

NOTE: I’m using Ray 2.6.

Even with a smaller batch size, I'm seeing the same thing (GPU not being used, Inference not in active tasks, object store memory keeps filling up).


Here is an example that uses both preprocessing (on CPU by default) and PyTorch inference (on GPUs) with a large batch. The only difference is that preprocessing is done with map, one row at a time, but I don’t see why it can't be used with map_batches by converting this function into a class.
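As a rough sketch of that conversion, a per-row function can be wrapped in a callable class that takes a column-oriented batch. This is plain Python for illustration only: `preprocess_row` and `BatchPreprocess` are hypothetical names, and the batch here uses lists, whereas map_batches would typically pass a dict of NumPy arrays.

```python
def preprocess_row(row):
    # Hypothetical per-row transform: lowercases the "text" field.
    return {"text": row["text"].lower()}


class BatchPreprocess:
    """Callable class that applies the per-row function across a batch of columns."""

    def __init__(self):
        # Expensive setup (e.g. loading a tokenizer) would go here, once per actor.
        self.row_fn = preprocess_row

    def __call__(self, batch):
        # `batch` maps a column name to a list of values, one entry per row.
        n = len(batch["text"])
        rows = [self.row_fn({"text": batch["text"][i]}) for i in range(n)]
        # Reassemble the per-row results into a column-oriented batch.
        return {"text": [r["text"] for r in rows]}


out = BatchPreprocess()({"text": ["Hello", "RAY"]})
print(out)
```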

Yes, the reason I asked you to use a small batch was to test whether you get any output that suggests streaming.

@humblyInsane Looks like you might have solved the problem by upgrading to 2.8. Let us know if that works as expected.

@humblyInsane Since we are resolving this issue via a private Slack channel, I’ll close this.

Hi @Jules_Damji, I am currently facing the same issue. I would like to have the solution posted here so that others like me can access it.

I have explained more about my issue in the Ray GitHub issues. Please check here

I would like to know the solution. Thanks in advance.

Hi, is the issue above resolved? I am just checking to see if there are any updates here. Thank you.