How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I am experiencing a memory leak in my code when I use remote() calls to get results from different actors.
The problem occurs specifically when I store the remote object references (without awaiting them) and then await them in a different actor.
I am using ray==2.34.0 with Python 3.10 on Ubuntu 20.04.
Please find the reproduction script below.
import re
import ray
import time
import asyncio
import numpy as np
from ray.util.queue import Queue as rQueue
# Initialise Ray at most once.  Calling ray.init() while a runtime is already
# active raises RuntimeError, so the previous `address="auto" if
# ray.is_initialized()` branch could never connect successfully; skip the call
# instead when Ray is already up.
if not ray.is_initialized():
    ray.init()
class Data:
    """Payload passed between pipeline stages.

    Attributes:
        src_id: identifier of the producing source.
        seq_id: per-source sequence number (optional).
        arr: the data array carried by this item (optional).
        results: filled in by a later stage; always starts as ``None``.
    """

    def __init__(self, src_id, seq_id=None, arr=None):
        self.src_id, self.seq_id, self.arr = src_id, seq_id, arr
        # No results exist until a downstream stage attaches them.
        self.results = None
@ray.remote
class PacketDecoder:
    """Ray actor that produces random frames and pushes them onto a queue.

    Args:
        actor_name: Display name of the actor.  Its trailing digits, if any,
            are used as the numeric source id; names without trailing digits
            (including the default) get source id 0.
    """

    def __init__(self, actor_name="packet_decoder") -> None:
        self.actor_name = actor_name
        # The original `re.findall(...)[-1]` raised IndexError for any name
        # without a numeric suffix -- including the default value -- so fall
        # back to 0 when there is nothing to parse.
        suffix = re.search(r"[0-9]+$", actor_name)
        self.actor_id = int(suffix.group()) if suffix else 0
        self._count = 0  # sequence number of the next frame to emit

    async def run(self, data_holder):
        """Generate frames forever and put them on ``data_holder``.

        Args:
            data_holder: Queue-like object exposing ``qsize()`` and
                ``put_async(item, timeout=...)``.

        The loop never returns.  Every exception raised inside an iteration is
        printed and the loop carries on, so a frame whose put fails is dropped
        and its sequence number is reused by the next frame.
        """
        print(f"{self.actor_name} Packet Decoder started.")
        while True:
            try:
                # Back off while the consumer is behind instead of producing.
                if data_holder.qsize() > 20:
                    print("Decoder: Data queue is full .......")
                    await asyncio.sleep(1)
                    continue
                data_arr = np.random.randint(low=0, high=255, size=(10, 10, 3), dtype=np.uint8)
                data_obj = Data(src_id=self.actor_id, seq_id=self._count, arr=data_arr)
                # some blocking task (a synchronous sleep, so it also stalls
                # this actor's event loop for its duration)
                time.sleep(0.08)
                # Push data to holders
                await data_holder.put_async(data_obj, timeout=1)
                self._count += 1
            except Exception as ex:
                print(f"Error: {self.actor_name}: {ex}")
@ray.remote
class Summation:
    """Ray actor that sums an array."""

    def run(self, arr):
        """Pause for 1 ms, then return ``np.sum(arr)``."""
        delay_s = 0.001
        time.sleep(delay_s)
        total = np.sum(arr)
        return total
@ray.remote
class Power:
    """Ray actor that raises an array to a power element-wise."""

    def run(self, arr, p=2):
        """Pause for 1 ms, then return ``np.power(arr, p)`` (``p`` defaults to 2)."""
        delay_s = 0.001
        time.sleep(delay_s)
        powered = np.power(arr, p)
        return powered
@ray.remote
class FourierTransform:
    """Ray actor that applies ``np.fft.fft`` to an array."""

    def run(self, arr):
        """Pause for 2 ms, then return ``np.fft.fft(arr)``."""
        delay_s = 0.002
        time.sleep(delay_s)
        spectrum = np.fft.fft(arr)
        return spectrum
@ray.remote
class BasicMath:
    """Ray actor that fans an array out to a sum actor and a power actor.

    Args:
        sum_actor: Actor handle whose ``run`` method is invoked with the array.
        power_actor: Actor handle whose ``run`` method is invoked with the array.
    """

    def __init__(self, sum_actor, power_actor):
        self.sum_actor, self.power_actor = sum_actor, power_actor

    def run(self, arr):
        """Submit ``arr`` to both actors and return the pending references.

        Returns:
            A dict with keys ``"sum"`` and ``"power"`` holding whatever
            ``.run.remote(arr)`` returned for each actor.  The results are
            not awaited or fetched here.
        """
        sum_ref = self.sum_actor.run.remote(arr)
        power_ref = self.power_actor.run.remote(arr)
        return {"sum": sum_ref, "power": power_ref}
@ray.remote
class ODIActor:
    """Ray actor that dispatches each incoming frame to the math and FFT actors.

    Args:
        basic_math_actor: Actor handle whose ``run`` is invoked with the array.
        fft_actor: Actor handle whose ``run`` is invoked with the array.
        actor_name: Display name of this actor.
    """

    def __init__(self, basic_math_actor, fft_actor, actor_name="odi"):
        self.actor_name = actor_name
        self.basic_math_actor = basic_math_actor
        self.fft_actor = fft_actor

    async def run(self, data_holder, out_holder, src_id, timeout=2):
        """Forward frames from ``data_holder`` to ``out_holder`` forever.

        For every frame, the pending results of the two downstream actors are
        attached to ``data_obj.results`` (they are not awaited here) and the
        frame is pushed to ``out_holder``.

        Args:
            data_holder: Queue-like object exposing ``get_async``.
            out_holder: Queue-like object exposing ``qsize`` and ``put_async``.
            src_id: Label of the source, used only in log messages.
            timeout: Seconds to wait for a frame before logging and retrying.

        The loop never returns; errors inside an iteration are printed and
        the loop continues.
        """
        while True:
            try:
                try:
                    data_obj = await data_holder.get_async(block=True, timeout=timeout)
                except Exception:
                    # `except Exception` rather than a bare `except`, so that
                    # asyncio.CancelledError / KeyboardInterrupt still
                    # propagate and the task can be cancelled.
                    print(f"[{src_id}]: ODI: No frame for last {timeout} seconds.")
                    continue
                # Read the frame fields *before* the back-pressure check: the
                # log line below needs seq_id, which was previously referenced
                # before assignment (UnboundLocalError on the first pass, a
                # stale value afterwards).
                data_arr = data_obj.arr
                seq_id = data_obj.seq_id
                if out_holder.qsize() > 10:
                    # NOTE: the frame already taken from data_holder is
                    # dropped here, as in the original flow.
                    print(f"ODI: {src_id}: {seq_id}: Result queue is full .......")
                    await asyncio.sleep(0.1)
                    continue
                data_obj.results = {
                    "basic_results": self.basic_math_actor.run.remote(data_arr),
                    "fft_result": self.fft_actor.run.remote(data_arr),
                }
                # Push data to out holder
                await out_holder.put_async(data_obj, timeout=1)
            except Exception as ex:
                print(f"ODI: [{src_id}]: Error: {ex}.")
@ray.remote
class Aggregator:
    """Ray actor that awaits the pending results attached to each frame.

    Args:
        actor_name: Display name of this actor.
    """

    def __init__(self, actor_name="aggregator"):
        self.actor_name = actor_name

    async def run(self, result_holder, timeout=2):
        """Consume frames from ``result_holder`` forever and resolve their results.

        Args:
            result_holder: Queue-like object exposing ``get_async``.
            timeout: Seconds to wait for a frame before logging and retrying.

        Each frame's ``results`` dict is expected to hold ``"basic_results"``
        (awaited to obtain a dict with ``"sum"`` and ``"power"`` entries) and
        ``"fft_result"``.  The three inner values are awaited together and then
        discarded.  The loop never returns; errors inside an iteration are
        printed and the loop continues.
        """
        while True:
            try:
                try:
                    data_obj = await result_holder.get_async(block=True, timeout=timeout)
                except Exception:
                    # `except Exception` rather than a bare `except`, so that
                    # asyncio.CancelledError / KeyboardInterrupt still
                    # propagate and the task can be cancelled.
                    print(f"Aggregator: No frame for last {timeout} seconds.")
                    continue
                # These three locals only feed the optional debug print below.
                data_arr = data_obj.arr
                src_id = data_obj.src_id
                seq_id = data_obj.seq_id
                _results = data_obj.results
                # First resolve the outer reference to get the inner dict of
                # references, then resolve all leaf results concurrently.
                _basic_operations = await _results["basic_results"]
                _sum, _power, _fft = await asyncio.gather(
                    _basic_operations["sum"],
                    _basic_operations["power"],
                    _results["fft_result"],
                )
                # print(f"Result: {src_id}: {seq_id}: Data:{data_arr}, Sum:{_sum}, Power:{_power}, FFT:{_fft}")
            except Exception as ex:
                print(f"Aggregator: Error: {ex}.")
def main():
    """Wire up the actor pipeline and block on the decoder tasks.

    Creation and start order: seven decoders, the sum / power / maths / fft /
    odi actors, the aggregator, one data queue per decoder, the shared result
    queue; then the aggregator loop, one ODI loop per source and finally the
    decoder loops.  ``ray.get`` on the decoder references blocks until those
    tasks finish, so the END banner is printed only after that.
    """
    namespace = "test"
    src_prefix = "src_"

    # Decoders, keyed by "src_<n>" and named "decoder_<n>".
    decode_actors = {
        f"{src_prefix}{src}": PacketDecoder.options(
            name=f"decoder_{src}", namespace=namespace, num_cpus=0.1
        ).remote(f"decoder_{src}")
        for src in range(7)
    }

    # Compute actors and the ODI stage that drives them.
    sum_actor = Summation.options(name="sum", namespace=namespace, num_cpus=0.2).remote()
    power_actor = Power.options(name="power", namespace=namespace, num_cpus=0.2).remote()
    math_actor = BasicMath.options(name="maths", namespace=namespace, num_cpus=0.2).remote(
        sum_actor, power_actor
    )
    fft_actor = FourierTransform.options(name="fft", namespace=namespace, num_cpus=0.2).remote()
    odi = ODIActor.options(name="odi", namespace=namespace, num_cpus=0.2).remote(
        math_actor, fft_actor, "odi"
    )

    aggregator = Aggregator.options(name="aggregator", namespace=namespace, num_cpus=0.2).remote(
        "aggregator"
    )

    # One input queue per decoder ...
    data_holders = {}
    for src_name in decode_actors:
        holder_options = {"num_cpus": 0.01, "namespace": namespace, "name": f"{src_name}_dh"}
        data_holders[src_name] = rQueue(actor_options=holder_options)

    # ... and a single queue shared by every ODI loop for the results.
    result_holder = rQueue(
        actor_options={"num_cpus": 0.01, "namespace": namespace, "name": "result_h"}
    )

    # Consumer first, so results are drained as soon as they appear.
    aggregator.run.remote(result_holder)

    # One ODI loop per source, all on the same ODI actor.
    for f_name, holder in data_holders.items():
        odi.run.remote(holder, result_holder, f_name)

    # Producers last; keep their references so we can block on them.
    decoder_refs = [
        decoder.run.remote(data_holders[d_name])
        for d_name, decoder in decode_actors.items()
    ]
    ray.get(decoder_refs)
    print("------------------------- END -------------------------")
if __name__ == "__main__":
    # Script entry point: run the pipeline and report any failure on stdout
    # instead of letting the traceback terminate the script.
    try:
        main()
    except Exception as err:
        print(f"Error: {err}")