We are invoking a function repeatedly from our program. The function uses ray.util.iter. Each call creates new parallel iterator worker processes, and they keep piling up. What is the best way to clear the worker processes before the next invocation of ray.util.iter?
cc @amogkam can you address this question?
@SumanthDatta Do you have a reproducible example showcasing this behavior?
Sure, I will provide an example soon.
@amogkam Here is a sample example:
import ray
import random
import time

outputList = []
for i in range(1000):
    output = random.randint(0, 100)
    outputList.append(output)
print(outputList)

ray.init(address='auto')

def ray_parallel_iter():
    it = ray.util.iter.from_items(outputList, num_shards=5)
    for value in it.gather_async():
        print(value)

while True:
    ray_parallel_iter()
    time.sleep(10)
When you run this code, the worker processes keep piling up on the Ray side. How can we kill the existing workers and create a new set of workers for every invocation of the Ray parallel iterator?