Following the documentation, I am trying to force Datasets to schedule its tasks within the placement group, but it does not work.
https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-placement-groups
How do I set a placement group for a Dataset?
script:
import ray
from ray.util.placement_group import (
placement_group,
placement_group_table,
remove_placement_group,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# Start Ray in this process.
ray.init()

# Reserve one bundle with 2 CPUs under the STRICT_PACK strategy and block
# until the placement group reports ready.
bundle_specs = [{"CPU": 2}]
pg = placement_group(bundle_specs, strategy="STRICT_PACK")
ray.get(pg.ready())
import time
def test(batch):
    """Sleep for 10 seconds, then return ``batch`` unchanged.

    Passed to ``map_batches`` in ``f`` as the per-batch function.

    Args:
        batch: The batch handed in by the caller; it is not inspected.

    Returns:
        The same ``batch`` object that was passed in.
    """
    time.sleep(10)
    return batch
@ray.remote
def f():
    """Run a small Datasets pipeline from inside a Ray remote task.

    Sets ``scheduling_strategy`` on the current ``DatasetContext`` to
    ``None`` (the setting the linked Datasets documentation gives for
    scheduling Datasets tasks within the current placement group), creates
    a dataset with ``range_table(1, parallelism=1)``, maps ``test`` over it
    with ``batch_size=1``, and iterates through every resulting batch.

    Returns:
        ``True`` after all batches have been iterated.
    """
    ctx = ray.data.context.DatasetContext.get_current()
    ctx.scheduling_strategy = None

    ds = ray.data.range_table(1, parallelism=1)
    ds = ds.map_batches(test, batch_size=1)

    # The batches themselves are discarded; the loop only iterates them.
    for _batch in ds.iter_batches(batch_size=1):
        pass
    return True
# Scheduling strategy that targets the placement group created above.
# Ray's placement-group docs describe bundle index -1 as "any bundle" and
# capture_child_tasks=True as making child tasks inherit the group.
pg_strategy = PlacementGroupSchedulingStrategy(
    placement_group=pg,
    placement_group_bundle_index=-1,
    placement_group_capture_child_tasks=True,
)

# Submit f with zero CPUs requested for itself, then wait for and print
# its result.
ref = f.options(num_cpus=0, scheduling_strategy=pg_strategy).remote()
print(ray.get(ref))