I want to be able to see how much memory/CPU a given actor/task is currently using (and possibly log this data or make certain application/scheduling decisions based on it). I would also like to programmatically track shared object store usage. Is there a Python API for this?
@dirtyValera
Unfortunately, we currently don’t support CPU/memory usage per actor/task, but this is something we are looking into. One of the blockers is the cardinality of such data, given that the number of tasks/actors in Ray can be rather large.
I would also like to programmatically track shared object store usage. Is there a Python API for this?
AFAIK, you could probably do the below (kind of hacky, unfortunately); a sketch of both options follows after this list:
- For cluster-level resource usage, you could parse the object store usage from the autoscaler’s status. See the example query usage from ray status here.
- Or, if you have Prometheus set up, you could scrape the ray_object_store_memory metric programmatically.
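For reference, here’s a minimal sketch of both options. It assumes the ray status CLI output format of recent Ray versions (a line mentioning object_store_memory under the Usage section) and, for the second function, a Prometheus server that is already scraping Ray’s metrics; prometheus_url is a placeholder, and none of this is an official Ray API.

import subprocess

import requests  # assumes the requests package is available


def object_store_usage_from_ray_status():
    """Hacky: shell out to `ray status` and grab the object_store_memory line."""
    out = subprocess.run(["ray", "status"], capture_output=True, text=True).stdout
    for line in out.splitlines():
        if "object_store_memory" in line:  # e.g. "0.00B/4.63GiB object_store_memory"
            return line.strip()
    return None


def object_store_usage_from_prometheus(prometheus_url="http://localhost:9090"):
    """Query Prometheus's standard HTTP API for the ray_object_store_memory metric."""
    resp = requests.get(
        f"{prometheus_url}/api/v1/query",
        params={"query": "ray_object_store_memory"},
    )
    resp.raise_for_status()
    return resp.json()["data"]["result"]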
If you could share a bit more about your use case, that would be great. We are actively working on resource observability in the coming releases, so knowing the use cases would help us prioritize.
Are there any updates on this? Using ray.util.state works to get the PID of a Ray task, and leveraging psutil.Process one can retrieve the RSS memory. However, this seems to work only when the task is being executed on a single node, since psutil can only inspect local processes.
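For anyone else trying this, a minimal sketch of the approach (assuming a recent Ray version where ray.util.state.list_workers() returns entries with a pid field) could look like the following; note again that psutil can only inspect processes on the local node:

import psutil
import ray
from ray.util.state import list_workers

ray.init()

for worker in list_workers():
    try:
        # Only works for worker processes running on this node.
        rss = psutil.Process(worker.pid).memory_info().rss
        print(f"Worker PID {worker.pid}: {rss / (1024 ** 2):.1f} MB RSS")
    except psutil.NoSuchProcess:
        pass  # Worker lives on another node or has already exited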
BTW, so far I’ve been able to do real-time monitoring of actors using concurrency and psutil.Process.
Sure, here’s an example. I’m not sure if it is entirely correct or accurate, but I’ve tested it and it seems to do the job. Some sort of built-in feature like this would be desirable, to prevent actors from using more memory than specified in their resources (or just to provide more observability).
import os  # noqa: D100
import time
from typing import Any

import numpy as np
import psutil
import ray


@ray.remote(max_concurrency=2)  # Two threads: one for monitoring, one for the task
class SelfMonitoringActor:
    def __init__(self) -> None:
        self.finished = False
        self.period = 1

    def monitor(self) -> None:
        """Poll this worker's RSS memory until the task flags completion."""
        worker_pid = os.getpid()
        process = psutil.Process(worker_pid)
        print(f"Monitoring is taking place in: PID {worker_pid}")
        while not self.finished:
            memory_used = process.memory_info().rss
            print(f"Task is using {memory_used / (1024 ** 2)} MB")
            time.sleep(self.period / 2)  # Sample twice per task iteration

    def memory_intensive_task(self) -> Any:
        """Simulate a memory-intensive task."""
        print(f"Memory intensive task is taking place in: PID {os.getpid()}")
        for i in range(100):
            large_array = np.ones((100 + i, 100 + i, 100 + i)) * 0.5
            large_array.sum()
            print(
                f"Iteration: {i}. Memory used by array: "
                f"{large_array.size * large_array.itemsize / (1024 ** 2)} MB"
            )
            time.sleep(self.period)
        return large_array.sum()

    def run(self) -> Any:
        task_out = self.memory_intensive_task()
        self.finished = True  # Signal the monitor loop to stop
        time.sleep(self.period)  # Leave time for the monitor to exit
        return task_out


def main():
    ray.init()
    actor = SelfMonitoringActor.remote()
    # Run both methods concurrently in the same worker (and hence the same PID)
    task_out, _ = ray.get(
        [actor.run.remote(), actor.monitor.remote()]
    )
    print("Task ran successfully!")
    return task_out


if __name__ == "__main__":
    main()
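To actually enforce a limit like the one mentioned above, one (untested) variant would be to have the monitor loop compare RSS against a threshold and call ray.actor.exit_actor() when it is exceeded. memory_limit_mb below is a made-up parameter, and note that exiting an actor from a background thread of a threaded actor may behave differently across Ray versions:

def monitor(self, memory_limit_mb: float = 500.0) -> None:
    """Variant of monitor() that kills the actor past a memory threshold."""
    process = psutil.Process(os.getpid())
    while not self.finished:
        memory_used_mb = process.memory_info().rss / (1024 ** 2)
        if memory_used_mb > memory_limit_mb:
            print(f"Memory limit exceeded ({memory_used_mb:.1f} MB); exiting actor")
            ray.actor.exit_actor()  # Terminates the whole actor process
        time.sleep(self.period / 2)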