Memory leakage with ray.put()

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

I am seeing a memory leak when putting an object (a NumPy array) into the object store via ray.put(). I am using ray==2.8.0 and Python 3.8 on Ubuntu 20.04.

Please find the reproduction script below.

import re
import ray
import time
import numpy as np
from ray.util.queue import Queue as rQueue

ray.init(address="auto" if ray.is_initialized() else None)


class Frame:
	def __init__(self, src_id, seq_id=None, ref=None):
		self.src_id = src_id
		self.seq_id = seq_id
		self.ref = ref


@ray.remote
class PacketDecoder:
	def __init__(self, actor_name="packet_decoder") -> None:
		self.actor_name = actor_name
		self.actor_id = int(re.findall("[0-9]+$", actor_name)[-1])
		self.frame_count = 0

	async def run(self, frame_holder):
		print(f"{self.actor_name} Packet Decoder started.")
				
		while True:
			try:
				# generate frame
				# bgr_frame = np.random.rand(1920, 1080, 3)
				bgr_frame = np.random.randint(low=0, high=255, size=(1920, 1080, 3), dtype=np.uint8)
				frame_obj = Frame(src_id=self.actor_id, seq_id=self.frame_count, ref=ray.put(bgr_frame))
				
				# some blocking task
				time.sleep(0.04)
				
				# Push data to holders
				await frame_holder.put_async(frame_obj, timeout=1)
				
				self.frame_count += 1
			except Exception as ex:
				print(f"Error: {self.actor_name}: {ex}")


@ray.remote
class ODIActor:
	def __init__(self, actor_name="odi"):
		self.actor_name = actor_name

	async def run(self, frame_holder, src_id, timeout=2):
		while True:
			try:
				try:
					frame_obj = await frame_holder.get_async(block=True, timeout=timeout)
				except Exception:  # avoid a bare except, which would also swallow task cancellation
					print(f"[{src_id}]: No frame for last {timeout} seconds.")
					continue

				ray.internal.free(frame_obj.ref)

			except Exception as ex:
				print(f"[{src_id}]: Error: {ex}.")
				

def main():
	
	# initialise camera actors
	cam_prefix = "cam_"
	decode_actors = {}
	for cam in range(7):
		decoder_name = f"decoder_{cam}"
		decode_actors[f"{cam_prefix}{cam}"] = PacketDecoder.options(name=decoder_name, namespace="test", 
													num_cpus=0.1).remote(decoder_name)

	# initialise odi actor
	odi = ODIActor.options(name="odi", namespace="test", num_cpus=0.2).remote("odi")
	
	# create frame holders for all camera actors
	frame_holders = {
		cam_name : rQueue(actor_options={"num_cpus":0.01, "namespace":"test", "name":f"{cam_name}_fh"})
		for cam_name in decode_actors
	}

	# start ODI actors first
	for f_name in frame_holders:
		odi.run.remote(frame_holders[f_name], f_name)

	# start the camera actors
	decoder_refs = []
	for d_name in decode_actors:
		decoder_refs.append(decode_actors[d_name].run.remote(frame_holders[d_name]))

	ray.get(decoder_refs)

	print("------------------------- END -------------------------")


if __name__ == "__main__":
	try:
		main()
	except Exception as ex:
		print(f"Error: {ex}")

Below are some snapshots of memory utilisation from the dashboard, to show the memory consumption.

Please let me know if I can provide any further details.

Does this still reproduce after upgrading Ray to the latest version, on a newer Python and Ubuntu 22.04?

When you call ray.put(), you put data into the Ray object store. It’s kept there until the last reference to it is released. In your code, I think the object is always held by frame → frame_holder → frame_holders, so it’s never released. I think this is working as intended (WAI). If you release the references, does it still leak?

@Ruiyang_Wang If I understand correctly, you are saying that the object is being held because the frame_holder reference is stored in frame_holders. Can you please confirm this? And could you explain how exactly that would cause the issue?

Based on memory analysis, I found the following:

  • Object store memory is not leaking; it stays constant.
  • There is a memory leak in WORKER_HEAP.
  • PacketDecoder leaks around 1 GB of memory over 12 hours, which means objects are being released from the object store; otherwise the growth would be much higher, since a single array has shape (1920, 1080, 3).
  • There is also a memory leak in QueueActor.
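
To put the third point in numbers, here is a rough back-of-the-envelope calculation using only values from the repro script. The 25 frames/s rate is an upper bound implied by the 0.04 s sleep (the real rate will be lower), and "around 1GB" is taken as exactly 10^9 bytes for simplicity; both are assumptions.

```python
# Values taken from the repro script.
HEIGHT, WIDTH, CHANNELS = 1920, 1080, 3  # size=(1920, 1080, 3)
BYTES_PER_ELEMENT = 1                    # dtype=np.uint8
HOURS = 12

# Assumptions (not measured): the 0.04 s sleep caps the loop at 25 frames/s,
# and "around 1GB" is treated as 10**9 bytes.
MAX_FRAMES_PER_SECOND = 25
OBSERVED_LEAK_BYTES = 1_000_000_000

# Size of one frame payload in bytes.
frame_bytes = HEIGHT * WIDTH * CHANNELS * BYTES_PER_ELEMENT

# Upper bound on frames produced by one decoder in the 12-hour window.
frames_in_window = MAX_FRAMES_PER_SECOND * HOURS * 3600

# What one decoder would accumulate if no payload were ever released.
payload_bytes_if_never_freed = frame_bytes * frames_in_window

# Observed growth spread over the frames produced (a lower bound per frame,
# because the real frame count is at most frames_in_window).
leak_bytes_per_frame = OBSERVED_LEAK_BYTES / frames_in_window

print(f"one frame:              {frame_bytes:,} bytes")
print(f"frames in {HOURS} h (max):   {frames_in_window:,}")
print(f"if payloads never freed: {payload_bytes_if_never_freed / 1e12:.2f} TB")
print(f"observed leak per frame: {leak_bytes_per_frame:.0f} bytes (at least)")
```

Under these assumptions, unreleased payloads would add up to terabytes per decoder, whereas the observed growth works out to roughly a kilobyte per frame (more if the real frame rate is lower). That would be more consistent with small per-object bookkeeping being retained than with the arrays themselves.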

I am also forcefully removing the object via ray.internal.free(). Isn’t that supposed to remove the object regardless of its reference count?

Hi @Sam_Chan, I ran the script with ray==2.34.0, Python 3.10, and Ubuntu 20.04, and I see the same memory leak with the latest Ray version. Can you please suggest how I can debug this?

Please let me know if you need any extra information.
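
One generic way to narrow down growth in a worker’s heap is Python’s built-in tracemalloc module. The sketch below is not Ray-specific: HeapGrowthTracker is a hypothetical helper name, and calling report() periodically from inside an actor loop is an assumption about how it could be wired in. tracemalloc only sees allocations made through Python’s allocators, so growth inside native code may not show up here.

```python
import tracemalloc


class HeapGrowthTracker:
    """Hypothetical helper: reports which source lines grew the Python heap
    since the tracker was created."""

    def __init__(self, top_n=5):
        self.top_n = top_n
        # Start tracing only if nobody else has already started it.
        if not tracemalloc.is_tracing():
            tracemalloc.start()
        self._baseline = tracemalloc.take_snapshot()

    def report(self):
        """Return up to top_n (location, bytes_grown) pairs, largest first."""
        current = tracemalloc.take_snapshot()
        diffs = current.compare_to(self._baseline, "lineno")
        # Keep only lines whose live allocations grew since the baseline.
        growth = [d for d in diffs if d.size_diff > 0]
        growth.sort(key=lambda d: d.size_diff, reverse=True)
        return [(str(d.traceback[0]), d.size_diff) for d in growth[: self.top_n]]
```

For example, one could create a tracker in an actor’s __init__ and print tracker.report() every few thousand iterations of the run loop; lines whose byte counts keep climbing between reports are candidates for the leak. Tracing adds overhead, so this is meant for a debugging run rather than production.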

I wonder if it’s possible for you to not use ray.put? In your code:

  1. in Frame: just keep the numpy array in it as a field
  2. in PacketDecoder: put_async the frame with a numpy array to frame_holder
  3. in ODIActor: get_async from frame_holder and just use the inside numpy, no free.

This way, Ray should be able to manage all the refs on its own.

@Ruiyang_Wang Thanks for your suggestions.

According to the Ray docs (Anti-pattern: Passing the same large argument by value repeatedly harms performance, Ray 2.34.0), passing a large object directly between multiple actors, without first putting it into the object store, is an anti-pattern.

The given script is just for reproducing the issue. In the actual pipeline, the object is passed through multiple actors, possibly across multiple nodes, and some of the intermediate actors don’t even need the image.

With this thread, I want to find the cause of the memory leak, so that I know whether the problem is in my code or on the Ray side.

I am also seeing a memory leak in the queue in this other thread: Memory leakage with remote() calls.

@Sam_Chan @Ruiyang_Wang I have opened a GitHub issue for this. You can find it here: [Core] Memory leakage with ray.put() · Issue #47273 · ray-project/ray · GitHub. Please let me know if I can provide any other information.

TY for submitting - we’ll take a closer look next Monday.