Memory leakage with remote() calls

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am experiencing a memory leak in my code when I use remote() calls to get results from different actors. The problem occurs specifically when I store the remote object references (without awaiting them) and then await them in a different actor.

I am using ray==2.34.0 and Python 3.10 on Ubuntu 20.04.
Please find the reproduction script below.

import re
import ray
import time
import asyncio
import numpy as np
from ray.util.queue import Queue as rQueue

# Initialise Ray for this script. ``ray.is_initialized()`` is evaluated before
# ``ray.init`` runs: ``address`` is "auto" when it returns True, otherwise None.
# NOTE(review): in a freshly started process this check is presumably always
# False, so ``address=None`` is what actually gets passed -- confirm intent.
ray.init(address="auto" if ray.is_initialized() else None)


class Data:
	"""Record handed from stage to stage through the queues.

	Holds the source id, an optional sequence id, an optional array, and a
	``results`` slot that always starts out as ``None``.
	"""

	def __init__(self, src_id, seq_id=None, arr=None):
		# Single tuple assignment keeps the attribute creation order
		# (src_id, seq_id, arr, results) unchanged.
		self.src_id, self.seq_id, self.arr, self.results = src_id, seq_id, arr, None


@ray.remote
class PacketDecoder:
	"""Async actor that produces random frames and pushes them onto a queue.

	The numeric source id is parsed from the trailing digits of ``actor_name``
	(for example ``"decoder_3"`` gives ``3``).
	"""

	def __init__(self, actor_name="packet_decoder") -> None:
		"""Store the actor name and derive the numeric id from it.

		Args:
			actor_name: Name of this decoder; trailing digits become ``actor_id``.
		"""
		self.actor_name = actor_name
		# A name without trailing digits (the default is one) has no id to
		# parse; use 0 rather than failing with IndexError in the constructor.
		trailing_digits = re.search(r"[0-9]+$", actor_name)
		self.actor_id = int(trailing_digits.group()) if trailing_digits else 0
		# Sequence number of the next frame to emit.
		self._count = 0

	async def run(self, data_holder):
		"""Generate frames forever and put them on ``data_holder``.

		Args:
			data_holder: Queue object exposing ``qsize()`` and ``put_async()``.

		Any exception raised inside one iteration is printed and the loop
		continues with the next iteration.
		"""
		print(f"{self.actor_name} Packet Decoder started.")

		while True:
			try:
				# Back off while more than 20 items are waiting in the queue.
				if data_holder.qsize() > 20:
					print("Decoder: Data queue is full .......")
					await asyncio.sleep(1)
					continue

				data_arr = np.random.randint(low=0, high=255, size=(10, 10, 3), dtype=np.uint8)
				data_obj = Data(src_id=self.actor_id, seq_id=self._count, arr=data_arr)

				# some blocking task
				# NOTE(review): time.sleep is synchronous inside a coroutine --
				# presumably deliberate to mimic blocking decode work; confirm.
				time.sleep(0.08)

				# Push data to holders
				await data_holder.put_async(data_obj, timeout=1)

				# Only advance the sequence number once the put succeeded.
				self._count += 1
			except Exception as ex:
				print(f"Error: {self.actor_name}: {ex}")


@ray.remote
class Summation:
	"""Actor exposing a single ``run`` method that sums an array."""

	def run(self, arr):
		"""Pause for 1 ms, then return ``np.sum(arr)``."""
		time.sleep(0.001)
		total = np.sum(arr)
		return total


@ray.remote
class Power:
	"""Actor exposing a single ``run`` method that raises an array to a power."""

	def run(self, arr, p=2):
		"""Pause for 1 ms, then return ``np.power(arr, p)`` (``p`` defaults to 2)."""
		time.sleep(0.001)
		raised = np.power(arr, p)
		return raised

@ray.remote
class FourierTransform:
	"""Actor exposing a single ``run`` method that applies ``np.fft.fft``."""

	def run(self, arr):
		"""Pause for 2 ms, then return ``np.fft.fft(arr)``."""
		time.sleep(0.002)
		spectrum = np.fft.fft(arr)
		return spectrum

@ray.remote
class BasicMath:
	"""Actor that forwards an array to a sum actor and a power actor."""

	def __init__(self, sum_actor, power_actor):
		"""Keep the handles of the two actors that ``run`` calls."""
		self.sum_actor, self.power_actor = sum_actor, power_actor

	def run(self, arr):
		"""Submit ``arr`` to both actors without waiting for the results.

		Returns:
			A dict whose "sum" and "power" entries hold whatever
			``.run.remote(arr)`` returned for the respective actor; the calls
			are not resolved here.
		"""
		# Submission order is kept: sum first, then power.
		sum_ref = self.sum_actor.run.remote(arr)
		power_ref = self.power_actor.run.remote(arr)
		return {"sum": sum_ref, "power": power_ref}


@ray.remote
class ODIActor:
	"""Async actor that attaches pending math/FFT results to incoming frames.

	For every frame read from a data queue it submits the array to the
	basic-math and FFT actors, stores the returned (un-awaited) references on
	the frame, and forwards the frame to the output queue.
	"""

	def __init__(self, basic_math_actor, fft_actor, actor_name="odi"):
		"""Store the actor name and the handles of the two downstream actors."""
		self.actor_name = actor_name
		self.basic_math_actor = basic_math_actor
		self.fft_actor = fft_actor

	async def run(self, data_holder, out_holder, src_id, timeout=2):
		"""Process frames from ``data_holder`` forever.

		Args:
			data_holder: Queue to read frames from (``get_async``).
			out_holder: Queue to write annotated frames to (``qsize``/``put_async``).
			src_id: Label of the source, used only in log messages.
			timeout: Seconds to wait for a frame before logging and retrying.

		Any exception raised inside one iteration is printed and the loop
		continues with the next iteration.
		"""
		while True:
			try:
				try:
					data_obj = await data_holder.get_async(block=True, timeout=timeout)
				except Exception:
					# Not a bare ``except``: cancellation (asyncio.CancelledError
					# is a BaseException) must be able to stop this loop.
					print(f"[{src_id}]: ODI: No frame for last {timeout} seconds.")
					continue

				# Read the frame fields before the back-pressure check so the
				# log line below reports the id of the frame just fetched.
				data_arr = data_obj.arr
				seq_id = data_obj.seq_id

				if out_holder.qsize() > 10:
					# NOTE: the frame just fetched is not forwarded in this case.
					print(f"ODI: {src_id}: {seq_id}: Result queue is full .......")
					await asyncio.sleep(0.1)
					continue

				# The remote calls are submitted but not awaited here; the
				# references travel with the frame to the consumer.
				data_obj.results = {
					"basic_results" : self.basic_math_actor.run.remote(data_arr),
					"fft_result": self.fft_actor.run.remote(data_arr),
				}

				# Push data to out holder
				await out_holder.put_async(data_obj, timeout=1)

			except Exception as ex:
				print(f"ODI: [{src_id}]: Error: {ex}.")


@ray.remote
class Aggregator:
	"""Async actor that resolves the result references attached to each frame."""

	def __init__(self, actor_name="aggregator"):
		"""Store the actor name."""
		self.actor_name = actor_name

	async def run(self, result_holder, timeout=2):
		"""Consume frames from ``result_holder`` forever and await their results.

		Args:
			result_holder: Queue to read annotated frames from (``get_async``).
			timeout: Seconds to wait for a frame before logging and retrying.

		Any exception raised inside one iteration is printed and the loop
		continues with the next iteration.
		"""
		while True:
			try:
				try:
					data_obj = await result_holder.get_async(block=True, timeout=timeout)
				except Exception:
					# Not a bare ``except``: cancellation (asyncio.CancelledError
					# is a BaseException) must be able to stop this loop.
					print(f"Aggregator: No frame for last {timeout} seconds.")
					continue

				# These three are only consumed by the debug print below.
				data_arr = data_obj.arr
				src_id = data_obj.src_id
				seq_id = data_obj.seq_id
				_results = data_obj.results

				# First resolve the outer reference, which yields a dict holding
				# the "sum" and "power" references; then await those together
				# with the FFT reference.
				_basic_operations = await _results["basic_results"]
				_sum, _power, _fft = await asyncio.gather(
					_basic_operations["sum"],
					_basic_operations["power"],
					_results["fft_result"],
				)

				# print(f"Result: {src_id}: {seq_id}: Data:{data_arr}, Sum:{_sum}, Power:{_power}, FFT:{_fft}")
			except Exception as ex:
				print(f"Aggregator: Error: {ex}.")
			

def main():
	"""Create all actors and queues, start every stage, and wait on the decoders.

	Start order: aggregator, then one ODI ``run`` per source, then the decoders.
	Only the decoder ``run`` references are passed to ``ray.get``.
	"""
	ns = "test"

	# One decoder per source, keyed "src_<n>" and named "decoder_<n>".
	decode_actors = {}
	for idx in range(7):
		decoder_name = f"decoder_{idx}"
		decoder_cls = PacketDecoder.options(name=decoder_name, namespace=ns, num_cpus=0.1)
		decode_actors[f"src_{idx}"] = decoder_cls.remote(decoder_name)

	# Compute actors and the ODI actor that drives them.
	sum_actor = Summation.options(name="sum", namespace=ns, num_cpus=0.2).remote()
	power_actor = Power.options(name="power", namespace=ns, num_cpus=0.2).remote()
	math_actor = BasicMath.options(name="maths", namespace=ns, num_cpus=0.2).remote(sum_actor, power_actor)
	fft_actor = FourierTransform.options(name="fft", namespace=ns, num_cpus=0.2).remote()
	odi = ODIActor.options(name="odi", namespace=ns, num_cpus=0.2).remote(math_actor, fft_actor, "odi")

	# Consumer of the annotated frames.
	aggregator = Aggregator.options(name="aggregator", namespace=ns, num_cpus=0.2).remote("aggregator")

	# One input queue per decoder, named "<src_name>_dh".
	data_holders = {}
	for src_name in decode_actors:
		queue_options = {"num_cpus": 0.01, "namespace": ns, "name": f"{src_name}_dh"}
		data_holders[src_name] = rQueue(actor_options=queue_options)

	# Single output queue shared by every ODI loop.
	result_holder = rQueue(actor_options={"num_cpus": 0.01, "namespace": ns, "name": "result_h"})

	# Aggregator first; its returned reference is not kept.
	aggregator.run.remote(result_holder)

	# One ODI loop per source on the same ODI actor; references are not kept.
	for src_name, holder in data_holders.items():
		odi.run.remote(holder, result_holder, src_name)

	# Decoders last; these are the only references waited on.
	decoder_refs = [
		decoder.run.remote(data_holders[src_name])
		for src_name, decoder in decode_actors.items()
	]
	ray.get(decoder_refs)

	print("------------------------- END -------------------------")


if __name__ == "__main__":
	# Script entry point: run the pipeline and print any exception that
	# escapes main() instead of letting it propagate.
	try:
		main()
	except Exception as err:
		print(f"Error: {err}")

Attached are the dashboard graphs for the Queue actor, the BasicMath actor, and WORKER_HEAP, which show the memory leak:



You can also check the task counts in the following graph; they remain steady over time:

I am not seeing any memory leak in ODIActor, though, which is strange.

I have opened a GitHub issue for this problem. You can find it here: [Core] Memory leakage with remote() calls · Issue #47274 · ray-project/ray · GitHub. Please let me know if I can provide any other information.