Using ray.put for LARGE numpy arrays

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I have two large numpy arrays of numpy arrays (more than 100M rows each) that I want to pass into a remote task function. I call the function repeatedly, so I wanted to store the arrays in the object store using ray.put and pass the references. However, I ran into the following error:

ValueError: Total buffer metadata size is bigger than 2147483647. Consider reduce the number of buffers (number of numpy arrays, etc).

on the first ray.put call. I then tried np.memmap on the arrays and passed references to those mmaps, but that resulted in the same error.

I assume the issue isn’t the amount of memory in the object store, but rather how much can be added at a time. Are there any suggestions for loading such large datasets for reuse, other than dividing them into 2 GB segments and passing potentially hundreds of smaller numpy array refs to the remote task?
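For reference, the chunking workaround mentioned above could be sketched roughly as follows. `chunk_object_array` is a hypothetical helper, not a Ray API; each resulting chunk would then get its own `ray.put` call (the Ray call itself is omitted so the sketch stays self-contained):

```python
import numpy as np

def chunk_object_array(arr, max_elems_per_chunk):
    """Split an object-dtype array of ragged rows into chunks, so each
    chunk stays well under the serializer's per-message metadata limit."""
    n_chunks = max(1, -(-len(arr) // max_elems_per_chunk))  # ceiling division
    return np.array_split(arr, n_chunks)

# Small stand-in for the real data: 10 ragged integer rows.
ragged = np.array([np.arange(i % 5 + 1) for i in range(10)], dtype=object)
chunks = chunk_object_array(ragged, 4)
# Each chunk would then be stored separately, e.g. refs = [ray.put(c) for c in chunks]
```

The trade-off is exactly the one described above: the remote task receives a list of refs instead of a single one.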

I am using Ray 2.5.1 and NumPy 1.23.4.

Thanks in advance!

Looking into the code a bit, the message appears to be a direct result of protobuf’s 2 GB maximum serialized message size, which my arrays’ metadata seems to be hitting. Is there somewhere I can find more information about what is stored in the metadata, and any recommended ways to resolve this? Thanks!

Can you share a small repro script? Thanks.

Hey @Ruiyang_Wang, thanks for the reply. I am in the process of coming up with a repro script; at the scale of these numpy arrays, it takes a while to create them and then call ray.put. Do you have any insight into why the metadata size might be too large? The error is raised in python/ray/includes/serialization.pxi in ray-project/ray. In most of my attempts at a repro script, ray.put either succeeds or fails due to OOM issues.

Thanks again!

Hey @Ruiyang_Wang, the following is a repro script that resulted in the above error:

import numpy as np
import ray
import os
import time
import random

def ray_put(arr: np.ndarray):
    print(f"Size in bytes: {arr.nbytes}")
    print("Putting in object store...")
    arr_ref = ray.put(arr)
    print("Placed in object store!")
    return arr_ref

def main() -> None:
    if not ray.is_initialized():
        # Connect to the running Slurm cluster; pass the Redis password
        # if the cluster was started with one.
        if "redis_password" in os.environ:
            ray.init(address="auto", _redis_password=os.environ["redis_password"])
        else:
            ray.init(address="auto")
    print("Ray Available Resources: {}".format(ray.available_resources()))
    # ~150M ragged rows, each holding 50-100 random ints
    arrs = [
        np.array(
            [np.random.choice(150000000, random.randint(50, 100)) for _ in range(150000000)],
            dtype=object,
        )
        for _ in range(3)
    ]
    ray_put(arrs[0])  # Error occurs here

if __name__ == "__main__":
    main()

Traceback (most recent call last):
  File "/users/PAS2065/adityatv/", line 39, in <module>
  File "/users/PAS2065/adityatv/", line 30, in main
    arrs = [
  File "/users/PAS2065/adityatv/", line 11, in ray_put
    print("Putting in object store...")
  File "/users/PAS2065/adityatv/miniconda3/envs/myenv/lib/python3.9/site-packages/ray/_private/", line 18, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/users/PAS2065/adityatv/miniconda3/envs/myenv/lib/python3.9/site-packages/ray/_private/", line 103, in wrapper
    return func(*args, **kwargs)
  File "/users/PAS2065/adityatv/miniconda3/envs/myenv/lib/python3.9/site-packages/ray/_private/", line 2612, in put
    object_ref = worker.put_object(value, owner_address=serialize_owner_address)
  File "/users/PAS2065/adityatv/miniconda3/envs/myenv/lib/python3.9/site-packages/ray/_private/", line 693, in put_object
    serialized_value = self.get_serialization_context().serialize(value)
  File "/users/PAS2065/adityatv/miniconda3/envs/myenv/lib/python3.9/site-packages/ray/_private/", line 466, in serialize
    return self._serialize_to_msgpack(value)
  File "/users/PAS2065/adityatv/miniconda3/envs/myenv/lib/python3.9/site-packages/ray/_private/", line 450, in _serialize_to_msgpack
    return MessagePackSerializedObject(
  File "python/ray/includes/serialization.pxi", line 463, in ray._raylet.MessagePackSerializedObject.__init__
  File "python/ray/includes/serialization.pxi", line 437, in ray._raylet.Pickle5SerializedObject.total_bytes.__get__
  File "python/ray/includes/serialization.pxi", line 337, in ray._raylet.Pickle5Writer.get_total_bytes
ValueError: Total buffer metadata size is bigger than 2147483647. Consider reduce the number of buffers (number of numpy arrays, etc).

I ran this (and my original scripts) on a Slurm node with 1024 GB allocated for object store memory (total memory is 3000+ GB). Thanks in advance!

Is there a specific reason you use a numpy array of arrays instead of a 2-dimensional array?

Thanks for the reply! The reason for the nested arrays is that the inner arrays have different lengths.

I could try padding my arrays to be of the same size if that’ll solve the issue.
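The padding idea could be sketched roughly like this, assuming a sentinel fill value outside the valid ID range marks the padded slots (`pad_ragged` is a hypothetical helper, not from the thread):

```python
import numpy as np

def pad_ragged(rows, fill=-1, dtype=np.int64):
    """Pad variable-length integer rows into one dense 2-D array.
    A fill value outside the valid ID range (here -1) marks padding,
    so consumers can mask it out."""
    width = max(len(r) for r in rows)
    out = np.full((len(rows), width), fill, dtype=dtype)
    for i, r in enumerate(rows):
        out[i, : len(r)] = r
    return out

dense = pad_ragged([np.array([1, 2, 3]), np.array([4]), np.array([5, 6])])
# dense is a single contiguous (3, 3) array; ray.put would see one buffer
```

The cost is memory: every row is stored at the width of the longest row, which matters when row lengths vary a lot.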

Ray can easily handle numpy arrays holding large amounts of data, but not a large number of arrays, because a Ray object’s metadata tracks every Python object it references. If an ndarray with padding works for you, that may solve the issue.


That makes a lot of sense! I’ll try it out later today and report back. Thanks!

No problem. You can also check Ray Data: Scalable Datasets for ML — Ray 2.6.1 if it applies; it provides handy chunking functions for large datasets.

Hey @Ruiyang_Wang, I went ahead and tried the 2D array, but unfortunately it is too large to even create (approximately 6 TiB with an int32 dtype). Do you have any other suggestions for handling this kind of data in Ray? For context, the nested numpy arrays represent the adjacency list of a large graph on which I’d like to run random walks.
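One alternative to padding for adjacency lists, not suggested in the thread but a common trick for ragged data, is a CSR-style flat layout: concatenate all neighbor lists into one contiguous array plus an offsets array, which keeps the buffer count at two regardless of the number of nodes and adds no padding overhead. A sketch (`to_csr` is a hypothetical helper):

```python
import numpy as np

def to_csr(neighbors):
    """Flatten an adjacency list (sequence of per-node neighbor arrays)
    into two contiguous arrays. Node i's neighbors are
    flat[offsets[i]:offsets[i + 1]]."""
    lengths = np.fromiter(
        (len(n) for n in neighbors), dtype=np.int64, count=len(neighbors)
    )
    offsets = np.concatenate(([0], np.cumsum(lengths)))
    flat = (
        np.concatenate(neighbors)
        if len(neighbors)
        else np.empty(0, dtype=np.int64)
    )
    return offsets, flat

adj = [np.array([1, 2]), np.array([0]), np.array([0, 1, 3]), np.array([2])]
offsets, flat = to_csr(adj)
neighbors_of_2 = flat[offsets[2]:offsets[3]]
```

With only two dense arrays, a single ray.put per array would avoid the per-buffer metadata blow-up, and random-walk steps become two index lookups.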

I’ve looked at Ray Dataset and think it could be useful if there were a way to index a dataset by blocks and also choose elements from a block, but I wasn’t able to find such methods.

I went ahead and marked the solution, which I think will be helpful for others. My particular use case requires a bit more manipulation, but your suggestion fixes the original issue.

@chengsu, would you mind helping with this one on how to load big data?