I am trying to run data loading and batch prediction for a semantic segmentation model using a `ray.data.Dataset`. My pipeline looks something like this:
```python
BLOCK_SIZE = 128

metadata = ray.data.from_pandas(df)
pipe = (
    metadata
    # Small blocks so that a window of loaded images stays bounded.
    .repartition(metadata.count() // BLOCK_SIZE)
    .window(blocks_per_window=64)  # determined by num CPUs
    # Load one image per metadata row.
    .map(load_data_fn, num_cpus=1)
    .map_batches(
        Predictor,
        fn_constructor_args=(cfg, checkpoint_path),
        batch_size=8,
        num_gpus=1,
        compute=ray.data.ActorPoolStrategy(
            size=8, max_tasks_in_flight_per_actor=4
        ),
    )
)
```
Here, `load_data_fn` is a function that takes a row from the dataframe and returns the corresponding image (as a numpy array inside a dict), roughly as sketched below.
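A minimal sketch of `load_data_fn` (the `"path"` and `"id"` column names and the use of `imageio` are just how my setup happens to look; rows are assumed to behave like plain dicts):

```python
import imageio.v3 as iio
import numpy as np

def load_data_fn(row: dict) -> dict:
    # Read the image referenced by this metadata row from disk.
    image: np.ndarray = iio.imread(row["path"])  # (H, W, C) uint8
    # Keep the row id so predictions can be matched back to the metadata.
    return {"id": row["id"], "image": image}
```

The predictor looks something like this: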
```python
class Predictor:
    def __init__(self, cfg, checkpoint_path):
        print("Model is being setup")
        self.model: torch.nn.Module = load_model(...)

    def __call__(self, data):
        ...
```
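The elided `__call__` is the usual batch-inference shape. A hedged sketch of roughly what it does, assuming batches arrive as a dict of numpy arrays with an `"image"` key holding fixed-size `(B, H, W, C)` images (this depends on the `batch_format` and on my data layout):

```python
import numpy as np
import torch

def predict_batch(model: torch.nn.Module, batch: dict) -> dict:
    # Stack of images produced by load_data_fn, shape (B, H, W, C).
    images = torch.from_numpy(batch["image"]).permute(0, 3, 1, 2).float().cuda()
    with torch.no_grad():
        logits = model(images)  # (B, num_classes, H, W)
    # Per-pixel class ids as the segmentation mask.
    return {"mask": logits.argmax(dim=1).cpu().numpy()}
```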
The reason for the `.repartition(...).window(...)` calls in the pipeline setup is that without them I would easily run out of object store memory, since each image is several MB in size (at, say, ~5 MB per image, a single 128-row block of loaded images already comes to roughly 640 MB).
Currently, I am observing that after each block, the `Predictor` is reloaded. Naturally, reloading the model every time means that GPU utilisation isn't great. Is there any way to reuse the actor objects? Or is there something fundamentally wrong with how I am setting up the pipeline?
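For reference, this is the non-windowed variant (same names as above), which, as far as I understand, keeps the actor pool alive for the whole `map_batches()` call so the model is only constructed once per actor, but which runs me out of object store memory:

```python
# No repartition/window: the actor pool lives for the entire
# map_batches() call, so the model is loaded once per actor.
ds = (
    metadata
    .map(load_data_fn, num_cpus=1)
    .map_batches(
        Predictor,
        fn_constructor_args=(cfg, checkpoint_path),
        batch_size=8,
        num_gpus=1,
        compute=ray.data.ActorPoolStrategy(size=8),
    )
)
```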