- Medium: It contributes to significant difficulty to complete my task, but I can work around it.
I have a workflow that requires a several tasks be run in consecutive order and so I wanted to use placement groups (pg) to do this.
As recommended by this thread, I created a nested function structure so that the PG could be created and destroyed by the same driver, but not be itself contained in the PG. The code essentially looks like the following:
@ray.remote(resources = my_resources) def task1(input): ... return output @ray.remote(resources = my_resources) def task2(input): ... return output def manage_tasks(input) pg = placement_group([my_resources], strategy="STRICT_PACK") ray.get(pg.ready()) pg_strategy = PlacementGroupSchedulingStrategy(placement_group=pg) future1 = task1.options(scheduling_strategy=pg_strategy).remote(input) output = ray.get(task2.options(scheduling_strategy=pg_strategy).remote(future1)) remove_placement_group(pg) return output outputs = ray.get([manage_tasks.remote(i) for i in range(100)])
The issue I am having is that after the first “round” of placement groups, the other manage_tasks workers never make it past the ray.get(pg.ready()) line. I am monitoring the cluster and there is space on the worker nodes to create new PGs. The cluster also has demands for 1+ PG groups. Thoughts?