Memory Scheduled Tasks OOM

Based on these code snippets, do you have any opinions or suggestions on an easy way to reduce the number of tasks killed by the OOM killer?

Or should I just let the tasks OOM and retry?

I did take note of this as well: Design Patterns & Anti-patterns — Ray 2.32.0


# Memory-aware wrappers

import asyncio
import functools
import logging
import resource
from typing import Callable

import psutil
import ray

logger = logging.getLogger(__name__)

# Module-level constants (values here are illustrative placeholders)
QUEUE_SIZE = 100
MAX_MEMORY_PERCENT = 80.0

@ray.remote
class MemoryAwareQueue:
    def __init__(self, max_size: int, max_memory_bytes: int):
        self.queue = asyncio.Queue(maxsize=max_size)
        self.max_memory_bytes = max_memory_bytes

    def memory_usage(self):
        # NOTE: ru_maxrss is the *peak* RSS (KiB on Linux), so once it
        # crossed the threshold, put() would block forever. Current RSS
        # from psutil is the right gauge here.
        return psutil.Process().memory_info().rss

    async def put(self, item):
        while self.memory_usage() > self.max_memory_bytes or self.queue.full():
            await asyncio.sleep(5)
        await self.queue.put(item)

    async def get(self):
        return await self.queue.get()

    def task_done(self):
        self.queue.task_done()
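
A minimal usage sketch of this actor (the item values and the 2 GiB cap are placeholders; put() blocks while the actor is over the cap or the queue is full):

queue = MemoryAwareQueue.remote(max_size=100, max_memory_bytes=2 * 1024**3)

@ray.remote
def producer(queue, items):
    # Each put() waits until the queue actor has both space and memory headroom
    for item in items:
        ray.get(queue.put.remote(item))

ray.get(producer.remote(queue, list(range(10))))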

# Total system memory in bytes (/proc/meminfo reports kB);
# psutil.virtual_memory().total would give the same value.
with open('/proc/meminfo', 'r') as mem:
    for line in mem:
        if line.startswith('MemTotal'):
            MAX_SYS_MEM = int(line.split()[1]) * 1024
            break

@ray.remote
class GlobalQueueManager:
    def __init__(self, max_size: int, max_memory_percent: float):
        self.queue = asyncio.Queue(maxsize=max_size)
        self.max_memory_percent = max_memory_percent

    def memory_usage(self):
        # Use current RSS in bytes so the units match MAX_SYS_MEM
        # (the original mixed ru_maxrss in KiB with a byte total).
        usage = psutil.Process().memory_info().rss
        return (usage / MAX_SYS_MEM) * 100

    async def put(self, item):
        while self.memory_usage() > self.max_memory_percent or self.queue.full():
            await asyncio.sleep(5)
        await self.queue.put(item)

    async def get(self):
        return await self.queue.get()

    def task_done(self):
        self.queue.task_done()

    def qsize(self):
        return self.queue.qsize()

global_queue_manager = None

def get_memory_manager():
    global global_queue_manager
    # Lazily create a single queue manager for this process
    if global_queue_manager is None:
        global_queue_manager = GlobalQueueManager.remote(QUEUE_SIZE, MAX_MEMORY_PERCENT)
    return global_queue_manager
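
One caveat with this pattern: the global is per Python process, so every driver or worker that imports the module gets its own actor. A named actor would make the queue truly global across the cluster (a sketch, assuming Ray >= 1.12 for get_if_exists):

def get_memory_manager():
    # Every caller gets a handle to the same named actor; it is
    # created on first use and reused afterwards.
    return GlobalQueueManager.options(
        name="global_queue_manager", get_if_exists=True
    ).remote(QUEUE_SIZE, MAX_MEMORY_PERCENT)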

def global_memory_aware(func: Callable) -> Callable:
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        async def async_wrapper():
            manager = get_memory_manager()
            result = func(*args, **kwargs)
            # Push the result(s) through the memory-gated queue; put()
            # blocks while the actor is over its memory threshold.
            if isinstance(result, list):
                for item in result:
                    await manager.put.remote(item)
            else:
                await manager.put.remote(result)
            # Drain the queue. Caveat: the queue is shared, so concurrent
            # wrapped calls can interleave and pick up each other's items.
            results = []
            while await manager.qsize.remote() > 0:
                item = await manager.get.remote()
                results.append(item)
                manager.task_done.remote()
            return results
        return asyncio.run(async_wrapper())
    return wrapper
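
Usage sketch (parse_file is a placeholder workload; the function itself still runs locally, only its results flow through the queue):

@global_memory_aware
def parse_file(path):
    with open(path) as f:
        return [line.strip() for line in f]

results = parse_file("data.txt")  # runs the async machinery internally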



@ray.remote(num_cpus=0)
class SignalActor:
    def __init__(self, max_memory_percent=80):
        self.event = asyncio.Event()
        self.max_memory_percent = max_memory_percent

    def send(self):
        # Open the gate when node memory is below the threshold, close
        # it otherwise. Must be called periodically (see sketch below).
        if psutil.virtual_memory().percent < self.max_memory_percent:
            self.event.set()
        else:
            self.event.clear()

    async def wait(self):
        await self.event.wait()

# Create a global SignalActor handle (one per driver process)
signal = None
def get_signal_actor():
    global signal
    if signal is None:
        signal = SignalActor.remote()
    return signal
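
As written, nothing ever calls send(), so wait() would block forever; something on the driver has to refresh the signal periodically. A minimal sketch:

import time

def run_memory_monitor(poll_seconds=5):
    actor = get_signal_actor()
    while True:
        # Re-evaluate host memory: sets the event below the threshold,
        # clears it above, so waiters are released or held accordingly.
        ray.get(actor.send.remote())
        time.sleep(poll_seconds)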

def memory_aware(func: Callable) -> Callable:
    # Note the decorator order: functools.wraps applies first, then
    # ray.remote, so the returned object is a Ray remote function with
    # a 1 GiB memory reservation and up to 20 retries.
    @ray.remote(num_cpus=2, max_retries=20, memory=1024 * 1024 * 1024)
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # while psutil.virtual_memory().percent > MAX_MEMORY_PERCENT:
        #     time.sleep(random.random() * JITTER)
        try:
            print(f"memory_aware call: {func.__name__} args: {len(args)} kwargs_keys: {list(kwargs)}")
            return func(*args, **kwargs)
        except Exception:
            import traceback
            logger.error(traceback.format_exc())
            raise
    wrapper.__name__ = f"{func.__name__}_mem_wrap"
    return wrapper
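
Usage sketch (transform is a placeholder): since the decorator returns a Ray remote function, it is invoked with .remote:

@memory_aware
def transform(batch):
    return [x * 2 for x in batch]

print(ray.get(transform.remote([1, 2, 3])))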

The other thing I’ve done:

         "_system_config": {
            "local_fs_capacity_threshold": 0.99,
            "object_spilling_config": json.dumps(
                {
                "type": "filesystem",
                "params": {
                    "directory_path": "/tmp/spill",
                    "buffer_size": 1_000_000,
                }
                },
            )
        },
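
For context, that dict is passed through ray.init's _system_config (an internal, version-dependent option), roughly like this:

import json
import ray

ray.init(
    _system_config={
        "local_fs_capacity_threshold": 0.99,
        "object_spilling_config": json.dumps(
            {"type": "filesystem",
             "params": {"directory_path": "/tmp/spill", "buffer_size": 1_000_000}}
        ),
    }
)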

And lowering the object store memory on each worker:

  ray-worker-2:
    build:
      context: .
      dockerfile: ./dockerfile.ray.cluster
    depends_on:
      - ray-head
    command: ray start --address=ray-head:6479 --object-store-memory 500000000 --ray-client-server-port=10001 --block
    environment:
      - RAY_HEAD_IP=ray-head
      - ENV=local-docker
    networks:
      - net
    extra_hosts:
      - "host.docker.internal:host-gateway"

It’s not very clear what your goal is here. It seems that you know how much memory each task needs, since you set a memory resource requirement in @ray.remote. Then why not just use memory-aware scheduling and let Ray handle the queueing?
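
A minimal sketch of that approach (heavy_task and chunks are placeholders; note the memory resource is a logical scheduling constraint, not an enforced limit):

import ray

@ray.remote(memory=1024 * 1024 * 1024)  # reserve ~1 GiB of logical memory per task
def heavy_task(chunk):
    return len(chunk)  # placeholder workload

chunks = [list(range(1000)) for _ in range(8)]  # placeholder data
# Ray starts each task only on a node with 1 GiB of unreserved memory,
# queueing the rest automatically; no hand-rolled gating needed.
refs = [heavy_task.remote(c) for c in chunks]
print(ray.get(refs))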