@ray.remote function seemingly copying data from plasma store


I am currently trying to reduce the memory footprint of a program that takes one large numpy array and one large dictionary of pandas dataframes, and iterates over their values with different model parameters to produce a score/result. I would like to share these two objects across processes without copying. After decorating my model function with @ray.remote, passing references to the large numpy/pandas objects, and running under the mprof memory profiler, I am finding that the program's memory overhead is the same as when running with ProcessPool, where these objects are piped/copied to each process. What am I missing to get zero-copy? (Possibly related thread: How to share memory with non-numpy object?)

The pattern looks like this:

import ray

ray.init()

@ray.remote
def run_model(array, dict_of_pandas_df, params):
    result = ...  # do some work with array, dict_of_pandas_df, and params
    return result

numpy_array_ref = ray.put(my_array)
dict_of_pandas_df_ref = ray.put(dict_of_pandas_df)
list_of_model_parameters = [{}, {}, ...]
result_refs = [run_model.remote(numpy_array_ref, dict_of_pandas_df_ref, params) for params in list_of_model_parameters]

results = ray.get(result_refs)

When a process does a zero-copy read, note that the process is still reported as using that memory (it comes from shared memory). This means the memory is double counted across processes. For example, if you have two processes A and B that each read the same 100MB from shared memory, both of them will report 100MB of usage.
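As a rough way to see this double counting (a sketch, assuming psutil is installed and you are on Linux; the "ray::" name filter is just a heuristic for picking out Ray worker processes), you can compare resident vs. shared memory per worker:

import psutil

# Compare resident (rss) vs. shared memory for Ray worker processes.
# With zero-copy reads, shared should account for most of rss.
for proc in psutil.process_iter(["name", "memory_info"]):
    name = proc.info["name"] or ""
    if name.startswith("ray::"):  # heuristic: Ray renames worker processes
        mem = proc.info["memory_info"]
        print(proc.pid, f"rss={mem.rss / 1e9:.2f} GB", f"shared={mem.shared / 1e9:.2f} GB")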

cc @suquark do you know any good way to verify the zero copy read?

Thanks a lot @sangcho. If it's useful, I am profiling with the following mprof command:

mprof run --include-children python my_script.py

Even when looking at the running processes in top/htop, I believe that the RES portion of memory is significantly larger than SHR, both when running with multiprocessing and with @ray.remote (I will follow up to verify this).

Hmm, I see. What's the dtype of your numpy array? Is it float, integer, or string?

I am passing in a numpy array with dtype float. The pandas dataframes – values of the dict – have mixed types: float/pandas.Timestamp.

Hmm, that's pretty weird. A pandas dataframe uses numpy arrays under the hood afaik, and float numpy arrays should be zero-copy read. So each process should only copy the parts that are not numpy arrays, which means most of your RES should be accounted for by SHR!
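One quick way to sanity-check whether the numpy array itself is being zero-copy read (a sketch, not an official verification API): an array deserialized with zero-copy is a read-only view over the plasma buffer, so its writeable/owndata flags should be False inside the task:

import numpy as np
import ray

ray.init()

@ray.remote
def check_zero_copy(arr):
    # A zero-copy array is a read-only view over the plasma buffer;
    # if it is writeable and owns its data, it was copied into the worker heap.
    return arr.flags.writeable, arr.flags.owndata

arr_ref = ray.put(np.zeros(50_000_000, dtype=np.float64))
print(ray.get(check_zero_copy.remote(arr_ref)))  # expect (False, False) for zero-copy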

Can you actually try something like this?

import time

@ray.remote
def your_func(array_ref_list):
    # Measure the overhead of ray.get inside the task.
    s = time.perf_counter()
    array = ray.get(array_ref_list[0])
    print(time.perf_counter() - s)

numpy_array_ref = ray.put(my_array)
# NOTE: Pass a list of object refs so that they won't be automatically dereferenced
# before the task runs.
ray.get([your_func.remote([numpy_array_ref]) for _ in range(10)])

If the overhead is as big as copying the large array into the process, that means the zero-copy read wasn't working as expected.

Actually, there's also a possibility that pandas.Timestamp is not zero-copyable.
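If Timestamp columns do turn out to be the problem, one possible workaround (a sketch, untested against your data; to_epoch_ns/from_epoch_ns are just illustrative helper names) is to convert them to a numpy-backed numeric dtype before ray.put and rebuild them inside the task:

import pandas as pd

def to_epoch_ns(df):
    # Replace datetime columns with int64 nanosecond epochs so the underlying
    # blocks are plain numpy arrays (zero-copy friendly in the object store).
    out = df.copy()
    for col in out.columns:
        if pd.api.types.is_datetime64_any_dtype(out[col]):
            out[col] = out[col].astype("int64")
    return out

def from_epoch_ns(df, time_cols):
    # Inside the remote task, convert epochs back to Timestamps where needed.
    out = df.copy()
    for col in time_cols:
        out[col] = pd.to_datetime(out[col])
    return out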

Awesome @sangcho. I'll try what you've put forward; it might take 24 hours or so for me to get back on this thread. Thanks!

Hey @sangcho. After looking into things, the latency of calling ray.get inside the remote function is not very large: only a few seconds for an object of ~50 GB. I need to do some more research, but I hope to post back on this thread with a fully reproducible example if this rears its head again. Thank you so much for your help!


Sounds good! I still suspect the issue is that you have a Timestamp dtype, btw (afaik, we only support zero-copy reads for integer & float).