Hello everyone.
I have a function which is supposed t run in parallel for different sequence of arguments.
It is supposed to get items inside each sequence and square it. To avoid redundancy, I used a shared_state_actor which genuinely is a dictionary to hold the keys for the arguments that are already evaluated. The whole program is the following:
import ray
import os
import time
from collections import Counter
@ray.remote
class SharedStateActor:
def __init__(self):
self.shared_flag = dict()
def update_shared_dict(self, arg):
self.shared_flag.update(arg)
def get_shared_dict(self, arg):
return self.shared_flag.get(arg, 0)
@ray.remote
def square(arg_list: list, flag: SharedStateActor):
arg_squared = dict()
repeated_arguments = list()
for arg in arg_list:
if ray.get(flag.get_shared_dict.remote(arg)) != 0:
repeated_arguments.append(ray.get(flag.get_shared_dict.remote(arg)))
continue
else:
# time.sleep(np.random.uniform(0.8, 1.5))
time.sleep(0.5)
flag.update_shared_dict.remote({arg: arg})
arg_squared.update({arg: arg ** 2})
return arg_squared
def main():
sequence_list = [list(range(start, end)) for start, end in [
(0, 20),
(5, 30),
(15, 40),
(30, 40),
(40, 60),
(50, 80),
(60, 70),
(60, 80),
(60, 90),
(60, 95),
(70, 96),
(80, 98),
(95, 101)
]]
if not ray.is_initialized():
# Your Ray initialization logic goes here
ray.init(num_cpus=max(int(1 * os.cpu_count()), 1), dashboard_port=8266)
flag = SharedStateActor.remote()
squared_list = ray.get([square.remote(arguments, flag) for arguments in sequence_list])
chunk_all = list()
for chunk in squared_list:
chunk_all.append(chunk)
evaluated_keys = [key for chunk in chunk_all for key in chunk]
evaluated_keys.sort()
counter = Counter(evaluated_keys)
repeated_items = {item: count for item, count in counter.items() if count > 1}
print(repeated_items)
if __name__ == "__main__":
main()
I intentionally used time.sleep(0.5) to represents some calculations that can happen there. However, in the final results the following numbers are repeated in the evaluations: key is the item and value is the number of repetition
repeated_items = {10: 2, 11: 2, 12: 2, 13: 2, 14: 2, 25: 2, 26: 2, 27: 2, 28: 2, 29: 2, 60: 4, 61: 4, 62: 4, 63: 4, 64: 4, 65: 4, 66: 4, 67: 4, 68: 4, 69: 4, 79: 2, 90: 3, 91: 3, 92: 3, 93: 3, 94: 3}
Does anybody know how I can deal with this concurrency issue?