Synchronize ray.remote with a global state actor

I have 2 parallel ray.remote tasks with a global state. The first one sets a new key-value pair in the global state while the second one retrieves this value. Currently, I was only able to do this using a While True loop and constantly checking if the key exists yet. I was wondering if there is a better method to achieve this.

Thanks a lot

import ray
import time

ray.init(num_cpus=2)

@ray.remote
class GlobalState:
    def __init__(self):
        self.z = {}

    def set_z(self, key, new_val):
        self.z[key] = new_val

    def get_z(self, key):
        return self.z.get(key, None)


@ray.remote
def task1(global_state):
    while True:
        new_val = ray.get(global_state.get_z.remote("key"))
        if new_val is not None:
            break
    
    return new_val

@ray.remote
def task2(global_state):
    time.sleep(3)
    ray.get(global_state.set_z.remote("key", "value"))
    return 0
    
global_state = GlobalState.remote()

task1 = task1.remote(global_state)
task2 = task2.remote(global_state)

completed_tasks = ray.get([task1, task2])

ray.shutdown()

I think it is the best way for now. You can also implement pub/sub pattern (task1: publisher, actor: coordinator/queue, task2: subscriber), but you may need some code to handle failures better in that case (e.g., if the coordinator fails, there’s no way for task 2 to know) + also ray doesn’t natively support pubsub.

Thanks for your answer. My final goal is to have many agents that compute certain values, then “communicate” via the global actor and then compute again based on the new communicated values (distributed ADMM among many agents). This means that task1 and task2 (and possibly many more tasks) both would compute some values and store it in the shared dict before retrieving the neighbour’s computed values. The while True loop works but it is a bit slow. Additionally, it introduces many tasks, as I can see in ray dashboard and a lot of them will be considered as “Unaccounted tasks”. So it would be nice if some communication among agents/tasks or similar would be available.

Here is some code that outlines the idea. Here I wrote it without ray in a sequential manner. The goal is to do these computations in parallel for each agent:

agents = ["agent1", "agent2", "agent3", "agent_4"] 

# ADMM rounds
iterations = 10

# set initial values
values = {(0, agent): 0 for agent in agents}

for it in range(iterations):
    for agent in agents:
        # retrieve values from neighbours
        prev_values = {agent: values[(it, agent)] for agent in agents}
        
        # do some computations by agent 
        new_val = some_computations(agent, prev_values)
        
        # store it in a global state
        values[(it + 1, agent)] = new_val