I am using Ray Serve to host various machine-learning models. From within a Ray Serve deployment, I would like to access the shared object store where I have saved data for ML processing. Specifically, I would like to send the object ID to the Ray Serve endpoint. Is this possible, or have I misunderstood Ray's memory model?
Attached is some basic code that illustrates what I would like to do. Right now the code simply hangs during the ray.get() call within the deployment.
import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)

@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        obj_id = pickle.loads(body)
        print(f"here's the object_id in serve: {obj_id}")
        data = await ray.get(obj_id)
        print(data)

serve.start()
ModelServer.deploy()

obj_id = ray.put(1)
obj_id_bytes = pickle.dumps(obj_id)
print(f"Here's the object from main: {ray.get(obj_id)}")
print(f"here's the object id from main: {obj_id}")
response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_bytes,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()
Hi @AustinA, thanks for posting. One note: you don’t need to await the ray.get() call, since ray.get() blocks and returns the value itself rather than an awaitable.
However, it still looks like a bug, so I’ll file an issue. Strangely, it seems to work when the ObjectRef is also in scope in the deployment. For example, it works if you uncomment the equality-check line in __call__:
import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)

obj_id = ray.put(1)

@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        obj_id_in_deployment = pickle.loads(body)
        # print("check equality: ", obj_id == obj_id_in_deployment)
        print(f"here's the object_id in serve: {obj_id_in_deployment}")
        data = ray.get(obj_id_in_deployment)
        print(data)

serve.start()
ModelServer.deploy()

obj_id_bytes = pickle.dumps(obj_id)
print(f"Here's the object from main: {ray.get(obj_id)}")
print(f"here's the object id from main: {obj_id}")
response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_bytes,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()
However, if you leave the line commented, it does not work.
Could you provide a little more context on your use case? We might be able to find a workaround.
In general, we are just trying to get a lot of data to the ML models behind the Ray Serve endpoints. The thought was that we could use the Ray object store to avoid copying and sending millions of lines of data over HTTP.
Definitely open to suggestions on other ways to get lots of data to these Ray Serve endpoints.
I have a working solution to this specific problem! I needed to use ray.cloudpickle, not plain pickle. This even works when obj_id is defined AFTER the deployment class.
import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)

#obj_id = ray.put(1)

@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        obj_id_in_deployment = ray.cloudpickle.loads(body)
        #print("check equality: ", obj_id == obj_id_in_deployment)
        print(f"here's the object_id in serve: {obj_id_in_deployment}")
        data = ray.get(obj_id_in_deployment)
        print(data)

serve.start()
ModelServer.deploy()

obj_id = ray.put(1)
obj_id_bytes = ray.cloudpickle.dumps(obj_id)
print(f"Here's the object from main: {ray.get(obj_id)}")
print(f"here's the object id from main: {obj_id}")
response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_bytes,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()
Definitely still open to suggestions on best practices for sending large amounts of data to a Ray Serve endpoint.
Hi @AustinA, looks like you figured it out before we did!
In Ray, ObjectRef has its own reducer based on cloudpickle, and generally speaking it’s safer to use cloudpickle instead of pickle for Ray-specific objects.
NOTE: Serializing an ObjectRef this way is generally discouraged, since it makes it very difficult for Ray to do reference counting and garbage collection. We don’t have a clean implementation for this at the Ray Serve level yet, but we very likely will in Q2.
You brought up an interesting topic: we’re trying to make it easier and more structured for users to leverage the object store and avoid unnecessary serde costs for request data, as well as redundant copies of model weights. We’ve been drafting something along these lines to make it easier for our users to:
1) Adapt and transfer user request data based on Ray plasma. An extreme case could be a large batch of images for ResNet; ideally we don’t want users to manually serialize, send over HTTP, and deserialize on the replica for each image used.
2) Manage a larger number of deployments in a Ray cluster that share the same underlying model weight object without redundant copies. This means that for N replicas of the same model, we can have only one copy of its weights in the plasma store instead of N in-memory copies, and hot-provisioning a new model from the Ray object store is much faster. For user studies like this, the time to load a new model could even be smaller than running a forward pass, which opens up a number of new optimization opportunities.
If you have any concrete examples and use cases that you’re willing to share, we can chat more to learn from you and get your feedback to make Ray Serve better.
If serializing the ObjectRef is discouraged, what is a better way to pass an ObjectRef to various Ray deployment endpoints?
Our use case is definitely more in line with use case 1. We are sucking up a lot of data and trying to get it to some machine learning models as quickly and efficiently as possible.
This is a gap in our support for optimizations like this, and the primary concern is reference counting and GC. We’re looking into this at both the Ray and Ray Serve level and will come up with a better solution to support both 1) and 2) soon.
A quick way to unblock yourself would be manual GC. I paired up with @yic on something like this:
import logging
import ray
from ray import serve
import time
import requests
import pickle

ray.init(configure_logging=True, logging_level=logging.ERROR)

@serve.deployment(route_prefix="/ModelServer")
class ModelServer:
    def __init__(self):
        print('Initializing')

        @ray.remote
        class Dummy:
            def __init__(self):
                self._kv = {}

            def ready(self):
                pass

            def store(self, k, v):
                self._kv[k] = v[0]  # ref count +1

            def delete(self, k):
                if k in self._kv:
                    del self._kv[k]  # ref count -1

            def load(self, k):
                print(k)
                return self._kv.get(k)

        self._actor = Dummy.options(name="dummy").remote()
        ray.get(self._actor.ready.remote())

    async def __call__(self, request):
        body = await request.body()
        print(f"Here's the request: {body}")
        data = await self._actor.load.remote(body.decode())
        return await data

serve.start()
ModelServer.deploy()

handler = ray.get_actor(name="dummy")
obj_id = ray.put(1, _owner=handler)
obj_id_str = str(obj_id)
# Wait for store() to finish so the key exists before the HTTP request arrives.
ray.get(handler.store.remote(obj_id_str, [obj_id]))
response = requests.post(
    url="http://localhost:8000/ModelServer",
    data=obj_id_str,
    timeout=15
)
r = response.content
print(r)
serve.shutdown()
It’s not super polished, but it’s runnable. TL;DR:
Use an actor to manage the lifecycle of the objects we put in the Ray object store. It has a unique name that can be referenced.
That actor acts as the “owner” of the objects we put in.
Each store call increases the reference count on your data object by one, and each delete call decreases it by one.
If the Dummy actor dies, we lose all stored objects. Sadly, I don’t think we have production-grade support for taking advantage of the Ray object store in this situation yet.
Sure! And please be mindful of potential GC issues with this pattern. It’s certainly not a well-supported feature at the Ray level yet. If you have suggestions or asks, we’re more than happy to gather user feedback and requests. In fact, let me start an RFC on this for community comment …
What error message are you seeing? Feel free to open a new post for this.
In the meantime, since this discussion is about ObjectRef serialization, I want to point out the GC issues we’re trying to resolve. If you don’t need to serialize and send an ObjectRef over HTTP, there’s already a much safer way: call the deployment directly through Serve’s deployment handle (see “Calling Deployments via HTTP and Python” in the Ray 2.0.0.dev0 docs). With a handle, you can pass ObjectRefs around using the Ray Python API without GC issues.
It is the same error message as above: TypeError: put() got an unexpected keyword argument '_owner'
I tried to get past this by letting the actor do the ray.put() and return a string key, so the client code does not have to handle the object ID. It seemed to work fine.