Hello,
I am currently trying to reduce the memory footprint of a program that takes one large numpy array and one large dictionary of pandas dataframes, and iterates over their values with different model parameters to produce a score/result. I would like to share these two objects across processes without copying. After making my model function a remote task, passing references to the large numpy/pandas objects, and running under the mprof memory profiler, I am finding that the memory overhead is the same as with ProcessPool, where these objects are pickled/copied into each process. What am I missing to get to zero-copy? (Possibly related thread: How to share memory with non-numpy object?)
The pattern looks like this:
```python
@ray.remote
def run_model(array, dict_of_pandas_df, params):
    # ... do some work ...
    return result

numpy_array_ref = ray.put(my_array)
dict_of_pandas_df_ref = ray.put(dict_of_pandas_df)
list_of_model_parameters = [{}, {}, ...]
result_refs = [
    run_model.remote(numpy_array_ref, dict_of_pandas_df_ref, x)
    for x in list_of_model_parameters
]
results = ray.get(result_refs)
```
When you do a zero-copy read, note that each process still reports the shared memory as its own, so the memory is double counted. For example, if you have 2 processes A and B that each map the same 100MB of shared memory, both of them will report 100MB of memory use.
cc @suquark do you know any good way to verify the zero copy read?
Thanks a lot @sangcho. If it’s useful, I am profiling with the following mprof command:

```
mprof run --include-children python my_script.py
```
Even when looking at the running processes in top/htop, I believe that the RES portion of memory is significantly larger than SHR, both when running with multiprocessing and with @ray.remote (will follow up to verify this).
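On Linux, a more precise way to check this than eyeballing top is to read `/proc/<pid>/status`, which splits RSS into anonymous, file-backed, and shmem-backed pages. For a genuine zero-copy read you would expect the large array to land in the shared components (`RssFile`/`RssShmem`, since Ray's object store is memory-mapped) rather than in `RssAnon`. A Linux-only sketch:

```python
def rss_breakdown(pid="self"):
    # Parse /proc/<pid>/status (Linux-only); values are in kB.
    # RssAnon          = private pages (real per-process copies),
    # RssFile/RssShmem = pages shared with other processes, e.g. a
    #                    memory-mapped object store.
    # A zero-copy read should grow the shared parts, not RssAnon.
    wanted = ("VmRSS", "RssAnon", "RssFile", "RssShmem")
    out = {}
    with open(f"/proc/{pid}/status") as f:
        for line in f:
            key, _, rest = line.partition(":")
            if key in wanted:
                out[key] = int(rest.split()[0])
    return out

print(rss_breakdown())
```

Calling `rss_breakdown(pid)` on each worker's pid before and after the tasks touch the big array would show whether the growth is in private or shared memory.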
Hmm I see. What’s the dtype of your numpy array? Are they floats, integers, or strings?
I am passing in a numpy array with dtype float. The pandas dataframes – values of the dict – have mixed types: float/pandas.Timestamp.
Hmm, that’s pretty weird. A pandas dataframe uses numpy arrays under the hood afaik, and float-typed numpy arrays should be zero-copy readable. So each process should only copy the parts that are not numpy arrays, which means your SHR should be larger than RES!
Can you actually try something like this?

```python
import time
import ray

@ray.remote
def your_func(array_ref_list):
    # Measure the overhead of ray.get
    s = time.perf_counter()
    ray.get(array_ref_list)
    print(time.perf_counter() - s)

numpy_array_ref = ray.put(my_array)
# NOTE: Pass a list of object refs so the argument won't be automatically resolved.
refs = [your_func.remote([numpy_array_ref]) for _ in range(10)]
ray.get(refs)
```
If the overhead is as big as copying large array into the process, that means zero copy read wasn’t working as expected.
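A complementary check: afaik, zero-copy numpy reads out of Ray's immutable object store come back as read-only arrays, so `arr.flags.writeable` being `True` inside a task suggests you received a deserialized copy. The numpy-only sketch below (no Ray involved) shows the same read-only behavior for a no-copy view over an immutable buffer:

```python
import numpy as np

# np.frombuffer builds an array view over the bytes object without
# copying it, and because bytes are immutable, numpy marks the view
# read-only, analogous to a zero-copy array backed by a shared,
# immutable object store.
buf = np.arange(5, dtype=np.float64).tobytes()
view = np.frombuffer(buf, dtype=np.float64)
print(view.flags.writeable)  # False
```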
Actually, there’s also a possibility that pandas.Timestamp is not zero-copyable.
Awesome @sangcho. I’ll try what you’ve put forward – might take 24 hours or so to get back on this thread. Thanks!
Hey @sangcho. After looking into things, the latency of a ray.get inside a task is not very large – only a few seconds – for an object of ~50 GB. Need to do some more research, but hoping to post back on this thread with a fully reproducible example if this rears its head again – thank you so much for your help!
Sounds good! I still suspect the issue is your Timestamp dtype btw (afaik, we only support zero-copy reads for integer and float dtypes).
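If Timestamps do turn out to be the culprit, one possible workaround (a sketch, assuming your dates live in object-dtype columns) is to normalize them to `datetime64[ns]`. An object-dtype column holds one Python `Timestamp` per element and must be pickled element by element, while a `datetime64[ns]` column is backed by a flat int64 numpy buffer, the kind of block that should be zero-copy friendly:

```python
import pandas as pd

# An object-dtype column of Timestamp instances: each element is a
# separate Python object, so the column can't be shared as one buffer.
ts = pd.Series(
    [pd.Timestamp("2023-01-01"), pd.Timestamp("2023-01-02")], dtype=object
)
df = pd.DataFrame({"value": [1.0, 2.0], "when": ts})
print(df["when"].dtype)  # object

# Normalizing stores the column as int64 nanoseconds under the hood.
df["when"] = pd.to_datetime(df["when"])
print(df["when"].dtype)  # datetime64[ns]
```

Converting once before `ray.put` would let the datetime data travel as a numeric block instead of a pile of pickled objects.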