Best way to share memory for Ray tasks?

I’m using a caching technique in one of my Ray functions, and I’d like the cache dictionary to be shared between all Ray workers instead of updating it after the function has completed.

I know I can use a Ray actor to share the memory, but wouldn’t every cache check then block?

Here is an example of what I want to accomplish, where cached is a regular dict or an LRUDict:

import numpy as np
import ray

cached = dict()  # or LRUDict (class obj)
# `predictor` is assumed to be the user's model, defined at module level.

@ray.remote
def func(x, *args):
    # preprocess
    unknown_x = []
    known_x = []
    for val in x:
        key = '|'.join(str(v) for v in val.values())
        if key in cached:
            known_x.append(cached[key])
        else:
            known_x.append(np.inf)  # placeholder until the prediction is known
            unknown_x.append(val)

    unknown_evaluated = predictor.predict(unknown_x)

    unknown_index = 0
    for index in range(len(x)):
        if known_x[index] == np.inf:
            known_x[index] = unknown_evaluated[unknown_index]
            key = '|'.join(str(v) for v in x[index].values())
            if key not in cached:
                # Each Ray worker holds its own copy of `cached`, so this
                # write is not visible to other workers or to the driver.
                cached[key] = known_x[index]
            unknown_index += 1
    # finish work
    return x

I know I can do something like this:

import numpy as np
import ray

# LRUDict is the user's @ray.remote actor class exposing get() and __setitem__().
cached = LRUDict.remote()

@ray.remote
def func(x, *args):
    # preprocess
    unknown_x = []
    known_x = []
    for val in x:
        key = '|'.join(str(v) for v in val.values())
        value = ray.get(cached.get.remote(key))  # one blocking round trip per key
        if value is not None:
            known_x.append(value)
        else:
            known_x.append(np.inf)  # placeholder until the prediction is known
            unknown_x.append(val)

    unknown_evaluated = predictor.predict(unknown_x)

    unknown_index = 0
    for index in range(len(x)):
        if known_x[index] == np.inf:
            known_x[index] = unknown_evaluated[unknown_index]
            key = '|'.join(str(v) for v in x[index].values())
            # A second blocking round trip, in case another worker cached it meanwhile.
            if ray.get(cached.get.remote(key)) is None:
                cached.__setitem__.remote(key, known_x[index])
            unknown_index += 1
    # finish work
    return x

However, I suspect this will block, since every iteration has to wait for the actor to check the cache for the key.

What’s the best approach for a problem like this?

Unfortunately, this is not possible, especially when your cache is mutable (like your example).

  1. A Python object such as a dictionary cannot be placed in shared memory. The same holds even for the shared memory abstraction in multiprocessing (mostly because of complexities around Python reference counting). So, as far as I know, it is not possible to share a Python dictionary among different processes (Ray workers).
  2. Also, mutable objects in shared memory are extremely complex to support, especially across multiple nodes (e.g., every write would have to be synced to the other machines, and every copy of the object in the cluster would have to be locked at the same time). For this reason, Ray doesn’t support mutable shared objects.
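A minimal stdlib sketch of why the module-level dict in the question is never shared. Ray serializes a remote function with cloudpickle, and each worker process ends up with its own copy of the globals that function uses (either deserialized or re-created when the worker imports the module). Here `pickle.dumps`/`pickle.loads` stands in for that trip to another process; it illustrates the effect, not Ray's actual code path, and the `ship_to_worker` helper is hypothetical.

```python
import pickle

# Driver-side cache, standing in for the module-level `cached` dict in the question.
cached = {"a|1": 0.5}


def ship_to_worker(obj):
    """Simulate sending an object to another process: serialize, then deserialize."""
    return pickle.loads(pickle.dumps(obj))


# Two "workers" each receive their own independent copy of the cache.
worker1_cache = ship_to_worker(cached)
worker2_cache = ship_to_worker(cached)

# Worker 1 caches a new prediction ...
worker1_cache["b|2"] = 0.9

# ... but neither the driver nor worker 2 can see it.
print("b|2" in cached)         # False
print("b|2" in worker2_cache)  # False
```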

So in this case, an actor is the right solution. As you said, the actor model can have low throughput because it works by message passing (an actor processes one message at a time). If updating the dict through the actor becomes the bottleneck, I’d recommend using an in-memory cache such as Redis to achieve the same thing.

Hi @sangcho, thank you for the reply.
Before using Ray, my solution was actually Redis, as you suggested, through the dogpile.cache package, and it was very efficient. However, with Ray I can’t do the same thing because the caching object is not serializable. Do you know of any alternative ways to use Redis within a Ray task?

Can you tell me a bit more about what you mean by “caching object is not serializable”? I feel like there should be a way to get around that.

For example, I use dogpile.cache as a Redis client; in the example above, replace cached with this object:

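from dogpile.cache import make_region

def dont_cache_none(value):
    # assumed definition: skip caching None results
    return value is not None

cached = make_region().configure(
    'dogpile.cache.redis',
    # expiration_time=6000,
    arguments={
        # 'distributed_lock': True,
        'thread_local_lock': False,
        'should_cache_fn': dont_cache_none,
        'socket_keepalive': True,
        # 'socket_timeout': 300,
        'health_check_interval': 60
    }
)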

I then run into this error: TypeError: cannot pickle '_thread.lock' object
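The error means the region (or the client behind it) holds a `threading.Lock`, which pickle refuses to serialize. A common workaround is to ship only what is needed to *build* the client and create it lazily on first use in the receiving process. A minimal stdlib sketch, assuming a hypothetical `LazyResource` wrapper, a stand-in `Client` class, and a hypothetical `make_client` factory; in real code the factory would return the configured `make_region().configure(...)` region.

```python
import pickle
import threading


class Client:
    """Stand-in for a Redis/dogpile client; it owns a lock, so pickle rejects it."""

    def __init__(self):
        self.lock = threading.Lock()
        self.store = {}


def make_client():
    """Hypothetical factory; real code would build and configure the dogpile region here."""
    return Client()


class LazyResource:
    """Pickles only the factory; the client is built on first use in each process."""

    def __init__(self, factory):
        self._factory = factory
        self._client = None

    def get(self):
        if self._client is None:
            self._client = self._factory()
        return self._client

    def __getstate__(self):
        # Drop the unpicklable client and keep only what is needed to rebuild it.
        return {"_factory": self._factory, "_client": None}

    def __setstate__(self, state):
        self.__dict__.update(state)


# Pickling the client directly fails, just like in the question.
try:
    pickle.dumps(make_client())
    error_message = ""
except TypeError as exc:
    error_message = str(exc)
print(error_message)

resource = LazyResource(make_client)
resource.get().store["k"] = 1                   # the driver's copy now holds a live client
shipped = pickle.loads(pickle.dumps(resource))  # succeeds: the client is not serialized
print(shipped._client is None)                  # True until the receiving side calls get()
```

Because cloudpickle follows the same `__getstate__`/`__setstate__` protocol, a wrapper like this can be passed to a Ray task or actor. Every deserialized copy builds its own client on first use, so if that opens too many connections, another option is to create the region once inside an actor's `__init__`.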

> So in this case, actor is the right solution. As you said actor model can have low throughput because it is operated by message passing (meaning it can process single message at a time).

An actor can process many thousands of requests per second. If the objects being cached are relatively large, this can work well. How many lookups per second do you need for the cache?
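Each actor call is one message and one round trip, so one way to cut the cost is to let a task look up all of its keys in a single call. A minimal sketch, assuming a hypothetical `BatchLRUCache` class with `get_many`/`put_many` methods (not part of Ray or of the asker's LRUDict). It is plain Python so it can be tested locally; `ray.remote(BatchLRUCache)` turns it into an actor class.

```python
from collections import OrderedDict


class BatchLRUCache:
    """LRU cache whose methods take batches, so one actor call covers many keys."""

    def __init__(self, capacity=10_000):
        self.capacity = capacity
        self._data = OrderedDict()

    def get_many(self, keys):
        """Return a list aligned with `keys`; missing keys map to None."""
        results = []
        for key in keys:
            if key in self._data:
                self._data.move_to_end(key)  # mark as most recently used
                results.append(self._data[key])
            else:
                results.append(None)
        return results

    def put_many(self, items):
        """Insert (key, value) pairs, evicting least recently used entries."""
        for key, value in items:
            self._data[key] = value
            self._data.move_to_end(key)
            if len(self._data) > self.capacity:
                self._data.popitem(last=False)  # drop the oldest entry


cache = BatchLRUCache(capacity=2)
cache.put_many([("a", 1), ("b", 2)])
print(cache.get_many(["a", "x"]))       # [1, None]; "a" is now most recently used
cache.put_many([("c", 3)])              # evicts "b", the least recently used key
print(cache.get_many(["a", "b", "c"]))  # [1, None, 3]
```

Inside a task, a single `ray.get(cache.get_many.remote(keys))` then replaces one round trip per key, and the misses can be written back with one `cache.put_many.remote(...)` call that the task does not have to wait on.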

One optimization I can see in the code above is to use ray.wait() on the cache actor requests. This avoids blocking readers on synchronous get latency, which is probably the main overhead.
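The idea behind the ray.wait() suggestion is to issue every cache request up front and consume the replies as they arrive, rather than paying one blocking round trip per key. With Ray, that means collecting the ObjectRefs from `cached.get.remote(key)` and looping on `ready, pending = ray.wait(pending)`. The sketch below shows the same pattern with `concurrent.futures` standing in for the actor, so it runs without a Ray cluster; `slow_lookup`, `FAKE_CACHE`, and the latency value are made up for illustration.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

FAKE_CACHE = {"a|1": 0.5, "b|2": 0.9}  # hypothetical cache contents
LATENCY = 0.1  # made-up round-trip time of one cache request, in seconds


def slow_lookup(key):
    """Stand-in for one remote cache call (e.g. cached.get.remote(key))."""
    time.sleep(LATENCY)
    return key, FAKE_CACHE.get(key)


def lookup_all(keys):
    results = {}
    with ThreadPoolExecutor(max_workers=max(1, len(keys))) as pool:
        # Issue every request up front instead of waiting on each one in turn.
        pending = {pool.submit(slow_lookup, key) for key in keys}
        # Handle replies as they complete, like looping on ray.wait(pending).
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                key, value = future.result()
                results[key] = value
    return results


start = time.perf_counter()
found = lookup_all(["a|1", "b|2", "c|3"])
elapsed = time.perf_counter() - start
print(found)              # a|1 -> 0.5, b|2 -> 0.9, c|3 -> None (order may vary)
print(f"{elapsed:.2f}s")  # about one LATENCY rather than three
```

Pipelining only removes the caller's wait between requests; a single actor still processes the messages one at a time, so batching (as in get_many-style calls) and pipelining can be combined.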