@ray.remote function seemingly copying data from plasma store

Hello,

I am currently trying to reduce the memory footprint of a program that takes one large numpy array and one large dictionary of pandas dataframes, and iterates over their values with different model parameters to produce a score/result. I would like to share these two objects across processes without copying. After making my model function remote, passing references to the large numpy/pandas objects, and running under the mprof memory profiler, I am finding that the memory overhead of the program is the same as with ProcessPool, where these objects are pickled/copied into each process. What am I missing to get zero-copy? (Possibly related thread: How to share memory with non-numpy object?)

The pattern looks like this:

@ray.remote
def run_model(array, dict_of_pandas_df, params):
    ...  # do some work with the shared array/dataframes
    return result

# Put the large objects into the object store once so all tasks can share them.
numpy_array_ref = ray.put(my_array)
dict_of_pandas_df_ref = ray.put(dict_of_pandas_df)
list_of_model_parameters = [{}, {}, ...]
result_refs = [
    run_model.remote(numpy_array_ref, dict_of_pandas_df_ref, params)
    for params in list_of_model_parameters
]

results = ray.get(result_refs)
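For context, the ProcessPool version I am comparing against looks roughly like this (a sketch; run_model_mp stands in for the same work as run_model above):

from concurrent.futures import ProcessPoolExecutor
from functools import partial

def run_model_mp(array, dict_of_pandas_df, params):
    ...  # same work as run_model above
    return result

with ProcessPoolExecutor() as pool:
    # Both large objects are pickled and copied into every worker here.
    worker = partial(run_model_mp, my_array, dict_of_pandas_df)
    results_mp = list(pool.map(worker, list_of_model_parameters))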

When you do a zero-copy read, note that the process still reports the mapped shared memory as its own, so the same bytes are counted once per process. For example, if you have 2 processes A and B, each of which maps the same 100MB of shared memory, both of them will say they use 100MB, even though only 100MB is physically allocated.
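If you want to see the split directly, one way (a Linux-only sketch) is to read /proc/self/smaps_rollup from inside the worker and compare the private vs. shared portions of RSS:

import os

def report_memory():
    # /proc/self/smaps_rollup (Linux >= 4.14) aggregates the per-mapping
    # memory counters; values are in kB.
    fields = {}
    with open("/proc/self/smaps_rollup") as f:
        for line in f:
            parts = line.split()
            if len(parts) >= 2 and parts[0].endswith(":"):
                fields[parts[0][:-1]] = int(parts[1])
    private = fields.get("Private_Clean", 0) + fields.get("Private_Dirty", 0)
    shared = fields.get("Shared_Clean", 0) + fields.get("Shared_Dirty", 0)
    print(f"pid={os.getpid()} private={private} kB, shared={shared} kB")

If zero-copy is working, the shared portion should dominate inside the task.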

cc @suquark do you know any good way to verify the zero copy read?

Thanks a lot @sangcho. If it’s useful, I am profiling with the following mprof command:

mprof run --include-children python my_script.py

Even when looking at the running processes in top/htop, I believe the RES portion of memory is significantly larger than SHR, both when running with multiprocessing and with @ray.remote (I will follow up to verify this).

Hmm I see. What’s the dtype of your numpy array? Is it float, integer, or string?

I am passing in a numpy array with dtype float. The pandas dataframes (the values of the dict) have mixed types: float and pandas.Timestamp.
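For what it’s worth, here is a quick sketch of how I can check which columns are plain numpy dtypes versus object dtype, using the dict_of_pandas_df from my snippet above:

# Columns with dtype "object" (e.g. holding raw pandas.Timestamp values)
# are not backed by a plain numeric numpy buffer, unlike float64/datetime64.
for name, df in dict_of_pandas_df.items():
    print(name, dict(df.dtypes.value_counts()))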

Hmm that’s pretty weird. A pandas dataframe uses numpy arrays under the hood afaik, and a float-typed numpy array should be zero-copy read. So each process should only copy the parts that are not numpy arrays, which means SHR should account for most of your RES!

Can you actually try something like this?

import time

import ray

@ray.remote
def your_func(array_ref_list):
    # Measure the overhead of ray.get inside the task
    s = time.perf_counter()
    ray.get(array_ref_list)
    print(time.perf_counter() - s)

numpy_array_ref = ray.put(my_array)
# NOTE: Pass a list of object refs so they aren't automatically resolved
# before the task runs.
ray.get([your_func.remote([numpy_array_ref]) for _ in range(10)])

If the overhead is as big as copying the large array into the process, that means the zero-copy read wasn’t working as expected.
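As a baseline, you could also time an explicit copy of the same array in the driver and compare it against the ray.get latency above (a sketch, reusing my_array / numpy_array_ref from your snippet):

import time
import numpy as np

arr = ray.get(numpy_array_ref)  # should itself be a zero-copy read
s = time.perf_counter()
copied = np.copy(arr)           # forces a real memcpy of the full array
print("explicit copy:", time.perf_counter() - s)

If the in-task ray.get is much faster than this memcpy, that’s good evidence the read was zero-copy.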

Actually, there’s also a possibility that pandas.Timestamp is not zero-copyable.
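One more quick check: numpy arrays read from the object store come back as read-only views onto shared memory, so the writeable flag is a cheap signal (a minimal sketch):

import numpy as np
import ray

arr = ray.get(ray.put(np.zeros(10**6)))
# A zero-copy read returns a read-only view onto the object store buffer,
# so writeable=False suggests no copy was made.
print(arr.flags.writeable)  # expect False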

Awesome @sangcho. I’ll try what you’ve put forward; it might take 24 hours or so to get back on this thread. Thanks!

Hey @sangcho. After looking into things, the latency of ray.get inside the remote function is not very large, only a few seconds for an object of ~50 GB. I need to do some more research, but I’m hoping to post back on this thread with a fully reproducible example if this rears its head again. Thank you so much for your help!


Sounds good! I still have a suspicion that the issue is the Timestamp dtype, btw (afaik, we only support zero-copy reads for integers & floats).