Hello there, I am trying to create an abstract class that wraps the business logic of a ROS 2 node in a Ray context and serves the ROS node. I noticed that if I do not create a separate thread for spinning the ROS node, Ray never gets initialized. I could definitely use your help. I found a workaround, and this is what it looks like:
import rclpy
from rclpy.node import Node
from rclpy.publisher import Publisher
from std_msgs.msg import String
from typing import Optional
import threading
import ray
from ray import serve
from ray.serve import Application
from fastapi import FastAPI

# Connect to an already running Ray cluster.
existing_connection_info = ray.init(address="auto")
print(f"Connection info: {existing_connection_info}")

app = FastAPI()


def deploy(deployment):
    """Wrap a ROS node class so that it runs as a Ray Serve deployment."""

    class CustomDeployment(deployment):
        def __init__(self, *args, **kwargs):
            print(
                "Creating a Base class that becomes part of the Ray serve Deployment!"
            )
            rclpy.init(args=None)
            super(CustomDeployment, self).__init__(*args, **kwargs)
            # Spin the node in a separate thread so that __init__ can return.
            self.spin_thread = threading.Thread(target=rclpy.spin, args=(self,))
            self.spin_thread.start()

        def __del__(self):
            self.destroy_node()
            rclpy.shutdown()
            self.spin_thread.join()

    return serve.deployment(name="RayWrapperForROSTestApp")(CustomDeployment)


@deploy
@serve.ingress(app)
class TestNoLogicRosWrapper(Node):
    def __init__(self):
        super().__init__("test_no_logic_ros_wrapper")
        self.get_logger().info("Wrapper Initialized!")
        self.data: Optional[str] = None
        self.publisher: Publisher = self.create_publisher(
            msg_type=String, topic="test_pub", qos_profile=10
        )
        # Publish every 3 seconds.
        self._timer = self.create_timer(
            timer_period_sec=3.0, callback=self.pub_random_stuff
        )

    def pub_random_stuff(self):
        if self.data is None:
            self.data = "hullu! This is a ROS Node (-:"
            self.get_logger().info("Publisher Initialized!")
        self.publisher.publish(String(data=self.data))

    @app.get("/test_response")
    async def get_response(self):
        self.get_logger().info("Request Received!")
        return "Hello, from the base class!"


# Note: this rebinds the module-level name `app` from the FastAPI instance
# to the Serve application.
app: Application = TestNoLogicRosWrapper.bind()
serve.run(target=app)
These are the logs:
2024-06-10 12:40:49,965 INFO scripts.py:499 -- Running import path: 'test_ros_node:app'.
2024-06-10 12:40:49,987 INFO worker.py:1564 -- Connecting to existing Ray cluster at address: 0.0.0.0:58086...
2024-06-10 12:40:49,992 INFO worker.py:1740 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
Connection info: RayContext(dashboard_url='127.0.0.1:8265', python_version='3.10.12', ray_version='2.23.0', ray_commit='a0947ead5cd94b3d8ca5cdeb9422dccb12d03867')
2024-06-10 12:40:50,016 INFO handle.py:126 -- Created DeploymentHandle 'vtxvet48' for Deployment(name='RayWrapperForROSTestApp', app='default').
2024-06-10 12:40:50,016 INFO handle.py:126 -- Created DeploymentHandle 'gwgb9caj' for Deployment(name='RayWrapperForROSTestApp', app='default').
(ServeController pid=21430) INFO 2024-06-10 12:40:50,065 controller 21430 deployment_state.py:1598 - Deploying new version of Deployment(name='RayWrapperForROSTestApp', app='default') (initial target replicas: 1).
(ServeController pid=21430) INFO 2024-06-10 12:40:50,169 controller 21430 deployment_state.py:1721 - Stopping 1 replicas of Deployment(name='RayWrapperForROSTestApp', app='default') with outdated versions.
(ServeController pid=21430) INFO 2024-06-10 12:40:50,170 controller 21430 deployment_state.py:1844 - Adding 1 replica to Deployment(name='RayWrapperForROSTestApp', app='default').
(ServeReplica:default:RayWrapperForROSTestApp pid=26109) Creating a Base class that becomes part of the Ray serve Deployment!
(ServeReplica:default:RayWrapperForROSTestApp pid=26109) [INFO] [1718023250.943698579] [test_no_logic_ros_wrapper]: Wrapper Initialized!
(ServeController pid=21430) INFO 2024-06-10 12:40:52,248 controller 21430 deployment_state.py:2182 - Replica(id='qiidowm7', deployment='RayWrapperForROSTestApp', app='default') is stopped.
2024-06-10 12:40:53,033 INFO handle.py:126 -- Created DeploymentHandle '3jlcazl4' for Deployment(name='RayWrapperForROSTestApp', app='default').
2024-06-10 12:40:53,033 INFO api.py:584 -- Deployed app 'default' successfully.
2024-06-10 12:40:53,037 INFO handle.py:126 -- Created DeploymentHandle 'om0h9wci' for Deployment(name='RayWrapperForROSTestApp', app='default').
2024-06-10 12:40:53,037 INFO handle.py:126 -- Created DeploymentHandle 'ch5jknvc' for Deployment(name='RayWrapperForROSTestApp', app='default').
(ServeController pid=21430) INFO 2024-06-10 12:40:53,073 controller 21430 deployment_state.py:1598 - Deploying new version of Deployment(name='RayWrapperForROSTestApp', app='default') (initial target replicas: 1).
(ServeController pid=21430) INFO 2024-06-10 12:40:53,180 controller 21430 deployment_state.py:1721 - Stopping 1 replicas of Deployment(name='RayWrapperForROSTestApp', app='default') with outdated versions.
(ServeController pid=21430) INFO 2024-06-10 12:40:53,181 controller 21430 deployment_state.py:1844 - Adding 1 replica to Deployment(name='RayWrapperForROSTestApp', app='default').
(ServeReplica:default:RayWrapperForROSTestApp pid=26109) [INFO] [1718023253.949019763] [test_no_logic_ros_wrapper]: Publisher Initialized!
(ServeController pid=21430) INFO 2024-06-10 12:40:55,255 controller 21430 deployment_state.py:2182 - Replica(id='9xebxpum', deployment='RayWrapperForROSTestApp', app='default') is stopped.
2024-06-10 12:40:56,055 INFO api.py:584 -- Deployed app 'default' successfully.
(ServeReplica:default:RayWrapperForROSTestApp pid=26185) [INFO] [1718023254.279938503] [test_no_logic_ros_wrapper]: Wrapper Initialized!
I noticed that two instances are always created and one of them is stopped. Is this the expected behavior? Also, I could not find any other way to make ROS and Ray work together. Please let me know if I am doing something wrong here.
Additionally, I am unable to start the script with python3: it initializes and then exits without throwing an error. When I start it with serve run script_name:app, it seems to work. I am not sure what is happening here either, so please point out where I am going wrong.
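For reference, the threading workaround I described at the top boils down to the pattern below. This is only a minimal sketch using the standard library, so it runs without ROS or Ray. FakeNode and ThreadedWrapper are hypothetical stand-ins I made up for illustration, and the sketch assumes that rclpy.spin blocks in roughly the same way as the loop in FakeNode.spin.

```python
import threading
import time


class FakeNode:
    """Hypothetical stand-in for a ROS node; not part of rclpy."""

    def __init__(self):
        self.ticks = 0
        self._stop = threading.Event()

    def spin(self):
        # Blocks until stop() is called (assumed to mimic a spin loop).
        while not self._stop.is_set():
            self.ticks += 1
            time.sleep(0.01)

    def stop(self):
        self._stop.set()


class ThreadedWrapper(FakeNode):
    """Hypothetical wrapper that spins in the background."""

    def __init__(self):
        super().__init__()
        # Calling self.spin() directly here would never return from
        # __init__; a background thread lets construction finish.
        self.spin_thread = threading.Thread(target=self.spin, daemon=True)
        self.spin_thread.start()

    def close(self):
        # Ask the loop to stop first, then wait for the thread to exit.
        self.stop()
        self.spin_thread.join(timeout=1.0)


wrapper = ThreadedWrapper()  # returns right away
time.sleep(0.05)             # let the background loop run a few times
wrapper.close()
ticks_seen = wrapper.ticks
```

Here the constructor returns while the loop keeps running in the background, which is the same idea as passing rclpy.spin to threading.Thread in my deployment wrapper.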
Cheers!