On `task:deserialize_args` and effective usage of the Plasma store


While examining how my Ray computations behave using `ray.timeline`, I found that an unexpectedly long deserialization step takes place.

The setup is as follows.

There is a remote function `f` whose output is then passed to a function `g`. Function `f` returns a dictionary like this:

from typing import List, Optional, TypedDict

import numpy as np

class FOutput(TypedDict):
    key1: np.ndarray  # Big array.
    key2: List[float]
    key3: Optional[List[float]]
    key4: bool
    key5: "CustomClass"
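
For concreteness, here is a minimal sketch of how such a pipeline might be wired; the array shape, the example values, and the trivial `CustomClass` placeholder are assumptions, not the real code:

```python
import numpy as np
import ray

class CustomClass:
    """Placeholder for whatever custom type key5 actually holds."""

@ray.remote
def f() -> dict:
    # Hypothetical reconstruction of the FOutput payload described above.
    return {
        "key1": np.zeros((4096, 4096), dtype=np.float32),  # big array
        "key2": [1.0, 2.0],
        "key3": None,
        "key4": True,
        "key5": CustomClass(),
    }

@ray.remote
def g(out: dict) -> float:
    # Consumes key1; the long task:deserialize_args event shows up on this side.
    return float(out["key1"].sum())

ray.init(ignore_reinit_error=True)
result = ray.get(g.remote(f.remote()))
```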

On the timeline I can see that a `task:store_outputs` event happens inside `f`, and the long `task:deserialize_args` event happens inside `g`.

Following this and this, I suspected that Ray cannot properly handle a NumPy array that lies in a dictionary together with custom classes or optional lists. So I implemented an encoder and decoder between `FOutput` and `RayFriendlyFOutput`, which looks like this:

from typing import Dict, TypedDict

import numpy as np

class RayFriendlyFOutput(TypedDict):
    key1: np.ndarray  # Big array.
    key2: np.ndarray
    key3: np.ndarray
    key4: bool
    key5: Dict[str, np.ndarray]

So now `f` returns a `RayFriendlyFOutput` and `g` receives it, converting it back to an `FOutput` internally. The deserialization time remained the same.
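
For illustration, a simplified sketch of what the encode/decode helpers could look like; the handling of `key3` (empty array for `None`) and the flattening of `key5` into a dict of arrays are assumptions, not the actual implementation:

```python
from dataclasses import dataclass

import numpy as np

@dataclass
class CustomClass:
    """Placeholder standing in for the real custom class behind key5."""
    weights: np.ndarray

def encode(out: dict) -> dict:
    """FOutput -> RayFriendlyFOutput: every value becomes a primitive or ndarray."""
    return {
        "key1": out["key1"],
        "key2": np.asarray(out["key2"], dtype=np.float64),
        "key3": np.asarray([] if out["key3"] is None else out["key3"], dtype=np.float64),
        "key4": out["key4"],
        "key5": {"weights": out["key5"].weights},
    }

def decode(friendly: dict) -> dict:
    """RayFriendlyFOutput -> FOutput: rebuild the original Python structures inside g."""
    return {
        "key1": friendly["key1"],
        "key2": friendly["key2"].tolist(),
        "key3": friendly["key3"].tolist() if friendly["key3"].size else None,
        "key4": friendly["key4"],
        "key5": CustomClass(weights=friendly["key5"]["weights"]),
    }
```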

I checked that all arrays had the writeable flag equal to False, which may indicate that they are read-only views backed by the shared-memory object store.
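
A minimal way to reproduce that check, assuming a local Ray instance and a made-up argument size:

```python
import numpy as np
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def writeable_inside_task(arr: np.ndarray) -> bool:
    # Reports how the argument looks from inside the task.
    return arr.flags.writeable

# A large argument is expected to arrive as a read-only, shared-memory-backed view.
print(ray.get(writeable_inside_task.remote(np.zeros((2048, 2048)))))
```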

My questions are:

  • Does `task:deserialize_args` always mean that the arguments were not stored in Plasma properly?
  • What may be the reason that `RayFriendlyFOutput` still has to be deserialized?
  • Is it right that in the case of `FOutput` the `key1` value cannot be stored in the Plasma store, since not all of the values are primitives or NumPy arrays?
  • How can one establish the reason for the deserialization?

It may also be worth mentioning that in function `g` the `key1` value is copied in order to perform torch interpolation (see the sketch after the question below).

  • Is this copy counted in `task:deserialize_args` or in `task:execute`?
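
For reference, a simplified sketch of that copy, with a read-only array standing in for what `g` actually receives (the shape and interpolation settings are assumptions):

```python
import numpy as np
import torch
import torch.nn.functional as F

key1 = np.zeros((512, 512), dtype=np.float32)
key1.setflags(write=False)  # mimic the read-only view coming out of the object store

# torch.from_numpy would warn on a non-writeable array, so key1 is copied first;
# the copy lives in this process and is writeable.
tensor = torch.from_numpy(key1.copy())[None, None]  # add batch and channel dims
resized = F.interpolate(tensor, size=(256, 256), mode="bilinear", align_corners=False)
print(resized.shape)  # torch.Size([1, 1, 256, 256])
```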

Before answering the questions, let me go through how zero-copy works. After that, if you are still confused about the behavior, you can follow up here.

Basically, all “zero-copyable” objects still need serialization and deserialization.

Internally, this is how it works.

  • If you call `ray.put`, the object is serialized using the pickle protocol 5.
  • The serialized object is stored in the shared-memory object store (Plasma).
  • When other processes access it, they deserialize the object. When this happens, Ray only copies the “metadata” of the numpy array (not the buffer); it can use the buffer stored in shared memory directly, so most of the copying doesn’t happen. You can get more information from PEP 574 -- Pickle protocol 5 with out-of-band data | Python.org (this is how it works). See the sketch after this list.
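
A minimal sketch of those three steps, assuming a local Ray instance and a made-up array size:

```python
import numpy as np
import ray

ray.init(ignore_reinit_error=True)

arr = np.ones((1000, 1000), dtype=np.float64)  # ~8 MB, well above the small-object threshold

ref = ray.put(arr)  # steps 1-2: serialized with pickle protocol 5, buffer lands in the object store
out = ray.get(ref)  # step 3: only the numpy metadata is copied back into this process

print(out.flags.writeable)  # False: the data is a read-only view of the shared-memory buffer
print(out.flags.owndata)    # False: numpy does not own the underlying memory
```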

So this means you still need serialization + deserialization even though the Plasma store is used.

Also worth noting: when you have a numpy array inside a dict, only the numpy buffer is zero-copied, while the Python dict and its other values are copied into the process. You can store any type in the Plasma store; it just won’t be zero-copied (the memory is copied into the process if the object doesn’t support zero-copy).
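
A short sketch of that distinction (the array shape and keys are chosen arbitrarily):

```python
import numpy as np
import ray

ray.init(ignore_reinit_error=True)

payload = {"key1": np.zeros((2048, 2048)), "key2": [1.0, 2.0], "key4": True}
out = ray.get(ray.put(payload))

# The dict and its small Python values are rebuilt (copied) in this process;
# only the big array's buffer stays in shared memory, hence the read-only flag.
print(out["key1"].flags.writeable)  # False
print(out["key2"], out["key4"])     # plain, copied Python objects
```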

Lastly, when your output is smaller than 100 KB, Ray uses an optimization so that the value doesn’t go through the Plasma store at all.
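
As a rough way to see which side of that threshold a return value falls on (the exact cutoff and its accounting are Ray internals; this is only a ballpark check with made-up payloads):

```python
import pickle

import numpy as np

big = np.zeros((1024, 1024), dtype=np.float32)            # stand-in for key1
small = {"key2": [1.0, 2.0], "key3": None, "key4": True}  # the small remaining values

print(len(pickle.dumps(big, protocol=5)))    # ~4 MB -> goes through the Plasma store
print(len(pickle.dumps(small, protocol=5)))  # well under 100 KB -> can be returned inline
```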