Removing a placement group from a remote function

I want to create a placement group, pass it to a remote task to use, and remove it when the task is done.

import ray
from ray.util.placement_group import placement_group, remove_placement_group

@ray.remote
def large_task(pg):
    ...
    # Clean up the placement group from inside the task once it's done.
    remove_placement_group(pg)

for i in range(100):
    pg = placement_group(...)
    large_task.remote(pg)

There will only be enough resources to create, say, 10 placement groups. With this code, I don't need to worry about cleaning up the pg in the driver. However, calling remove_placement_group in the remote function seems to cause a seg-fault. The code seems to use global_worker, etc. What's the right way to clean up the placement groups?

Hmm, this is not supposed to cause a seg fault. Can you create a GitHub issue with a reproduction?

(pid=3643, ip=xxx.xxx.xxx.xxx) 2021-02-16 23:09:49,506 ERROR worker.py:390 -- SystemExit was raised from the worker
(pid=3643, ip=xxx.xxx.xxx.xxx) Traceback (most recent call last):
(pid=3643, ip=xxx.xxx.xxx.xxx)   File "python/ray/_raylet.pyx", line 570, in ray._raylet.task_execution_handler
(pid=3643, ip=xxx.xxx.xxx.xxx)   File "python/ray/_raylet.pyx", line 434, in ray._raylet.execute_task
(pid=3643, ip=xxx.xxx.xxx.xxx)   File "python/ray/_raylet.pyx", line 473, in ray._raylet.execute_task
(pid=3643, ip=xxx.xxx.xxx.xxx)   File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
(pid=3643, ip=xxx.xxx.xxx.xxx)   File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
(pid=3643, ip=xxx.xxx.xxx.xxx)     remove_placement_group(pg)
(pid=3643, ip=xxx.xxx.xxx.xxx)   File "/home/centos/.local/lib/python3.7/site-packages/ray/util/placement_group.py", line 211, in remove_placement_group
(pid=3643, ip=xxx.xxx.xxx.xxx)     worker.core_worker.remove_placement_group(placement_group.id)
(pid=3643, ip=xxx.xxx.xxx.xxx)   File "/home/centos/.local/lib/python3.7/site-packages/ray/worker.py", line 387, in sigterm_handler
(pid=3643, ip=xxx.xxx.xxx.xxx)     sys.exit(1)
(pid=3643, ip=xxx.xxx.xxx.xxx) SystemExit: 1

This was the exact trace.

Ah, this is the expected behavior. Tasks and actors scheduled with a placement group fate-share with the PG.

I am not sure if this is mentioned in the docs; I will update them if it isn't. Just to make sure: your code looks like this, right?

@ray.remote
def task(pg):
    # This task is scheduled inside pg, so removing pg kills the task itself.
    remove_placement_group(pg)

pg = placement_group(...)
task.options(placement_group=pg).remote(pg)

One possible solution is to remove the placement groups in the driver that submits the tasks, after all tasks are done.
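
A minimal sketch of that pattern, assuming the large_task and loop from your example (the {"CPU": 1} bundle shape is only a placeholder):

import ray
from ray.util.placement_group import placement_group, remove_placement_group

ray.init()

@ray.remote
def large_task():
    ...  # work that runs inside the placement group

pgs, refs = [], []
for _ in range(10):
    pg = placement_group([{"CPU": 1}])
    ray.get(pg.ready())  # wait until the PG is actually scheduled
    pgs.append(pg)
    refs.append(large_task.options(placement_group=pg).remote())

# Remove each PG only after every task using it has finished,
# so no running task fate-shares with a removed PG.
ray.get(refs)
for pg in pgs:
    remove_placement_group(pg)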

Thanks for confirming that! If I were to manage the placement groups in the driver code, I would need a placement group pool and would have to actively control the number of jobs running (otherwise PG creation would block the execution). With the above code, the scheduling behaviour is controlled by the PG logic, and I cannot create more PGs than the resource constraints allow. This was much simpler. I would probably need to handle the exception from the task getting killed, then.

Sorry, I was mistaken in thinking that placement_group is a blocking operation. It's an async operation… I was using ray.get(pg.ready()) by default.

It still doesn't change the fact, for me, that discarding the PG at its consumer makes it easier to use. The driver creates a PG, launches a job with the PG, and that's it.

I can deal with this with one more remote function, though I would rather avoid nested remote calls.

Do you usually have a single task per placement group? If you have multiple tasks, removing it from a consumer can easily cause a race condition, right? (given the fate-sharing behavior)

One possible solution (probably not ideal, but it might work) is to create a detached actor dedicated to removing placement groups. You can send a request to the detached actor whenever you need to GC PGs.
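
A rough sketch of that idea; the actor name "pg_cleaner" and its method are made up for illustration:

import ray
from ray.util.placement_group import remove_placement_group

ray.init()

@ray.remote
class PgCleaner:
    # This actor is not scheduled inside any PG, so it never
    # fate-shares with the placement groups it removes.
    def remove(self, pg):
        remove_placement_group(pg)

# Detached so it survives independently of the driver that created it.
cleaner = PgCleaner.options(name="pg_cleaner", lifetime="detached").remote()

# A consumer can then hand off cleanup, e.g. as its last action:
#   ray.get_actor("pg_cleaner").remove.remote(pg)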

I ended up using a launcher remote function that creates the PG, waits for it, starts a new large job using the PG, and removes the PG. Yes, I wanted to use a dedicated machine for each large job. Thanks for checking!
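
Roughly, the launcher looks like this (the bundle size and the STRICT_PACK strategy are illustrative; STRICT_PACK forces all bundles onto one node, which gives each job its own machine):

import ray
from ray.util.placement_group import placement_group, remove_placement_group

ray.init()

@ray.remote
def large_job():
    ...  # the actual work, pinned to the PG

@ray.remote
def launcher():
    # The launcher itself is NOT scheduled in the PG, so removing
    # the PG here does not kill the launcher.
    pg = placement_group([{"CPU": 4}], strategy="STRICT_PACK")
    ray.get(pg.ready())  # wait for a dedicated machine
    ray.get(large_job.options(placement_group=pg).remote())
    remove_placement_group(pg)

ray.get([launcher.remote() for _ in range(100)])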
