Based on these code snippets, do you have any opinions or suggestions on an easy way to reduce the number of tasks killed by the OOM killer? Or should I just let the tasks OOM and retry?
I did take notice of this as well: Design Patterns & Anti-patterns — Ray 2.32.0
# Memory-aware wrappers
@ray.remote
class MemoryAwareQueue:
    """Asyncio queue actor that back-pressures producers when this actor
    process holds too much memory.

    FIX(review): the original sampled ``resource.getrusage(...).ru_maxrss``,
    which is the lifetime *peak* RSS of the process — it never decreases, so
    once the threshold was crossed every subsequent ``put()`` would sleep
    forever even after memory was freed. We now sample the *current* RSS.
    """

    def __init__(self, max_size: int, max_memory_bytes: int):
        # Bounded queue: ``put`` also blocks when ``max_size`` items are queued.
        self.queue = asyncio.Queue(maxsize=max_size)
        self.max_memory_bytes = max_memory_bytes

    def memory_usage(self) -> int:
        """Return the current resident set size of this process in bytes."""
        # Local import: psutil is already a dependency of this file (see
        # SignalActor), and importing here keeps the actor self-contained.
        import psutil
        return psutil.Process().memory_info().rss

    async def put(self, item) -> None:
        """Enqueue ``item``, waiting while memory is high or the queue is full."""
        while self.memory_usage() > self.max_memory_bytes or self.queue.full():
            await asyncio.sleep(5)
        await self.queue.put(item)

    async def get(self):
        """Dequeue and return the next item, waiting if the queue is empty."""
        return await self.queue.get()

    def task_done(self) -> None:
        """Mark one previously-gotten item as processed (asyncio.Queue bookkeeping)."""
        self.queue.task_done()
# Total physical memory in bytes, read once at import time from /proc/meminfo
# (Linux-only; MemTotal is reported there in kB, hence the * 1024).
MAX_SYS_MEM = 0  # explicit default so a missing MemTotal line no longer leaves the name undefined
with open('/proc/meminfo', 'r') as mem:
    for line in mem:
        if line.startswith('MemTotal'):
            MAX_SYS_MEM = int(line.split()[1]) * 1024
            break  # MemTotal appears once; no need to scan the rest of the file
@ray.remote
class GlobalQueueManager:
    """Shared queue actor that throttles producers on memory pressure,
    expressed as a percentage of total system memory.

    FIX(review): the original computed ``ru_maxrss / MAX_SYS_MEM``. Two bugs:
    (1) on Linux ``ru_maxrss`` is in *kilobytes* while ``MAX_SYS_MEM`` is in
    bytes, so the "percent" was ~1000x too small and throttling never engaged;
    (2) ``ru_maxrss`` is the lifetime peak and never decreases, so once the
    threshold was crossed ``put()`` would block forever. Both are fixed by
    sampling the current RSS in bytes.
    """

    def __init__(self, max_size: int, max_memory_percent: float):
        # Bounded queue: ``put`` also blocks when ``max_size`` items are queued.
        self.queue = asyncio.Queue(maxsize=max_size)
        self.max_memory_percent = max_memory_percent

    def memory_usage(self) -> float:
        """Return this process's current RSS as a percentage of MAX_SYS_MEM."""
        import psutil  # psutil is already used elsewhere in this file
        rss_bytes = psutil.Process().memory_info().rss
        return (rss_bytes / MAX_SYS_MEM) * 100

    async def put(self, item) -> None:
        """Enqueue ``item``, waiting while memory is high or the queue is full."""
        while self.memory_usage() > self.max_memory_percent or self.queue.full():
            await asyncio.sleep(5)
        await self.queue.put(item)

    async def get(self):
        """Dequeue and return the next item, waiting if the queue is empty."""
        return await self.queue.get()

    def task_done(self) -> None:
        """Mark one previously-gotten item as processed."""
        self.queue.task_done()

    def qsize(self) -> int:
        """Return the approximate number of items currently queued."""
        return self.queue.qsize()
# Lazily-created singleton handle to the GlobalQueueManager actor.
global_queue_manager = None


def get_memory_manager():
    """Return the process-wide GlobalQueueManager actor handle, creating it on first use."""
    global global_queue_manager
    # Explicit None check: a Ray actor handle should not be truth-tested,
    # and `is None` states the lazy-init intent precisely.
    if global_queue_manager is None:
        global_queue_manager = GlobalQueueManager.remote(QUEUE_SIZE, MAX_MEMORY_PERCENT)
    return global_queue_manager
def global_memory_aware(func: Callable) -> Callable:
    """Decorator that pushes ``func``'s result(s) through the global
    memory-aware queue actor, then drains the queue and returns the items
    as a list.

    FIX(review): the inner local was named ``global_queue_manager``, shadowing
    the module-level global of the same name; renamed to ``manager``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        async def async_wrapper():
            manager = get_memory_manager()
            result = func(*args, **kwargs)
            # Lists are enqueued item by item; anything else as a single item.
            if isinstance(result, list):
                for item in result:
                    await manager.put.remote(item)
            else:
                await manager.put.remote(result)
            results = []
            # NOTE(review): the queue is shared across all callers, so this
            # drain loop may also collect items enqueued by concurrent tasks,
            # and qsize() races against other consumers — confirm this
            # interleaving is acceptable for your workload.
            while ray.get(manager.qsize.remote()) > 0:
                item = await manager.get.remote()
                results.append(item)
                manager.task_done.remote()  # fire-and-forget bookkeeping
            return results
        # NOTE(review): asyncio.run() raises RuntimeError when called from a
        # running event loop — this wrapper is usable from sync contexts only.
        return asyncio.run(async_wrapper())
    return wrapper
@ray.remote(num_cpus=0)
class SignalActor:
    """Gate actor: waiters proceed only while system memory pressure is low.

    ``send()`` re-evaluates memory pressure and opens or closes the gate;
    ``wait()`` blocks until the gate is open.
    """

    def __init__(self, max_memory_percent=80):
        self.event = asyncio.Event()
        self.max_memory_percent = max_memory_percent

    def send(self):
        """Open the gate if memory usage is under the threshold, else close it."""
        below_threshold = psutil.virtual_memory().percent < self.max_memory_percent
        if below_threshold:
            self.event.set()
        else:
            self.event.clear()

    async def wait(self):
        """Suspend the caller until the gate is open."""
        await self.event.wait()
# Create a global SignalActor (lazily, on first request).
# NOTE(review): this global shadows the stdlib `signal` module within this
# file — consider renaming (e.g. `_signal_actor`) once all references are known.
signal = None


def get_signal_actor():
    """Return the process-wide SignalActor handle, creating it on first use."""
    global signal
    # Explicit None check instead of truth-testing a Ray actor handle.
    if signal is None:
        signal = SignalActor.remote()
    return signal
def memory_aware(func: Callable) -> Callable:
    """Wrap *func* as a Ray remote task with a memory reservation and retries.

    Decorator order matters here: ``functools.wraps`` applies first (copying
    *func*'s metadata onto ``wrapper``), then ``ray.remote`` turns the result
    into a remote function reserving 2 CPUs and 1 GiB of memory with up to 20
    retries — so invocations killed by the OOM are retried by Ray rather
    than lost.
    """
    @ray.remote(num_cpus=2, max_retries=20, memory=1024 * 1024 * 1024)
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Previously throttled on system-wide memory before running; left
        # disabled here.
        # while psutil.virtual_memory().percent > MAX_MEMORY_PERCENT:
        # time.sleep(random.random() * JITTER)
        try:
            # Log the call shape (not values) for tracing OOM-prone tasks.
            print(f"memory_aware call: {func.__name__} args: {len(args)} kwargs_keys: {kwargs.keys()}")
            return func(*args, **kwargs)
        except Exception as e:
            # Log the full traceback before re-raising so Ray's retry still fires.
            import traceback
            logger.error(traceback.format_exc())
            raise
    # Tag the wrapper so wrapped tasks are distinguishable in logs/dashboards.
    # NOTE(review): `wrapper` is a Ray RemoteFunction at this point, not a plain
    # function — confirm assigning __name__ on it has the intended effect.
    wrapper.__name__ = f"{func.__name__}_mem_wrap"
    return wrapper
The other thing I’ve done:
# Fragment passed to ray.init(...): tune object spilling so the object store
# spills to disk under pressure instead of tasks being OOM-killed.
"_system_config": {
    # Keep spilling until the local filesystem is 99% full.
    "local_fs_capacity_threshold": 0.99,
    "object_spilling_config": json.dumps(
        {
            "type": "filesystem",
            "params": {
                # Spill target directory — ensure /tmp/spill has enough space.
                "directory_path": "/tmp/spill",
                # ~1 MB write buffer for spill I/O.
                "buffer_size": 1_000_000,
            }
        },
    )
},
AND lowering the object storage memory:
ray-worker-2:
  build:
    context: .
    dockerfile: ./dockerfile.ray.cluster
  depends_on:
    - ray-head
  # --object-store-memory 500000000 caps the plasma object store at ~500 MB,
  # leaving more of the container's RAM for task heap usage.
  # NOTE(review): Ray's default GCS port is 6379 — confirm ray-head actually
  # listens on 6479, otherwise this worker will fail to connect.
  command: ray start --address=ray-head:6479 --object-store-memory 500000000 --ray-client-server-port=10001 --block
  environment:
    - RAY_HEAD_IP=ray-head
    - ENV=local-docker
  networks:
    - net
  extra_hosts:
    - "host.docker.internal:host-gateway"