Concurrent parallel tasks with shared state actors issue

Hello everyone.
I have a function which is supposed to run in parallel for different sequences of arguments.
It is supposed to take the items inside each sequence and square them. To avoid redundant work, I used a shared-state actor, which is essentially a dictionary that holds the keys of the arguments that have already been evaluated. The whole program is the following:

import ray
import os
import time
from collections import Counter


@ray.remote
class SharedStateActor:
    """Actor holding the set of already-evaluated keys, shared by all workers.

    A Ray actor executes its method calls one at a time, so a SINGLE method
    that both checks and records a key is atomic. Separate get + update
    calls from a worker are NOT atomic: another worker's call can interleave
    between them, which is what causes duplicate evaluations.
    """

    def __init__(self):
        # Maps an already-evaluated argument to a (non-zero) marker value.
        self.shared_flag = dict()

    def update_shared_dict(self, arg):
        """Merge *arg* (a dict) into the shared dict."""
        self.shared_flag.update(arg)

    def get_shared_dict(self, arg):
        """Return the stored marker for *arg*, or 0 if it is unseen."""
        return self.shared_flag.get(arg, 0)

    def get_or_update_shared_dict(self, arg):
        """Atomic check-and-mark, fixing the check-then-act race.

        Returns the existing marker if *arg* was already claimed; otherwise
        records *arg* and returns 0 (meaning "this caller claimed it").
        Because the actor serializes method execution, no two callers can
        both receive 0 for the same key.
        """
        if arg in self.shared_flag:
            return self.shared_flag[arg]
        self.shared_flag[arg] = arg
        return 0


@ray.remote
def square(arg_list: list, flag: SharedStateActor):
    """Square each item of *arg_list* not already claimed in the shared actor.

    Returns a dict mapping each item this task evaluated to its square.

    NOTE(review): the separate get + update actor calls below are a
    check-then-act race — two concurrent tasks can both observe an item as
    unseen and both evaluate it. Eliminating duplicates requires a single
    atomic check-and-set method on the actor.
    """
    arg_squared = dict()
    # Markers of items found already claimed; collected but not returned.
    repeated_arguments = list()
    for arg in arg_list:
        # One remote round-trip instead of two identical gets: markers are
        # write-once, so re-fetching could never return a different value.
        seen = ray.get(flag.get_shared_dict.remote(arg))
        if seen != 0:
            repeated_arguments.append(seen)
            continue
        # Stand-in for real computation; also widens the race window
        # between the get above and the update below.
        time.sleep(0.5)
        flag.update_shared_dict.remote({arg: arg})
        arg_squared.update({arg: arg ** 2})

    return arg_squared

def main():
    """Fan out one square task per sequence and report duplicate evaluations."""
    # Deliberately overlapping ranges: many items appear in several
    # sequences, so the shared actor is what should prevent re-evaluation.
    sequence_list = [list(range(start, end)) for start, end in [
        (0, 20),
        (5, 30),
        (15, 40),
        (30, 40),
        (40, 60),
        (50, 80),
        (60, 70),
        (60, 80),
        (60, 90),
        (60, 95),
        (70, 96),
        (80, 98),
        (95, 101)
    ]]

    if not ray.is_initialized():
        # Your Ray initialization logic goes here
        ray.init(num_cpus=max(int(1 * os.cpu_count()), 1), dashboard_port=8266)

    flag = SharedStateActor.remote()

    # Each element of squared_list is the dict of items one task evaluated.
    squared_list = ray.get([square.remote(arguments, flag) for arguments in sequence_list])

    # Flatten the per-task dicts directly (no intermediate copy needed) and
    # count evaluations per key; any count > 1 slipped past the shared check.
    evaluated_keys = sorted(key for chunk in squared_list for key in chunk)

    counter = Counter(evaluated_keys)

    repeated_items = {item: count for item, count in counter.items() if count > 1}

    print(repeated_items)


if __name__ == "__main__":
    main()

I intentionally used time.sleep(0.5) to represent some computation that could happen there. However, in the final results the following numbers are evaluated more than once (the key is the item and the value is the number of times it was evaluated):
repeated_items = {10: 2, 11: 2, 12: 2, 13: 2, 14: 2, 25: 2, 26: 2, 27: 2, 28: 2, 29: 2, 60: 4, 61: 4, 62: 4, 63: 4, 64: 4, 65: 4, 66: 4, 67: 4, 68: 4, 69: 4, 79: 2, 90: 3, 91: 3, 92: 3, 93: 3, 94: 3}
Does anybody know how I can deal with this concurrency issue?

To avoid repetition, you should combine update_shared_dict and get_shared_dict into a single get_or_update_shared_dict method so that the check and the update happen atomically. A Ray actor executes its method calls one at a time, so doing both steps inside one method call closes the window in which two tasks can each see a key as unseen and both evaluate it.