Ray Serve Sharing Objects with Deployment

I am using Ray Serve to host various machine-learning models. From within a Ray Serve deployment I would like to access the shared object store where I have saved data for ML processing. Specifically, I would like to send the object ID to the Ray Serve endpoint. Is this possible, or have I misunderstood Ray's memory model?

Below is some basic code that illustrates what I would like to do. Right now the code simply hangs during the ray.get() call within the deployment.

import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)

@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        obj_id = pickle.loads(body)
        print(f"here's the object_id in serve: {obj_id}")
        data = await ray.get(obj_id)  # hangs here
        print(data)

serve.start()
ModelServer.deploy()

obj_id = ray.put(1)
obj_id_bytes = pickle.dumps(obj_id)
print(f"Here's the object from main: {ray.get(obj_id)}")
print(f"here's the object id from main: {obj_id}")
response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_bytes,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()

Hi @AustinA, thanks for posting. One note is that you don't need to await the ray.get() call.

However, it still looks like this is a bug, so I'll file an issue. Strangely, it seems that this works when the ObjectRef is also in scope in the deployment. E.g. this works if you uncomment the line in __call__:

import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)

obj_id = ray.put(1)
@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        obj_id_in_deployment = pickle.loads(body)
        # print("check equality: ", obj_id == obj_id_in_deployment)
        print(f"here's the object_id in serve: {obj_id_in_deployment}")
        data = ray.get(obj_id_in_deployment)
        print(data)

serve.start()
ModelServer.deploy()

obj_id_bytes = pickle.dumps(obj_id)
print(f"Here's the object from main: {ray.get(obj_id)}")
print(f"here's the object id from main: {obj_id}")
response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_bytes,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()

However, if you leave the line commented, it does not work.

Could you provide a little more context on your use case? We might be able to find a workaround.

Thank you very much for looking into this.

In general we are just trying to get a lot of data to the ML models behind the ray serve endpoints. The thought was we could use the ray object store to avoid copying and sending millions of lines of data over http.

Definitely open to suggestions on other ways to get lots of data to these ray serve endpoints.

I have a working solution to this specific problem! I needed to use ray.cloudpickle instead of plain pickle. This even works when obj_id is defined AFTER the deployment class is defined.

import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)
#obj_id = ray.put(1)

@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        obj_id_in_deployment = ray.cloudpickle.loads(body)
        #print("check equality: ", obj_id == obj_id_in_deployment)
        print(f"here's the object_id in serve: {obj_id_in_deployment}")
        data = ray.get(obj_id_in_deployment)
        print(data)

serve.start()
ModelServer.deploy()

obj_id = ray.put(1)
obj_id_bytes = ray.cloudpickle.dumps(obj_id)
print(f"Here's the object from main: {ray.get(obj_id)}")
print(f"here's the object id from main: {obj_id}")
response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_bytes,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()

Definitely still open to suggestions on best practices for sending large amounts of data to a ray serve endpoint.


Hi @AustinA, looks like you figured it out before we did :wink:

In Ray, ObjectRef has its own reducer based on cloudpickle, and generally speaking it's safer to use cloudpickle instead of pickle for Ray-specific objects like this.

NOTE: It's generally discouraged to serialize an ObjectRef, since it makes it very difficult for Ray to do reference counting and garbage collection. We don't have a clean implementation at the Ray Serve level for this yet, but we very likely will in Q2.
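To make that hazard concrete, here is a minimal sketch (my own, not from this thread) of what can go wrong once the only tracked reference goes out of scope; whether the final ray.get hangs or raises depends on the Ray version:

import ray

ray.init()

def make_blob():
    ref = ray.put("some large payload")
    # Out-of-band serialization: Ray's ref counter knows nothing about these bytes.
    blob = ray.cloudpickle.dumps(ref)
    return blob  # `ref`, the only tracked reference, goes out of scope here

blob = make_blob()
restored_ref = ray.cloudpickle.loads(blob)
# The object may already have been freed, so this can hang or raise an error.
print(ray.get(restored_ref, timeout=5))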

You brought up an interesting topic. We're trying to make it easier and more structured for end users to leverage the object store to avoid unnecessary serde cost and redundant copies of request data as well as model weights. We've been drafting something along these lines as well, to make it easier for users to:

  1. Adapt and transfer user request data via Ray's plasma store. An extreme case would be a large batch of images for a ResNet; ideally we don't want users to manually serialize each image, send it over HTTP, and deserialize it on the replica.

  2. Manage a larger number of deployments in a Ray cluster that share the same underlying model-weight object without redundant copies. For N replicas of the same model, we can keep just one copy of its weights in the plasma store instead of N in-memory copies, and hot-provisioning a new model from the object store becomes much faster. In user studies like this, the time to load a new model can even be smaller than the time to run a forward pass, which opens up a number of new optimization opportunities (see the sketch after this list).
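As a rough illustration of point 2, here is a minimal sketch using plain Ray actors rather than Serve replicas, with a random numpy array standing in for real model weights:

import numpy as np
import ray

ray.init()

weights = np.random.rand(2048, 2048)  # stand-in for model weights (~32 MB)
weights_ref = ray.put(weights)        # a single copy in the shared object store

@ray.remote
class Replica:
    def __init__(self, weights):
        # Ray resolves the top-level ObjectRef argument to the array itself.
        # For numpy arrays this is zero-copy: every replica on the node maps the
        # same plasma buffer instead of holding its own copy of the weights.
        self.weights = weights

    def forward(self, x):
        return float((self.weights[0] * x).sum())

replicas = [Replica.remote(weights_ref) for _ in range(4)]
print(ray.get([r.forward.remote(1.0) for r in replicas]))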

If you have any concrete examples and use cases that you're willing to share, we can chat more to learn and get feedback from you to make Ray Serve better :slight_smile:

If serializing the ObjectRef is discouraged, what is a better way to pass an ObjectRef to various Ray deployment endpoints?

Our use case is definitely more in line with use case 1. We are sucking up a lot of data and trying to get it to some machine learning models as quickly and efficiently as possible.

This is a lack of support on our side for enabling optimizations like this, and the primary concern is just ref counting & GC. We're looking into this at both the Ray and Ray Serve level and will come up with a better solution to support both 1) and 2) soon.

A quick way to unblock yourself would be manual GC … I paired up with @yic on something like:

import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)


@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')
        @ray.remote
        class Dummy:
            def __init__(self):
                self._kv = {}

            def ready(self):
                pass

            def store(self, k, v):
                self._kv[k] = v[0] # ref count +1 

            def delete(self, k):
                if k in self._kv:
                   del self._kv[k] # ref count -1

            def load(self, k):
                print(k)
                return self._kv.get(k)

        self._actor = Dummy.options(name="dummy").remote()
        ray.get(self._actor.ready.remote())

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        # load() returns the stored ObjectRef; the second await resolves that ref to the data.
        data = await self._actor.load.remote(body.decode())
        return await data

serve.start()
ModelServer.deploy()
handler = ray.get_actor(name="dummy")

obj_id = ray.put(1, _owner=handler)  # the named actor, not the driver, owns this object
obj_id_str = str(obj_id)
handler.store.remote(obj_id_str, [obj_id])  # pin the ref inside the actor (ref count +1)

response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_str,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()

It's not super polished, but it is runnable. TL;DR:

  1. Use an actor to manage the lifecycle of objects we put in the Ray object store. It has a unique name that can be referenced.
  2. That actor acts as the "owner" of the objects we put in.
  3. Each store() increases the ref count on your data object by one, and each delete() decreases it by one.

If the Dummy actor dies, however, we lose all stored objects :confused: Sadly, I don't think we have production-grade support yet for taking advantage of the Ray object store in this situation.

Thank you jiaodong for the example. While trying it out, I get an error from the put line:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_1632/2486733852.py in <module>
     15 print(f"handler is: {handler}")
     16 opts = {'_owner': handler}
---> 17 obj_id = ray.put("fpp",  **opts) #
     18 obj_id_str = str(obj_id)
     19 handler.store.remote(obj_id_str, [obj_id])

~/anaconda3/lib/python3.8/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
    102             # we only convert init function if RAY_CLIENT_MODE=1
    103             if func.__name__ != "init" or is_client_mode_enabled_by_default:
--> 104                 return getattr(ray, func.__name__)(*args, **kwargs)
    105         return func(*args, **kwargs)
    106 

~/anaconda3/lib/python3.8/site-packages/ray/util/client/api.py in put(self, *args, **kwargs)
     50             kwargs: opaque keyword arguments
     51         """
---> 52         return self.worker.put(*args, **kwargs)
     53 
     54     def wait(self, *args, **kwargs):

TypeError: put() got an unexpected keyword argument '_owner'

I am using Ray 1.8.0, and the docs for Ray 1.8.0 claim that put has the _owner arg…

Our docs for 1.8 might not be up to date on that part :confused: I tried the code sample above on master (worker.py - ray-project/ray - Sourcegraph).

Could you try to reproduce this on 1.9 or 1.10?

Thank you jiaodong,
I tried 1.10.0 in a test and it seemed to honor the _owner. I'll upgrade the system and see if I have success.

BTW, I am trying this in a cluster setup - should that work there, or are there pitfalls I might expect?

Sure! And please be mindful of potential GC issues with this pattern. It's certainly not a well-supported feature at the Ray level yet. If you have suggestions or asks, we're more than happy to gather user feedback and requests for it. In fact, let me start an RFC on this for community comment …

In 1.8.0 I see the error on the local machine running a local Ray.

Same on clustered Ray on a Kubernetes cluster.

In 1.10.0 on the local machine, put(val, _owner=None) is accepted, so there is such a thing as _owner.

After upgrading the cluster to 1.10.0, the error is still there.

What error message are you seeing? Feel free to open a new post for this.

In the meantime, since the discussion context is around ObjectRef serialization, I want to point out the GC issues that we're trying to resolve. There's already a much safer way to do this if you don't need to serialize and send the ObjectRef via HTTP: use Serve's deployment handle directly (Calling Deployments via HTTP and Python — Ray 2.0.0.dev0). That way you can pass ObjectRefs around using the Ray Python API without GC issues.
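For illustration, here is a minimal sketch of that handle-based pattern; this is my own example rather than the one in the linked docs, and exact handle semantics vary across Serve versions, so the ObjectRef check below is defensive:

import ray
from ray import serve

ray.init()
serve.start()

@serve.deployment
class ModelServer:
    async def __call__(self, data):
        # Depending on the Serve version, `data` may arrive as the resolved value
        # or as the ObjectRef itself; resolve it here if needed. Either way the
        # ref travels through Ray's own serialization, so ref counting stays intact.
        if isinstance(data, ray.ObjectRef):
            data = await data
        return data

ModelServer.deploy()
handle = ModelServer.get_handle()

obj_ref = ray.put([1, 2, 3])
print(ray.get(handle.remote(obj_ref)))  # no HTTP, no manual pickling
serve.shutdown()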

It is the same error message as above - TypeError: put() got an unexpected keyword argument '_owner'

I tried to get beyond this by letting the actor do the ray.put and return the string, so the client code does not have to handle the object ID. It seemed to work fine.
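For what it's worth, here is a rough sketch of that variant (the names ObjectStoreActor, store, load, and delete are mine, purely for illustration):

import ray

ray.init()

@ray.remote
class ObjectStoreActor:
    def __init__(self):
        self._kv = {}

    def store(self, key, value):
        # The actor does the ray.put itself, so it owns the object and the
        # object lives exactly as long as this entry does.
        self._kv[key] = ray.put(value)
        return key

    def load(self, key):
        return ray.get(self._kv[key])

    def delete(self, key):
        self._kv.pop(key, None)

store = ObjectStoreActor.options(name="object_store").remote()
key = ray.get(store.store.remote("my-data", list(range(1000))))

# Any other process (e.g. a Serve replica) can look up the actor by name and
# fetch the data using only the plain string key it received over HTTP.
handle = ray.get_actor("object_store")
print(len(ray.get(handle.load.remote("my-data"))))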