diff --git a/README.md b/README.md index eff7747..19d15ed 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,9 @@ You will use these packages to launch all rover-side ROS2 nodes. - [Connecting the GuliKit Controller](#connecting-the-gulikit-controller) - [Common Problems/Toubleshooting](#common-problemstroubleshooting) - [Packages](#packages) +- [Graphs](#graphs) + - [Full System](#full-system) + - [Individual Nodes](#individual-nodes) - [Maintainers](#maintainers) ## Software Prerequisites @@ -140,6 +143,40 @@ A: To find a microcontroller to talk to, Anchor sends a ping to every Serial por - [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`). - [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK. +## Graphs + +### Full System + +> **Anchor stand-alone** (`ros2 launch anchor_pkg rover.launch.py`) +> +> ![rqt_graph of Anchor by itself, ran with command: ros2 launch anchor_pkg rover.launch.py](./docs-resources/graph-anchor-standalone.png) + +> **Anchor with [basestation-classic](https://github.com/SHC-ASTRA/basestation-classic)** +> +> ![rqt_graph of Anchor ran with the same command as above, talking to basestation-classic](./docs-resources/graph-anchor-w-basestation-classic.png) + +> **Anchor with Headless** (`ros2 run headless_pkg headless_full`) +> +> ![rqt_graph of Anchor ran with Headless](./docs-resources/graph-anchor-w-headless.png) + +### Individual Nodes + +> **Anchor** (`ros2 run anchor_pkg anchor`) +> +> ![rqt_graph of Anchor node running by itself](./docs-resources/graph-anchor-anchor-standalone.png) + +> **Core** (`ros2 run core_pkg core --ros-args -p launch_mode:=anchor`) +> +> ![rqt_graph of Core node running by itself](./docs-resources/graph-anchor-core-standalone.png) + +> **Arm** (`ros2 run arm_pkg arm --ros-args -p launch_mode:=anchor`) +> +> ![rqt_graph of Arm node running by itself](./docs-resources/graph-anchor-arm-standalone.png) + +> **Bio** (`ros2 run bio_pkg bio --ros-args -p launch_mode:=anchor`) +> +> ![rqt_graph of Bio node running by itself](./docs-resources/graph-anchor-bio-standalone.png) + ## Maintainers | Name | Email | Discord | diff --git a/docs-resources/graph-anchor-anchor-standalone.png b/docs-resources/graph-anchor-anchor-standalone.png new file mode 100644 index 0000000..55d4938 Binary files /dev/null and b/docs-resources/graph-anchor-anchor-standalone.png differ diff --git a/docs-resources/graph-anchor-arm-standalone.png b/docs-resources/graph-anchor-arm-standalone.png new file mode 100644 index 0000000..57a6e5f Binary files /dev/null and b/docs-resources/graph-anchor-arm-standalone.png differ diff --git a/docs-resources/graph-anchor-bio-standalone.png b/docs-resources/graph-anchor-bio-standalone.png new file mode 100644 index 0000000..6e7e4bf Binary files /dev/null and b/docs-resources/graph-anchor-bio-standalone.png differ diff --git a/docs-resources/graph-anchor-core-standalone.png b/docs-resources/graph-anchor-core-standalone.png new file mode 100644 index 0000000..d93066e Binary files /dev/null and b/docs-resources/graph-anchor-core-standalone.png differ diff --git a/docs-resources/graph-anchor-standalone.png b/docs-resources/graph-anchor-standalone.png new file mode 100644 index 0000000..8425ea9 Binary files /dev/null and b/docs-resources/graph-anchor-standalone.png differ diff --git a/docs-resources/graph-anchor-w-basestation-classic.png b/docs-resources/graph-anchor-w-basestation-classic.png new file mode 100644 index 0000000..6c7e382 Binary files /dev/null and b/docs-resources/graph-anchor-w-basestation-classic.png differ diff --git a/docs-resources/graph-anchor-w-headless.png b/docs-resources/graph-anchor-w-headless.png new file mode 100644 index 0000000..23b1b2a Binary files /dev/null and b/docs-resources/graph-anchor-w-headless.png differ diff --git a/src/anchor_pkg/launch/rover.launch.py b/src/anchor_pkg/launch/rover.launch.py index 44c762a..20ba1d6 100644 --- a/src/anchor_pkg/launch/rover.launch.py +++ b/src/anchor_pkg/launch/rover.launch.py @@ -61,6 +61,7 @@ def launch_setup(context, *args, **kwargs): executable="ptz", # change as needed name="ptz", output="both", + condition=IfCondition(LaunchConfiguration("use_ptz", default="true")), # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure ) diff --git a/src/arm_pkg/arm_pkg/arm_headless.py b/src/arm_pkg/arm_pkg/arm_headless.py deleted file mode 100755 index 256c86b..0000000 --- a/src/arm_pkg/arm_pkg/arm_headless.py +++ /dev/null @@ -1,279 +0,0 @@ -import rclpy -from rclpy.node import Node - -import pygame - -import time - -import serial -import sys -import threading -import glob -import os - -from std_msgs.msg import String -from astra_msgs.msg import ControllerState -from astra_msgs.msg import ArmManual -from astra_msgs.msg import ArmIK - - -os.environ["SDL_AUDIODRIVER"] = ( - "dummy" # Force pygame to use a dummy audio driver before pygame.init() -) -os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display - - -class Headless(Node): - def __init__(self): - # Initalize node with name - super().__init__("arm_headless") - - # Depricated, kept temporarily for reference - # self.create_timer(0.20, self.send_controls)#read and send controls - - self.create_timer(0.1, self.send_manual) - - # Create a publisher to publish any output the pico sends - - # Depricated, kept temporarily for reference - # self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10) - self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10) - - # Create a subscriber to listen to any commands sent for the pico - - # Depricated, kept temporarily for reference - # self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10) - - # self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) - - self.laser_status = 0 - - # Initialize pygame - pygame.init() - - # Initialize the gamepad module - pygame.joystick.init() - - # Check if any gamepad is connected - if pygame.joystick.get_count() == 0: - print("No gamepad found.") - pygame.quit() - exit() - for event in pygame.event.get(): - if event.type == pygame.QUIT: - pygame.quit() - exit() - - # Initialize the first gamepad, print name to terminal - self.gamepad = pygame.joystick.Joystick(0) - self.gamepad.init() - print(f"Gamepad Found: {self.gamepad.get_name()}") - # - # - - def run(self): - # This thread makes all the update processes run in the background - thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) - thread.start() - - try: - while rclpy.ok(): - # Check the pico for updates - - # self.read_feedback() - if ( - pygame.joystick.get_count() == 0 - ): # if controller disconnected, wait for it to be reconnected - print(f"Gamepad disconnected: {self.gamepad.get_name()}") - - while pygame.joystick.get_count() == 0: - # self.send_controls() #depricated, kept for reference temporarily - self.send_manual() - # self.read_feedback() - self.gamepad = pygame.joystick.Joystick(0) - self.gamepad.init() # re-initialized gamepad - print(f"Gamepad reconnected: {self.gamepad.get_name()}") - - except KeyboardInterrupt: - sys.exit(0) - - def send_manual(self): - for event in pygame.event.get(): - if event.type == pygame.QUIT: - pygame.quit() - exit() - input = ArmManual() - - # Triggers for gripper control - if self.gamepad.get_axis(2) > 0: # left trigger - input.gripper = -1 - elif self.gamepad.get_axis(5) > 0: # right trigger - input.gripper = 1 - - # Toggle Laser - if self.gamepad.get_button(7): # Start - self.laser_status = 1 - elif self.gamepad.get_button(6): # Back - self.laser_status = 0 - input.laser = self.laser_status - - if self.gamepad.get_button(5): # right bumper, control effector - - # Left stick X-axis for effector yaw - if self.gamepad.get_axis(0) > 0: - input.effector_yaw = 1 - elif self.gamepad.get_axis(0) < 0: - input.effector_yaw = -1 - - # Right stick X-axis for effector roll - if self.gamepad.get_axis(3) > 0: - input.effector_roll = 1 - elif self.gamepad.get_axis(3) < 0: - input.effector_roll = -1 - - else: # Control arm axis - dpad_input = self.gamepad.get_hat(0) - input.axis0 = 0 - if dpad_input[0] == 1: - input.axis0 = 1 - elif dpad_input[0] == -1: - input.axis0 = -1 - - if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15: - input.axis1 = round(self.gamepad.get_axis(0)) - - if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15: - input.axis2 = -1 * round(self.gamepad.get_axis(1)) - - if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15: - input.axis3 = -1 * round(self.gamepad.get_axis(4)) - - # input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis - # input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis - # input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis - - # Button Mappings - # axis2 -> LT - # axis5 -> RT - # Buttons0 -> A - # Buttons1 -> B - # Buttons2 -> X - # Buttons3 -> Y - # Buttons4 -> LB - # Buttons5 -> RB - # Buttons6 -> Back - # Buttons7 -> Start - - input.linear_actuator = 0 - - if pygame.joystick.get_count() != 0: - - self.get_logger().info( - f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n" - ) - self.manual_pub.publish(input) - else: - pass - - pass - - # Depricated, kept temporarily for reference - # def send_controls(self): - - # for event in pygame.event.get(): - # if event.type == pygame.QUIT: - # pygame.quit() - # exit() - # input = ControllerState() - - # input.lt = self.gamepad.get_axis(2)#left trigger - # input.rt = self.gamepad.get_axis(5)#right trigger - - # #input.lb = self.gamepad.get_button(9)#Value must be converted to bool - # if(self.gamepad.get_button(4)):#left bumper - # input.lb = True - # else: - # input.lb = False - - # #input.rb = self.gamepad.get_button(10)#Value must be converted to bool - # if(self.gamepad.get_button(5)):#right bumper - # input.rb = True - # else: - # input.rb = False - - # #input.plus = self.gamepad.get_button(6)#plus button - # if(self.gamepad.get_button(7)):#plus button - # input.plus = True - # else: - # input.plus = False - - # #input.minus = self.gamepad.get_button(4)#minus button - # if(self.gamepad.get_button(6)):#minus button - # input.minus = True - # else: - # input.minus = False - - # input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis - # input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis - # input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis - # input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis - - # #input.a = self.gamepad.get_button(1)#A button - # if(self.gamepad.get_button(0)):#A button - # input.a = True - # else: - # input.a = False - # #input.b = self.gamepad.get_button(0)#B button - # if(self.gamepad.get_button(1)):#B button - # input.b = True - # else: - # input.b = False - # #input.x = self.gamepad.get_button(3)#X button - # if(self.gamepad.get_button(2)):#X button - # input.x = True - # else: - # input.x = False - # #input.y = self.gamepad.get_button(2)#Y button - # if(self.gamepad.get_button(3)):#Y button - # input.y = True - # else: - # input.y = False - - # dpad_input = self.gamepad.get_hat(0)#D-pad input - - # #not using up/down on DPad - # input.d_up = False - # input.d_down = False - - # if(dpad_input[0] == 1):#D-pad right - # input.d_right = True - # else: - # input.d_right = False - # if(dpad_input[0] == -1):#D-pad left - # input.d_left = True - # else: - # input.d_left = False - - # if pygame.joystick.get_count() != 0: - - # self.get_logger().info(f"[Ctrl] Updated Controller State\n") - - # self.publisher.publish(input) - # else: - # pass - - -def main(args=None): - rclpy.init(args=args) - - node = Headless() - - rclpy.spin(node) - rclpy.shutdown() - - # tb_bs = BaseStation() - # node.run() - - -if __name__ == "__main__": - main() diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index 9387f73..d5e188b 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -1,181 +1,221 @@ -import rclpy -from rclpy.node import Node -import serial import sys import threading -import glob -import time -import atexit import signal -from std_msgs.msg import String -from astra_msgs.msg import ArmManual -from astra_msgs.msg import SocketFeedback -from astra_msgs.msg import DigitFeedback -from sensor_msgs.msg import JointState import math +from warnings import deprecated -# control_qos = qos.QoSProfile( -# history=qos.QoSHistoryPolicy.KEEP_LAST, -# depth=1, -# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, -# durability=qos.QoSDurabilityPolicy.VOLATILE, -# deadline=1000, -# lifespan=500, -# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, -# liveliness_lease_duration=5000 -# ) +import rclpy +from rclpy.node import Node +from rclpy.executors import ExternalShutdownException +from rclpy import qos + +from std_msgs.msg import String, Header +from sensor_msgs.msg import JointState +from control_msgs.msg import JointJog +from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics +from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState + +control_qos = qos.QoSProfile( + history=qos.QoSHistoryPolicy.KEEP_LAST, + depth=2, + reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers + durability=qos.QoSDurabilityPolicy.VOLATILE, + # deadline=Duration(seconds=1), + # lifespan=Duration(nanoseconds=500_000_000), # 500ms + # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, + # liveliness_lease_duration=Duration(seconds=5), +) -serial_pub = None thread = None -class SerialRelay(Node): +class ArmNode(Node): + """Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics.""" + + # Every non-fixed joint defined in Arm's URDF + # Used for JointState and JointJog messsages + all_joint_names = [ + "axis_0_joint", + "axis_1_joint", + "axis_2_joint", + "axis_3_joint", + "wrist_yaw_joint", + "wrist_roll_joint", + "ef_gripper_left_joint", + ] + + # Used to verify the length of an incoming VicCAN feedback message + # Key is VicCAN command_id, value is expected length of data list + viccan_socket_msg_len_dict = { + 53: 4, + 54: 4, + 55: 4, + 58: 4, + 59: 4, + } + + viccan_digit_msg_len_dict = { + 54: 4, + 55: 2, + 59: 2, + } + def __init__(self): - # Initialize node super().__init__("arm_node") - # Get launch mode parameter - self.declare_parameter("launch_mode", "arm") - self.launch_mode = self.get_parameter("launch_mode").value - self.get_logger().info(f"arm launch_mode is: {self.launch_mode}") + self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output - # Create publishers - self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10) - self.socket_pub = self.create_publisher( - SocketFeedback, "/arm/feedback/socket", 10 - ) - self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10) - self.feedback_timer = self.create_timer(0.25, self.publish_feedback) + ################################################## + # Parameters - # Create subscribers - self.man_sub = self.create_subscription( - ArmManual, "/arm/control/manual", self.send_manual, 2 + self.declare_parameter("use_old_topics", True) + self.use_old_topics = ( + self.get_parameter("use_old_topics").get_parameter_value().bool_value ) - # New messages - self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10) - self.joint_state = JointState() - self.joint_state.name = [ - "Axis_0_Joint", - "Axis_1_Joint", - "Axis_2_Joint", - "Axis_3_Joint", - "Wrist_Differential_Joint", - "Wrist-EF_Roll_Joint", - "Gripper_Slider_Left", - ] - self.joint_state.position = [0.0] * len( - self.joint_state.name - ) # Initialize with zeros + ################################################## + # Old topics - self.joint_command_sub = self.create_subscription( - JointState, "/joint_commands", self.joint_command_callback, 10 - ) - - # Topics used in anchor mode - if self.launch_mode == "anchor": + if self.use_old_topics: + # Anchor topics self.anchor_sub = self.create_subscription( String, "/anchor/arm/feedback", self.anchor_feedback, 10 ) self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) - self.arm_feedback = SocketFeedback() - self.digit_feedback = DigitFeedback() + # Create publishers + self.socket_pub = self.create_publisher( + SocketFeedback, "/arm/feedback/socket", 10 + ) + self.arm_feedback = SocketFeedback() + self.digit_pub = self.create_publisher( + DigitFeedback, "/arm/feedback/digit", 10 + ) + self.digit_feedback = DigitFeedback() + self.feedback_timer = self.create_timer(0.25, self.publish_feedback) - # Search for ports IF in 'arm' (standalone) and not 'anchor' mode - if self.launch_mode == "arm": - # Loop through all serial devices on the computer to check for the MCU - self.port = None - ports = SerialRelay.list_serial_ports() - for _ in range(4): - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, 115200, timeout=1) - # print(f"Checking port {port}...") - ser.write(b"ping\n") - response = ser.read_until("\n") # type: ignore + # Create subscribers + self.man_sub = self.create_subscription( + ArmManual, "/arm/control/manual", self.send_manual, 10 + ) - # if pong is in response, then we are talking with the MCU - if b"pong" in response: - self.port = port - self.get_logger().info(f"Found MCU at {self.port}!") - break - except: - pass - if self.port is not None: - break + ################################################### + # New topics - if self.port is None: - self.get_logger().info( - "Unable to find MCU... please make sure it is connected." - ) - time.sleep(1) - sys.exit(1) + # Anchor topics - self.ser = serial.Serial(self.port, 115200) - atexit.register(self.cleanup) + # from_vic + self.anchor_fromvic_sub_ = self.create_subscription( + VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20 + ) + # to_vic + self.anchor_tovic_pub_ = self.create_publisher( + VicCAN, "/anchor/to_vic/relay", 20 + ) - def run(self): - global thread - thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) - thread.start() + # Control - # if in arm mode, will need to read from the MCU + # Manual: /arm/manual/joint_jog is published by Basestation or Headless + self.man_jointjog_sub_ = self.create_subscription( + JointJog, + "/arm/manual/joint_jog", + self.jointjog_callback, + qos_profile=control_qos, + ) + # IK: /joint_commands is published by JointTrajectoryController via topic_based_control + self.joint_command_sub_ = self.create_subscription( + JointState, + "/joint_commands", + self.joint_command_callback, + qos_profile=control_qos, + ) - try: - while rclpy.ok(): - if self.launch_mode == "arm": - if self.ser.in_waiting: - self.read_mcu() - else: - time.sleep(0.1) - except KeyboardInterrupt: - pass - finally: - self.cleanup() + # Feedback - # Currently will just spit out all values over the /arm/feedback/debug topic as strings - def read_mcu(self): - try: - output = str(self.ser.readline(), "utf8") - if output: - # self.get_logger().info(f"[MCU] {output}") - msg = String() - msg.data = output - self.debug_pub.publish(msg) - except serial.SerialException: - self.get_logger().info("SerialException caught... closing serial port.") - if self.ser.is_open: - self.ser.close() - pass - except TypeError as e: - self.get_logger().info(f"TypeError: {e}") - print("Closing serial port.") - if self.ser.is_open: - self.ser.close() - pass - except Exception as e: - print(f"Exception: {e}") - print("Closing serial port.") - if self.ser.is_open: - self.ser.close() - pass + # Combined Socket and Digit feedback + self.arm_feedback_pub_ = self.create_publisher( + ArmFeedback, + "/arm/feedback", + qos_profile=qos.qos_profile_sensor_data, + ) + # IK arm pose: /joint_states is published from here to topic_based_control + self.joint_state_pub_ = self.create_publisher( + JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data + ) + + ################################################### + # Saved state + + # Combined Socket and Digit feedback + self.arm_feedback_new = ArmFeedback() + + # IK Arm pose + self.saved_joint_state = JointState() + self.saved_joint_state.name = self.all_joint_names + # ... initialize with zeros + self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name) + self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name) + + def jointjog_callback(self, msg: JointJog): + if len(msg.joint_names) != len(msg.velocities): + self.get_logger().debug("Ignoring malformed /arm/manual/joint_jog message.") + return + + # Grab velocities from message + velocities = [ + ( + msg.velocities[msg.joint_names.index(joint_name)] # type: ignore + if joint_name in msg.joint_names + else 0.0 + ) + for joint_name in self.all_joint_names + ] + # Deadzone + velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities] + + self.send_velocities(velocities, msg.header) + + # TODO: use msg.duration def joint_command_callback(self, msg: JointState): - # Embedded takes deg*10, ROS2 uses Radians - positions = [math.degrees(pos) * 10 for pos in msg.position] - # Axis 2 & 3 URDF direction is inverted - positions[2] = -positions[2] - positions[3] = -positions[3] + if len(msg.position) < 7 and len(msg.velocity) < 7: + self.get_logger().debug("Ignoring malformed /joint_command message.") + return # command needs either position or velocity for all 7 joints - # Set target angles for each arm axis for embedded IK PID to handle - command = f"can_relay_tovic,arm,32,{positions[0]},{positions[1]},{positions[2]},{positions[3]}\n" - # Wrist yaw and roll - command += f"can_relay_tovic,digit,32,{positions[4]},{positions[5]}\n" - # Gripper IK does not have adequate hardware yet - self.send_cmd(command) + # Grab velocities from message + velocities = [ + ( + msg.velocity[msg.name.index(joint_name)] # type: ignore + if joint_name in msg.name + else 0.0 + ) + for joint_name in self.all_joint_names + ] + self.send_velocities(velocities, msg.header) + + def send_velocities(self, velocities: list[float], header: Header): + # ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s + velocities = [ + math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities) + ] + + # Send Axis 0-3 + self.anchor_tovic_pub_.publish( + VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header) + ) + # Send Wrist yaw and roll + # TODO: Verify embedded + self.anchor_tovic_pub_.publish( + VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header) + ) + # Send End Effector Gripper + # TODO: Verify m/s received correctly by embedded + self.anchor_tovic_pub_.publish( + VicCAN(mcu_name="digit", command_id=26, data=[velocities[6]], header=header) + ) + + @deprecated("Uses an old message type. Will be removed at some point.") def send_manual(self, msg: ArmManual): axis0 = msg.axis0 axis1 = -1 * msg.axis1 @@ -200,24 +240,17 @@ class SerialRelay(Node): return + @deprecated("Uses an old message type. Will be removed at some point.") def send_cmd(self, msg: str): - if ( - self.launch_mode == "anchor" - ): # if in anchor mode, send to anchor node to relay - output = String() - output.data = msg - self.anchor_pub.publish(output) - elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly - self.get_logger().info(f"[Arm to MCU] {msg}") - self.ser.write(bytes(msg, "utf8")) + output = String(data=msg) + self.anchor_pub.publish(output) + @deprecated("Uses an old message type. Will be removed at some point.") def anchor_feedback(self, msg: String): output = msg.data if output.startswith("can_relay_fromvic,arm,55"): - # pass self.updateAngleFeedback(output) elif output.startswith("can_relay_fromvic,arm,54"): - # pass self.updateBusVoltage(output) elif output.startswith("can_relay_fromvic,arm,53"): self.updateMotorFeedback(output) @@ -235,19 +268,133 @@ class SerialRelay(Node): if len(parts) >= 4: self.digit_feedback.wrist_angle = float(parts[3]) # self.digit_feedback.wrist_roll = float(parts[4]) - self.joint_state.position[4] = math.radians( - float(parts[4]) - ) # Wrist roll - self.joint_state.position[5] = math.radians( - float(parts[3]) - ) # Wrist yaw else: return + def relay_fromvic(self, msg: VicCAN): + # Code for socket and digit are broken out for cleaner code + if msg.mcu_name == "arm": + self.process_fromvic_arm(msg) + elif msg.mcu_name == "digit": + self.process_fromvic_digit(msg) + + def process_fromvic_arm(self, msg: VicCAN): + assert msg.mcu_name == "arm" + + # Check message len to prevent crashing on bad data + if msg.command_id in self.viccan_socket_msg_len_dict: + expected_len = self.viccan_socket_msg_len_dict[msg.command_id] + if len(msg.data) != expected_len: + self.get_logger().warning( + f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" + ) + return + + match msg.command_id: + case 53: # REV SPARK MAX feedback + motorId = round(msg.data[0]) + motor: RevMotorState | None = None + match motorId: + case 1: + motor = self.arm_feedback_new.axis1_motor + case 2: + motor = self.arm_feedback_new.axis2_motor + case 3: + motor = self.arm_feedback_new.axis3_motor + case 4: + motor = self.arm_feedback_new.axis0_motor + + if motor: + motor.temperature = float(msg.data[1]) / 10.0 + motor.voltage = float(msg.data[2]) / 10.0 + motor.current = float(msg.data[3]) / 10.0 + motor.header.stamp = msg.header.stamp + + self.arm_feedback_pub_.publish(self.arm_feedback_new) + case 54: # Board voltages + self.arm_feedback_new.socket_voltage.vbatt = float(msg.data[0]) / 100.0 + self.arm_feedback_new.socket_voltage.v12 = float(msg.data[1]) / 100.0 + self.arm_feedback_new.socket_voltage.v5 = float(msg.data[2]) / 100.0 + self.arm_feedback_new.socket_voltage.v3 = float(msg.data[3]) / 100.0 + self.arm_feedback_new.socket_voltage.header.stamp = msg.header.stamp + case 55: # Arm joint positions + angles = [angle / 10.0 for angle in msg.data] # VicCAN sends deg*10 + # Joint state publisher for URDF visualization + self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0 + self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1 + self.saved_joint_state.position[2] = math.radians(angles[2]) # Axis 2 + self.saved_joint_state.position[3] = math.radians(angles[3]) # Axis 3 + # Wrist is handled by digit feedback + self.saved_joint_state.header.stamp = msg.header.stamp + self.joint_state_pub_.publish(self.saved_joint_state) + case 58: # REV SPARK MAX position and velocity feedback + motorId = round(msg.data[0]) + motor: RevMotorState | None = None + match motorId: + case 1: + motor = self.arm_feedback_new.axis1_motor + case 2: + motor = self.arm_feedback_new.axis2_motor + case 3: + motor = self.arm_feedback_new.axis3_motor + case 4: + motor = self.arm_feedback_new.axis0_motor + + if motor: + motor.position = float(msg.data[1]) + motor.velocity = float(msg.data[2]) + motor.header.stamp = msg.header.stamp + + self.arm_feedback_pub_.publish(self.arm_feedback_new) + case 59: # Arm joint velocities + velocities = [vel / 100.0 for vel in msg.data] # VicCAN sends deg/s*100 + self.saved_joint_state.velocity[0] = math.radians( + velocities[0] + ) # Axis 0 + self.saved_joint_state.velocity[1] = math.radians( + velocities[1] + ) # Axis 1 + self.saved_joint_state.velocity[2] = math.radians( + velocities[2] + ) # Axis 2 + self.saved_joint_state.velocity[3] = math.radians( + velocities[3] + ) # Axis 3 + # Wrist is handled by digit feedback + self.saved_joint_state.header.stamp = msg.header.stamp + self.joint_state_pub_.publish(self.saved_joint_state) + + def process_fromvic_digit(self, msg: VicCAN): + assert msg.mcu_name == "digit" + + # Check message len to prevent crashing on bad data + if msg.command_id in self.viccan_digit_msg_len_dict: + expected_len = self.viccan_digit_msg_len_dict[msg.command_id] + if len(msg.data) != expected_len: + self.get_logger().warning( + f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" + ) + return + + match msg.command_id: + case 54: # Board voltages + self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0 + self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0 + self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0 + case 55: # Arm joint positions + self.saved_joint_state.position[4] = math.radians( + msg.data[0] + ) # Wrist roll + self.saved_joint_state.position[5] = math.radians( + msg.data[1] + ) # Wrist yaw + + @deprecated("Uses an old message type. Will be removed at some point.") def publish_feedback(self): self.socket_pub.publish(self.arm_feedback) self.digit_pub.publish(self.digit_feedback) + @deprecated("Uses an old message type. Will be removed at some point.") def updateAngleFeedback(self, msg: str): # Angle feedbacks, # split the msg.data by commas @@ -263,19 +410,10 @@ class SerialRelay(Node): self.arm_feedback.axis1_angle = angles[1] self.arm_feedback.axis2_angle = angles[2] self.arm_feedback.axis3_angle = angles[3] - - # Joint state publisher for URDF visualization - self.joint_state.position[0] = math.radians(angles[0]) # Axis 0 - self.joint_state.position[1] = math.radians(angles[1]) # Axis 1 - self.joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted) - self.joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted) - # Wrist is handled by digit feedback - self.joint_state.header.stamp = self.get_clock().now().to_msg() - self.joint_state_pub.publish(self.joint_state) - else: self.get_logger().info("Invalid angle feedback input format") + @deprecated("Uses an old message type. Will be removed at some point.") def updateBusVoltage(self, msg: str): # Bus Voltage feedbacks parts = msg.split(",") @@ -290,6 +428,7 @@ class SerialRelay(Node): else: self.get_logger().info("Invalid voltage feedback input format") + @deprecated("Uses an old message type. Will be removed at some point.") def updateMotorFeedback(self, msg: str): parts = str(msg.strip()).split(",") motorId = round(float(parts[3])) @@ -313,38 +452,28 @@ class SerialRelay(Node): self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_current = current - @staticmethod - def list_serial_ports(): - return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") - # return glob.glob("/dev/tty[A-Za-z]*") - def cleanup(self): - print("Cleaning up...") - try: - if self.ser.is_open: - self.ser.close() - except Exception as e: - exit(0) - - -def myexcepthook(type, value, tb): - print("Uncaught exception:", type, value) - if serial_pub: - serial_pub.cleanup() +def exit_handler(signum, frame): + print("Caught SIGTERM. Exiting...") + rclpy.try_shutdown() + sys.exit(0) def main(args=None): rclpy.init(args=args) - sys.excepthook = myexcepthook - global serial_pub - serial_pub = SerialRelay() - serial_pub.run() + # Catch termination signals and exit cleanly + signal.signal(signal.SIGTERM, exit_handler) + + arm_node = ArmNode() + + try: + rclpy.spin(arm_node) + except (KeyboardInterrupt, ExternalShutdownException): + pass + finally: + rclpy.try_shutdown() if __name__ == "__main__": - # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly - signal.signal( - signal.SIGTERM, lambda signum, frame: sys.exit(0) - ) # Catch termination signals and exit cleanly main() diff --git a/src/arm_pkg/setup.py b/src/arm_pkg/setup.py index f36201b..f031e9d 100644 --- a/src/arm_pkg/setup.py +++ b/src/arm_pkg/setup.py @@ -1,27 +1,22 @@ -from setuptools import find_packages, setup -import os -from glob import glob +from setuptools import setup package_name = "arm_pkg" setup( name=package_name, version="1.0.0", - packages=find_packages(exclude=["test"]), + packages=[package_name], data_files=[ ("share/ament_index/resource_index/packages", ["resource/" + package_name]), ("share/" + package_name, ["package.xml"]), ], install_requires=["setuptools"], zip_safe=True, - maintainer="tristan", - maintainer_email="tristanmcginnis26@gmail.com", - description="TODO: Package description", - license="All Rights Reserved", + maintainer="David", + maintainer_email="ds0196@uah.edu", + description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.", + license="AGPL-3.0-only", entry_points={ - "console_scripts": [ - "arm = arm_pkg.arm_node:main", - "headless = arm_pkg.arm_headless:main", - ], + "console_scripts": ["arm = arm_pkg.arm_node:main"], }, ) diff --git a/src/astra_descriptions b/src/astra_descriptions index 35bf223..04c2061 160000 --- a/src/astra_descriptions +++ b/src/astra_descriptions @@ -1 +1 @@ -Subproject commit 35bf223ae9b5ddd07a39a77e0c852771f4aa8c8b +Subproject commit 04c2061443405a07cef1d7fcb64d485b3d6e9c5a diff --git a/src/astra_msgs b/src/astra_msgs index 2840bfe..2264a2c 160000 --- a/src/astra_msgs +++ b/src/astra_msgs @@ -1 +1 @@ -Subproject commit 2840bfef34ba40f6c7c494891fde3478ff184de3 +Subproject commit 2264a2cb673c88810eecea2ab462b5a7366947a1 diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py index 62214a0..ae0f041 100644 --- a/src/core_pkg/core_pkg/core_node.py +++ b/src/core_pkg/core_pkg/core_node.py @@ -1,651 +1,651 @@ -import rclpy -from rclpy.node import Node -from rclpy import qos -from rclpy.duration import Duration -from std_srvs.srv import Empty - -import signal -import time -import atexit - -import serial -import os -import sys -import threading -import glob -from scipy.spatial.transform import Rotation -from math import copysign, pi - -from std_msgs.msg import String, Header -from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus, JointState -from geometry_msgs.msg import TwistStamped, Twist -from astra_msgs.msg import CoreControl, CoreFeedback, RevMotorState -from astra_msgs.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState - - -serial_pub = None -thread = None - -CORE_WHEELBASE = 0.836 # meters -CORE_WHEEL_RADIUS = 0.171 # meters -CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1 - -control_qos = qos.QoSProfile( - # history=qos.QoSHistoryPolicy.KEEP_LAST, - depth=2, - # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, - # durability=qos.QoSDurabilityPolicy.VOLATILE, - # deadline=Duration(seconds=1), - # lifespan=Duration(nanoseconds=500_000_000), # 500ms - # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, - # liveliness_lease_duration=Duration(seconds=5), -) - -# Used to verify the length of an incoming VicCAN feedback message -# Key is VicCAN command_id, value is expected length of data list -viccan_msg_len_dict = { - 48: 1, - 49: 1, - 50: 2, - 51: 4, - 52: 4, - 53: 4, - 54: 4, - 56: 4, # really 3, but viccan - 58: 4, # ditto -} - - -class SerialRelay(Node): - def __init__(self): - # Initalize node with name - super().__init__("core_node") - - # Launch mode -- anchor vs core - self.declare_parameter("launch_mode", "core") - self.launch_mode = self.get_parameter("launch_mode").value - self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") - - self.declare_parameter("use_ros2_control", False) - self.use_ros2_control = self.get_parameter("use_ros2_control").value - self.get_logger().info(f"Use ros2_control: {self.use_ros2_control}") - - ################################################## - # Topics - - # Anchor - if self.launch_mode == "anchor": - self.anchor_fromvic_sub_ = self.create_subscription( - VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20 - ) - self.anchor_tovic_pub_ = self.create_publisher( - VicCAN, "/anchor/to_vic/relay", 20 - ) - - self.anchor_sub = self.create_subscription( - String, "/anchor/core/feedback", self.anchor_feedback, 10 - ) - self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) - - # Control - - if self.use_ros2_control: - # Joint state control for topic-based controller - self.joint_command_sub_ = self.create_subscription( - JointState, "/core/joint_commands", self.joint_command_callback, 2 - ) - else: - # autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2 - self.cmd_vel_sub_ = self.create_subscription( - TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1 - ) - # manual twist -- [-1, 1] rather than real units - self.twist_man_sub_ = self.create_subscription( - Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos - ) - # manual flags -- brake mode and max duty cycle - self.control_state_sub_ = self.create_subscription( - CoreCtrlState, - "/core/control/state", - self.control_state_callback, - qos_profile=control_qos, - ) - self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5 - - # Feedback - - # Consolidated and organized core feedback - self.feedback_new_pub_ = self.create_publisher( - NewCoreFeedback, - "/core/feedback_new", - qos_profile=qos.qos_profile_sensor_data, - ) - self.feedback_new_state = NewCoreFeedback() - self.feedback_new_state.fl_motor.id = 1 - self.feedback_new_state.bl_motor.id = 2 - self.feedback_new_state.fr_motor.id = 3 - self.feedback_new_state.br_motor.id = 4 - self.telemetry_pub_timer = self.create_timer( - 1.0, self.publish_feedback - ) # TODO: not sure about this - # Joint states for topic-based controller - self.joint_state_pub_ = self.create_publisher( - JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data - ) - # IMU (embedded BNO-055) - self.imu_pub_ = self.create_publisher( - Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data - ) - self.imu_state = Imu() - self.imu_state.header.frame_id = "core_bno055" - # GPS (embedded u-blox M9N) - self.gps_pub_ = self.create_publisher( - NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data - ) - self.gps_state = NavSatFix() - self.gps_state.header.frame_id = "core_gps_antenna" - self.gps_state.status.service = NavSatStatus.SERVICE_GPS - self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX - self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN - # Barometer (embedded BMP-388) - self.baro_pub_ = self.create_publisher( - Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data - ) - self.baro_state = Barometer() - self.baro_state.header.frame_id = "core_bmp388" - - # Old - - if not self.use_ros2_control: - # /core/control - self.control_sub = self.create_subscription( - CoreControl, "/core/control", self.send_controls, 10 - ) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff - # /core/feedback - self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10) - self.core_feedback = CoreFeedback() - # Debug - self.debug_pub = self.create_publisher(String, "/core/debug", 10) - self.ping_service = self.create_service( - Empty, "/astra/core/ping", self.ping_callback - ) - - ################################################## - # Find microcontroller (Non-anchor only) - - # Core (non-anchor) specific - if self.launch_mode == "core": - # Loop through all serial devices on the computer to check for the MCU - self.port = None - ports = SerialRelay.list_serial_ports() - for i in range(2): - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, 115200, timeout=1) - # (f"Checking port {port}...") - ser.write(b"ping\n") - response = ser.read_until("\n") # type: ignore - - # if pong is in response, then we are talking with the MCU - if b"pong" in response: - self.port = port - self.get_logger().info(f"Found MCU at {self.port}!") - self.get_logger().info(f"Enabling Relay Mode") - ser.write(b"can_relay_mode,on\n") - break - except: - pass - if self.port is not None: - break - - if self.port is None: - self.get_logger().info("Unable to find MCU...") - time.sleep(1) - sys.exit(1) - - self.ser = serial.Serial(self.port, 115200) - atexit.register(self.cleanup) - # end __init__() - - def run(self): - # This thread makes all the update processes run in the background - global thread - thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) - thread.start() - - try: - while rclpy.ok(): - if self.launch_mode == "core": - self.read_MCU() # Check the MCU for updates - except KeyboardInterrupt: - sys.exit(0) - - def read_MCU(self): # NON-ANCHOR SPECIFIC - try: - output = str(self.ser.readline(), "utf8") - - if output: - # All output over debug temporarily - print(f"[MCU] {output}") - msg = String() - msg.data = output - self.debug_pub.publish(msg) - return - except serial.SerialException as e: - print(f"SerialException: {e}") - print("Closing serial port.") - if self.ser.is_open: - self.ser.close() - sys.exit(1) - except TypeError as e: - print(f"TypeError: {e}") - print("Closing serial port.") - if self.ser.is_open: - self.ser.close() - sys.exit(1) - except Exception as e: - print(f"Exception: {e}") - print("Closing serial port.") - if self.ser.is_open: - self.ser.close() - sys.exit(1) - - def scale_duty(self, value: float, max_speed: float): - leftMin = -1 - leftMax = 1 - rightMin = -max_speed / 100.0 - rightMax = max_speed / 100.0 - - # Figure out how 'wide' each range is - leftSpan = leftMax - leftMin - rightSpan = rightMax - rightMin - - # Convert the left range into a 0-1 range (float) - valueScaled = float(value - leftMin) / float(leftSpan) - - # Convert the 0-1 range into a value in the right range. - return str(rightMin + (valueScaled * rightSpan)) - - def send_controls(self, msg: CoreControl): - if msg.turn_to_enable: - command = ( - "can_relay_tovic,core,41," - + str(msg.turn_to) - + "," - + str(msg.turn_to_timeout) - + "\n" - ) - else: - command = ( - "can_relay_tovic,core,19," - + self.scale_duty(msg.left_stick, msg.max_speed) - + "," - + self.scale_duty(msg.right_stick, msg.max_speed) - + "\n" - ) - self.send_cmd(command) - - # Brake mode - command = "can_relay_tovic,core,18," + str(int(msg.brake)) + "\n" - self.send_cmd(command) - - # print(f"[Sys] Relaying: {command}") - - def joint_command_callback(self, msg: JointState): - # So... topic based control node publishes JointState messages over /joint_commands - # with len(msg.name) == 5 and len(msg.velocity) == 4... all 5 non-fixed joints - # are included in msg.name, but ig it is implied that msg.velocity only - # includes velocities for the commanded joints (ros__parameters.joints). - # So, this will be much more hacky and less adaptable than I would like it to be. - if len(msg.name) != 5 or len(msg.velocity) != 4 or len(msg.position) != 0: - self.get_logger().warning( - f"Received joint control message with unexpected number of joints. Ignoring." - ) - return - if msg.name != [ - "left_suspension_joint", - "bl_wheel_joint", - "br_wheel_joint", - "fl_wheel_joint", - "fr_wheel_joint", - ]: - self.get_logger().warning( - f"Received joint control message with unexpected name[]. Ignoring." - ) - return - - (bl_vel, br_vel, fl_vel, fr_vel) = msg.velocity - - bl_rpm = radps_to_rpm(bl_vel) * CORE_GEAR_RATIO - br_rpm = radps_to_rpm(br_vel) * CORE_GEAR_RATIO - fl_rpm = radps_to_rpm(fl_vel) * CORE_GEAR_RATIO - fr_rpm = radps_to_rpm(fr_vel) * CORE_GEAR_RATIO - - self.send_viccan( - 20, [fl_rpm, bl_rpm, fr_rpm, br_rpm] - ) # order expected by embedded - - def cmd_vel_callback(self, msg: TwistStamped): - linear = msg.twist.linear.x - angular = -msg.twist.angular.z - - vel_left_rads = (linear - (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS - vel_right_rads = (linear + (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS - - vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO - vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO - - self.send_viccan(20, [vel_left_rpm, vel_right_rpm]) - - def twist_man_callback(self, msg: Twist): - linear = msg.linear.x # [-1 1] for forward/back from left stick y - angular = msg.angular.z # [-1 1] for left/right from right stick x - - if linear < 0: # reverse turning direction when going backwards (WIP) - angular *= -1 - - if abs(linear) > 1 or abs(angular) > 1: - # if speed is greater than 1, then there is a problem - # make it look like a problem and don't just run away lmao - linear = copysign( - 0.25, linear - ) # 0.25 duty cycle in direction of control (hopefully slow) - angular = copysign(0.25, angular) - - duty_left = linear - angular - duty_right = linear + angular - scale = max(1, abs(duty_left), abs(duty_right)) - duty_left /= scale - duty_right /= scale - - # Apply max duty cycle - # Joysticks provide values [-1, 1] rather than real units - duty_left = map_range( - duty_left, -1, 1, -self.twist_max_duty, self.twist_max_duty - ) - duty_right = map_range( - duty_right, -1, 1, -self.twist_max_duty, self.twist_max_duty - ) - - self.send_viccan(19, [duty_left, duty_right]) - - def control_state_callback(self, msg: CoreCtrlState): - # Brake mode - self.send_viccan(18, [msg.brake_mode]) - - # Max duty cycle - self.twist_max_duty = msg.max_duty # twist_man_callback will handle this - - def send_cmd(self, msg: str): - if self.launch_mode == "anchor": - # self.get_logger().info(f"[Core to Anchor Relay] {msg}") - output = String() # Convert to std_msg string - output.data = msg - self.anchor_pub.publish(output) - elif self.launch_mode == "core": - self.get_logger().info(f"[Core to MCU] {msg}") - self.ser.write(bytes(msg, "utf8")) - - def send_viccan(self, cmd_id: int, data: list[float]): - self.anchor_tovic_pub_.publish( - VicCAN( - header=Header(stamp=self.get_clock().now().to_msg(), frame_id="to_vic"), - mcu_name="core", - command_id=cmd_id, - data=data, - ) - ) - - def anchor_feedback(self, msg: String): - output = msg.data - parts = str(output.strip()).split(",") - # GNSS Latitude - if output.startswith("can_relay_fromvic,core,48"): - self.core_feedback.gps_lat = float(parts[3]) - # GNSS Longitude - elif output.startswith("can_relay_fromvic,core,49"): - self.core_feedback.gps_long = float(parts[3]) - # GNSS Satellite count and altitude - elif output.startswith("can_relay_fromvic,core,50"): - self.core_feedback.gps_sats = round(float(parts[3])) - self.core_feedback.gps_alt = round(float(parts[4]), 2) - # Gyro x, y, z, and imu calibration - elif output.startswith("can_relay_fromvic,core,51"): - self.core_feedback.bno_gyro.x = float(parts[3]) - self.core_feedback.bno_gyro.y = float(parts[4]) - self.core_feedback.bno_gyro.z = float(parts[5]) - self.core_feedback.imu_calib = round(float(parts[6])) - # Accel x, y, z, heading - elif output.startswith("can_relay_fromvic,core,52"): - self.core_feedback.bno_accel.x = float(parts[3]) - self.core_feedback.bno_accel.y = float(parts[4]) - self.core_feedback.bno_accel.z = float(parts[5]) - self.core_feedback.orientation = float(parts[6]) - # REV Sparkmax feedback - elif output.startswith("can_relay_fromvic,core,53"): - motorId = round(float(parts[3])) - temp = float(parts[4]) / 10.0 - voltage = float(parts[5]) / 10.0 - current = float(parts[6]) / 10.0 - if motorId == 1: - self.core_feedback.fl_temp = temp - self.core_feedback.fl_voltage = voltage - self.core_feedback.fl_current = current - elif motorId == 2: - self.core_feedback.bl_temp = temp - self.core_feedback.bl_voltage = voltage - self.core_feedback.bl_current = current - elif motorId == 3: - self.core_feedback.fr_temp = temp - self.core_feedback.fr_voltage = voltage - self.core_feedback.fr_current = current - elif motorId == 4: - self.core_feedback.br_temp = temp - self.core_feedback.br_voltage = voltage - self.core_feedback.br_current = current - # Voltages batt, 12, 5, 3, all * 100 - elif output.startswith("can_relay_fromvic,core,54"): - self.core_feedback.bat_voltage = float(parts[3]) / 100.0 - self.core_feedback.voltage_12 = float(parts[4]) / 100.0 - self.core_feedback.voltage_5 = float(parts[5]) / 100.0 - self.core_feedback.voltage_3 = float(parts[6]) / 100.0 - # BMP temperature, altitude, pressure - elif output.startswith("can_relay_fromvic,core,56"): - self.core_feedback.bmp_temp = float(parts[3]) - self.core_feedback.bmp_alt = float(parts[4]) - self.core_feedback.bmp_pres = float(parts[5]) - else: - return - self.feedback_new_state.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_pub_.publish(self.feedback_new_state) - # self.get_logger().info(f"[Core Anchor] {msg}") - - def relay_fromvic(self, msg: VicCAN): - # Assume that the message is coming from Core - # skill diff if not - - # Check message len to prevent crashing on bad data - if msg.command_id in viccan_msg_len_dict: - expected_len = viccan_msg_len_dict[msg.command_id] - if len(msg.data) != expected_len: - self.get_logger().warning( - f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" - ) - return - - match msg.command_id: - # GNSS - case 48: # GNSS Latitude - self.gps_state.latitude = float(msg.data[0]) - case 49: # GNSS Longitude - self.gps_state.longitude = float(msg.data[0]) - case 50: # GNSS Satellite count and altitude - self.gps_state.status.status = ( - NavSatStatus.STATUS_FIX - if int(msg.data[0]) >= 3 - else NavSatStatus.STATUS_NO_FIX - ) - self.gps_state.altitude = float(msg.data[1]) - self.gps_state.header.stamp = msg.header.stamp - self.gps_pub_.publish(self.gps_state) - # IMU - case 51: # Gyro x, y, z, and imu calibration - self.feedback_new_state.imu_calib = round(float(msg.data[3])) - self.imu_state.angular_velocity.x = float(msg.data[0]) - self.imu_state.angular_velocity.y = float(msg.data[1]) - self.imu_state.angular_velocity.z = float(msg.data[2]) - self.imu_state.header.stamp = msg.header.stamp - case 52: # Accel x, y, z, heading - self.imu_state.linear_acceleration.x = float(msg.data[0]) - self.imu_state.linear_acceleration.y = float(msg.data[1]) - self.imu_state.linear_acceleration.z = float(msg.data[2]) - # Deal with quaternion - r = Rotation.from_euler("z", float(msg.data[3]), degrees=True) - q = r.as_quat() - self.imu_state.orientation.x = q[0] - self.imu_state.orientation.y = q[1] - self.imu_state.orientation.z = q[2] - self.imu_state.orientation.w = q[3] - self.imu_state.header.stamp = msg.header.stamp - self.imu_pub_.publish(self.imu_state) - # REV Motors - case 53: # REV SPARK MAX feedback - motorId = round(float(msg.data[0])) - temp = float(msg.data[1]) / 10.0 - voltage = float(msg.data[2]) / 10.0 - current = float(msg.data[3]) / 10.0 - motor: RevMotorState | None = None - match motorId: - case 1: - motor = self.feedback_new_state.fl_motor - case 2: - motor = self.feedback_new_state.bl_motor - case 3: - motor = self.feedback_new_state.fr_motor - case 4: - motor = self.feedback_new_state.br_motor - case _: - self.get_logger().warning( - f"Ignoring REV motor feedback 53 with invalid motorId {motorId}" - ) - return - - if motor: - motor.temperature = temp - motor.voltage = voltage - motor.current = current - motor.header.stamp = msg.header.stamp - - self.feedback_new_pub_.publish(self.feedback_new_state) - # Board voltage - case 54: # Voltages batt, 12, 5, 3, all * 100 - self.feedback_new_state.board_voltage.vbatt = float(msg.data[0]) / 100.0 - self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0 - self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0 - self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0 - # Baro - case 56: # BMP temperature, altitude, pressure - self.baro_state.temperature = float(msg.data[0]) - self.baro_state.altitude = float(msg.data[1]) - self.baro_state.pressure = float(msg.data[2]) - self.baro_state.header.stamp = msg.header.stamp - self.baro_pub_.publish(self.baro_state) - # REV Motors (pos and vel) - case 58: # REV position and velocity - motorId = round(float(msg.data[0])) - position = float(msg.data[1]) - velocity = float(msg.data[2]) - joint_state_msg = ( - JointState() - ) # TODO: not sure if all motors should be in each message or not - joint_state_msg.position = [ - position * (2 * pi) / CORE_GEAR_RATIO - ] # revolutions to radians - joint_state_msg.velocity = [ - velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO - ] # RPM to rad/s - - motor: RevMotorState | None = None - - match motorId: - case 1: - motor = self.feedback_new_state.fl_motor - joint_state_msg.name = ["fl_wheel_joint"] - case 2: - motor = self.feedback_new_state.bl_motor - joint_state_msg.name = ["bl_wheel_joint"] - case 3: - motor = self.feedback_new_state.fr_motor - joint_state_msg.name = ["fr_wheel_joint"] - case 4: - motor = self.feedback_new_state.br_motor - joint_state_msg.name = ["br_wheel_joint"] - case _: - self.get_logger().warning( - f"Ignoring REV motor feedback 58 with invalid motorId {motorId}" - ) - return - - # make the fucking shit work - joint_state_msg.name.append("left_suspension_joint") - joint_state_msg.position.append(0.0) - joint_state_msg.velocity.append(0.0) - - joint_state_msg.header.stamp = msg.header.stamp - self.joint_state_pub_.publish(joint_state_msg) - case _: - return - - def publish_feedback(self): - # self.get_logger().info(f"[Core] {self.core_feedback}") - self.feedback_pub.publish(self.core_feedback) - - def ping_callback(self, request, response): - return response - - @staticmethod - def list_serial_ports(): - return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") - - def cleanup(self): - print("Cleaning up before terminating...") - try: - if self.ser.is_open: - self.ser.close() - except Exception as e: - exit(0) - - -def myexcepthook(type, value, tb): - print("Uncaught exception:", type, value) - if serial_pub: - serial_pub.cleanup() - - -def map_range( - value: float, in_min: float, in_max: float, out_min: float, out_max: float -): - return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min - - -def radps_to_rpm(radps: float): - return radps * 60 / (2 * pi) - - -def main(args=None): - rclpy.init(args=args) - sys.excepthook = myexcepthook - - global serial_pub - - serial_pub = SerialRelay() - serial_pub.run() - - -if __name__ == "__main__": - # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly - signal.signal( - signal.SIGTERM, lambda signum, frame: sys.exit(0) - ) # Catch termination signals and exit cleanly - main() +import rclpy +from rclpy.node import Node +from rclpy import qos +from rclpy.duration import Duration +from std_srvs.srv import Empty + +import signal +import time +import atexit + +import serial +import os +import sys +import threading +import glob +from scipy.spatial.transform import Rotation +from math import copysign, pi + +from std_msgs.msg import String, Header +from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus, JointState +from geometry_msgs.msg import TwistStamped, Twist +from astra_msgs.msg import CoreControl, CoreFeedback, RevMotorState +from astra_msgs.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState + + +serial_pub = None +thread = None + +CORE_WHEELBASE = 0.836 # meters +CORE_WHEEL_RADIUS = 0.171 # meters +CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1 + +control_qos = qos.QoSProfile( + history=qos.QoSHistoryPolicy.KEEP_LAST, + depth=2, + reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers + durability=qos.QoSDurabilityPolicy.VOLATILE, + # deadline=Duration(seconds=1), + # lifespan=Duration(nanoseconds=500_000_000), # 500ms + # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, + # liveliness_lease_duration=Duration(seconds=5), +) + +# Used to verify the length of an incoming VicCAN feedback message +# Key is VicCAN command_id, value is expected length of data list +viccan_msg_len_dict = { + 48: 1, + 49: 1, + 50: 2, + 51: 4, + 52: 4, + 53: 4, + 54: 4, + 56: 4, # really 3, but viccan + 58: 4, # ditto +} + + +class SerialRelay(Node): + def __init__(self): + # Initalize node with name + super().__init__("core_node") + + # Launch mode -- anchor vs core + self.declare_parameter("launch_mode", "core") + self.launch_mode = self.get_parameter("launch_mode").value + self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") + + self.declare_parameter("use_ros2_control", False) + self.use_ros2_control = self.get_parameter("use_ros2_control").value + self.get_logger().info(f"Use ros2_control: {self.use_ros2_control}") + + ################################################## + # Topics + + # Anchor + if self.launch_mode == "anchor": + self.anchor_fromvic_sub_ = self.create_subscription( + VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20 + ) + self.anchor_tovic_pub_ = self.create_publisher( + VicCAN, "/anchor/to_vic/relay", 20 + ) + + self.anchor_sub = self.create_subscription( + String, "/anchor/core/feedback", self.anchor_feedback, 10 + ) + self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) + + # Control + + if self.use_ros2_control: + # Joint state control for topic-based controller + self.joint_command_sub_ = self.create_subscription( + JointState, "/core/joint_commands", self.joint_command_callback, 2 + ) + else: + # autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2 + self.cmd_vel_sub_ = self.create_subscription( + TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1 + ) + # manual twist -- [-1, 1] rather than real units + self.twist_man_sub_ = self.create_subscription( + Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos + ) + # manual flags -- brake mode and max duty cycle + self.control_state_sub_ = self.create_subscription( + CoreCtrlState, + "/core/control/state", + self.control_state_callback, + qos_profile=control_qos, + ) + self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5 + + # Feedback + + # Consolidated and organized core feedback + self.feedback_new_pub_ = self.create_publisher( + NewCoreFeedback, + "/core/feedback_new", + qos_profile=qos.qos_profile_sensor_data, + ) + self.feedback_new_state = NewCoreFeedback() + self.feedback_new_state.fl_motor.id = 1 + self.feedback_new_state.bl_motor.id = 2 + self.feedback_new_state.fr_motor.id = 3 + self.feedback_new_state.br_motor.id = 4 + self.telemetry_pub_timer = self.create_timer( + 1.0, self.publish_feedback + ) # TODO: not sure about this + # Joint states for topic-based controller + self.joint_state_pub_ = self.create_publisher( + JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data + ) + # IMU (embedded BNO-055) + self.imu_pub_ = self.create_publisher( + Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data + ) + self.imu_state = Imu() + self.imu_state.header.frame_id = "core_bno055" + # GPS (embedded u-blox M9N) + self.gps_pub_ = self.create_publisher( + NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data + ) + self.gps_state = NavSatFix() + self.gps_state.header.frame_id = "core_gps_antenna" + self.gps_state.status.service = NavSatStatus.SERVICE_GPS + self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX + self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN + # Barometer (embedded BMP-388) + self.baro_pub_ = self.create_publisher( + Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data + ) + self.baro_state = Barometer() + self.baro_state.header.frame_id = "core_bmp388" + + # Old + + if not self.use_ros2_control: + # /core/control + self.control_sub = self.create_subscription( + CoreControl, "/core/control", self.send_controls, 10 + ) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff + # /core/feedback + self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10) + self.core_feedback = CoreFeedback() + # Debug + self.debug_pub = self.create_publisher(String, "/core/debug", 10) + self.ping_service = self.create_service( + Empty, "/astra/core/ping", self.ping_callback + ) + + ################################################## + # Find microcontroller (Non-anchor only) + + # Core (non-anchor) specific + if self.launch_mode == "core": + # Loop through all serial devices on the computer to check for the MCU + self.port = None + ports = SerialRelay.list_serial_ports() + for i in range(2): + for port in ports: + try: + # connect and send a ping command + ser = serial.Serial(port, 115200, timeout=1) + # (f"Checking port {port}...") + ser.write(b"ping\n") + response = ser.read_until("\n") # type: ignore + + # if pong is in response, then we are talking with the MCU + if b"pong" in response: + self.port = port + self.get_logger().info(f"Found MCU at {self.port}!") + self.get_logger().info(f"Enabling Relay Mode") + ser.write(b"can_relay_mode,on\n") + break + except: + pass + if self.port is not None: + break + + if self.port is None: + self.get_logger().info("Unable to find MCU...") + time.sleep(1) + sys.exit(1) + + self.ser = serial.Serial(self.port, 115200) + atexit.register(self.cleanup) + # end __init__() + + def run(self): + # This thread makes all the update processes run in the background + global thread + thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) + thread.start() + + try: + while rclpy.ok(): + if self.launch_mode == "core": + self.read_MCU() # Check the MCU for updates + except KeyboardInterrupt: + sys.exit(0) + + def read_MCU(self): # NON-ANCHOR SPECIFIC + try: + output = str(self.ser.readline(), "utf8") + + if output: + # All output over debug temporarily + print(f"[MCU] {output}") + msg = String() + msg.data = output + self.debug_pub.publish(msg) + return + except serial.SerialException as e: + print(f"SerialException: {e}") + print("Closing serial port.") + if self.ser.is_open: + self.ser.close() + sys.exit(1) + except TypeError as e: + print(f"TypeError: {e}") + print("Closing serial port.") + if self.ser.is_open: + self.ser.close() + sys.exit(1) + except Exception as e: + print(f"Exception: {e}") + print("Closing serial port.") + if self.ser.is_open: + self.ser.close() + sys.exit(1) + + def scale_duty(self, value: float, max_speed: float): + leftMin = -1 + leftMax = 1 + rightMin = -max_speed / 100.0 + rightMax = max_speed / 100.0 + + # Figure out how 'wide' each range is + leftSpan = leftMax - leftMin + rightSpan = rightMax - rightMin + + # Convert the left range into a 0-1 range (float) + valueScaled = float(value - leftMin) / float(leftSpan) + + # Convert the 0-1 range into a value in the right range. + return str(rightMin + (valueScaled * rightSpan)) + + def send_controls(self, msg: CoreControl): + if msg.turn_to_enable: + command = ( + "can_relay_tovic,core,41," + + str(msg.turn_to) + + "," + + str(msg.turn_to_timeout) + + "\n" + ) + else: + command = ( + "can_relay_tovic,core,19," + + self.scale_duty(msg.left_stick, msg.max_speed) + + "," + + self.scale_duty(msg.right_stick, msg.max_speed) + + "\n" + ) + self.send_cmd(command) + + # Brake mode + command = "can_relay_tovic,core,18," + str(int(msg.brake)) + "\n" + self.send_cmd(command) + + # print(f"[Sys] Relaying: {command}") + + def joint_command_callback(self, msg: JointState): + # So... topic based control node publishes JointState messages over /joint_commands + # with len(msg.name) == 5 and len(msg.velocity) == 4... all 5 non-fixed joints + # are included in msg.name, but ig it is implied that msg.velocity only + # includes velocities for the commanded joints (ros__parameters.joints). + # So, this will be much more hacky and less adaptable than I would like it to be. + if len(msg.name) != 5 or len(msg.velocity) != 4 or len(msg.position) != 0: + self.get_logger().warning( + f"Received joint control message with unexpected number of joints. Ignoring." + ) + return + if msg.name != [ + "left_suspension_joint", + "bl_wheel_joint", + "br_wheel_joint", + "fl_wheel_joint", + "fr_wheel_joint", + ]: + self.get_logger().warning( + f"Received joint control message with unexpected name[]. Ignoring." + ) + return + + (bl_vel, br_vel, fl_vel, fr_vel) = msg.velocity + + bl_rpm = radps_to_rpm(bl_vel) * CORE_GEAR_RATIO + br_rpm = radps_to_rpm(br_vel) * CORE_GEAR_RATIO + fl_rpm = radps_to_rpm(fl_vel) * CORE_GEAR_RATIO + fr_rpm = radps_to_rpm(fr_vel) * CORE_GEAR_RATIO + + self.send_viccan( + 20, [fl_rpm, bl_rpm, fr_rpm, br_rpm] + ) # order expected by embedded + + def cmd_vel_callback(self, msg: TwistStamped): + linear = msg.twist.linear.x + angular = -msg.twist.angular.z + + vel_left_rads = (linear - (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS + vel_right_rads = (linear + (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS + + vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO + vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO + + self.send_viccan(20, [vel_left_rpm, vel_right_rpm]) + + def twist_man_callback(self, msg: Twist): + linear = msg.linear.x # [-1 1] for forward/back from left stick y + angular = msg.angular.z # [-1 1] for left/right from right stick x + + if linear < 0: # reverse turning direction when going backwards (WIP) + angular *= -1 + + if abs(linear) > 1 or abs(angular) > 1: + # if speed is greater than 1, then there is a problem + # make it look like a problem and don't just run away lmao + linear = copysign( + 0.25, linear + ) # 0.25 duty cycle in direction of control (hopefully slow) + angular = copysign(0.25, angular) + + duty_left = linear - angular + duty_right = linear + angular + scale = max(1, abs(duty_left), abs(duty_right)) + duty_left /= scale + duty_right /= scale + + # Apply max duty cycle + # Joysticks provide values [-1, 1] rather than real units + duty_left = map_range( + duty_left, -1, 1, -self.twist_max_duty, self.twist_max_duty + ) + duty_right = map_range( + duty_right, -1, 1, -self.twist_max_duty, self.twist_max_duty + ) + + self.send_viccan(19, [duty_left, duty_right]) + + def control_state_callback(self, msg: CoreCtrlState): + # Brake mode + self.send_viccan(18, [msg.brake_mode]) + + # Max duty cycle + self.twist_max_duty = msg.max_duty # twist_man_callback will handle this + + def send_cmd(self, msg: str): + if self.launch_mode == "anchor": + # self.get_logger().info(f"[Core to Anchor Relay] {msg}") + output = String() # Convert to std_msg string + output.data = msg + self.anchor_pub.publish(output) + elif self.launch_mode == "core": + self.get_logger().info(f"[Core to MCU] {msg}") + self.ser.write(bytes(msg, "utf8")) + + def send_viccan(self, cmd_id: int, data: list[float]): + self.anchor_tovic_pub_.publish( + VicCAN( + header=Header(stamp=self.get_clock().now().to_msg(), frame_id="to_vic"), + mcu_name="core", + command_id=cmd_id, + data=data, + ) + ) + + def anchor_feedback(self, msg: String): + output = msg.data + parts = str(output.strip()).split(",") + # GNSS Latitude + if output.startswith("can_relay_fromvic,core,48"): + self.core_feedback.gps_lat = float(parts[3]) + # GNSS Longitude + elif output.startswith("can_relay_fromvic,core,49"): + self.core_feedback.gps_long = float(parts[3]) + # GNSS Satellite count and altitude + elif output.startswith("can_relay_fromvic,core,50"): + self.core_feedback.gps_sats = round(float(parts[3])) + self.core_feedback.gps_alt = round(float(parts[4]), 2) + # Gyro x, y, z, and imu calibration + elif output.startswith("can_relay_fromvic,core,51"): + self.core_feedback.bno_gyro.x = float(parts[3]) + self.core_feedback.bno_gyro.y = float(parts[4]) + self.core_feedback.bno_gyro.z = float(parts[5]) + self.core_feedback.imu_calib = round(float(parts[6])) + # Accel x, y, z, heading + elif output.startswith("can_relay_fromvic,core,52"): + self.core_feedback.bno_accel.x = float(parts[3]) + self.core_feedback.bno_accel.y = float(parts[4]) + self.core_feedback.bno_accel.z = float(parts[5]) + self.core_feedback.orientation = float(parts[6]) + # REV Sparkmax feedback + elif output.startswith("can_relay_fromvic,core,53"): + motorId = round(float(parts[3])) + temp = float(parts[4]) / 10.0 + voltage = float(parts[5]) / 10.0 + current = float(parts[6]) / 10.0 + if motorId == 1: + self.core_feedback.fl_temp = temp + self.core_feedback.fl_voltage = voltage + self.core_feedback.fl_current = current + elif motorId == 2: + self.core_feedback.bl_temp = temp + self.core_feedback.bl_voltage = voltage + self.core_feedback.bl_current = current + elif motorId == 3: + self.core_feedback.fr_temp = temp + self.core_feedback.fr_voltage = voltage + self.core_feedback.fr_current = current + elif motorId == 4: + self.core_feedback.br_temp = temp + self.core_feedback.br_voltage = voltage + self.core_feedback.br_current = current + # Voltages batt, 12, 5, 3, all * 100 + elif output.startswith("can_relay_fromvic,core,54"): + self.core_feedback.bat_voltage = float(parts[3]) / 100.0 + self.core_feedback.voltage_12 = float(parts[4]) / 100.0 + self.core_feedback.voltage_5 = float(parts[5]) / 100.0 + self.core_feedback.voltage_3 = float(parts[6]) / 100.0 + # BMP temperature, altitude, pressure + elif output.startswith("can_relay_fromvic,core,56"): + self.core_feedback.bmp_temp = float(parts[3]) + self.core_feedback.bmp_alt = float(parts[4]) + self.core_feedback.bmp_pres = float(parts[5]) + else: + return + self.feedback_new_state.header.stamp = self.get_clock().now().to_msg() + self.feedback_new_pub_.publish(self.feedback_new_state) + # self.get_logger().info(f"[Core Anchor] {msg}") + + def relay_fromvic(self, msg: VicCAN): + # Assume that the message is coming from Core + # skill diff if not + + # Check message len to prevent crashing on bad data + if msg.command_id in viccan_msg_len_dict: + expected_len = viccan_msg_len_dict[msg.command_id] + if len(msg.data) != expected_len: + self.get_logger().warning( + f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" + ) + return + + match msg.command_id: + # GNSS + case 48: # GNSS Latitude + self.gps_state.latitude = float(msg.data[0]) + case 49: # GNSS Longitude + self.gps_state.longitude = float(msg.data[0]) + case 50: # GNSS Satellite count and altitude + self.gps_state.status.status = ( + NavSatStatus.STATUS_FIX + if int(msg.data[0]) >= 3 + else NavSatStatus.STATUS_NO_FIX + ) + self.gps_state.altitude = float(msg.data[1]) + self.gps_state.header.stamp = msg.header.stamp + self.gps_pub_.publish(self.gps_state) + # IMU + case 51: # Gyro x, y, z, and imu calibration + self.feedback_new_state.imu_calib = round(float(msg.data[3])) + self.imu_state.angular_velocity.x = float(msg.data[0]) + self.imu_state.angular_velocity.y = float(msg.data[1]) + self.imu_state.angular_velocity.z = float(msg.data[2]) + self.imu_state.header.stamp = msg.header.stamp + case 52: # Accel x, y, z, heading + self.imu_state.linear_acceleration.x = float(msg.data[0]) + self.imu_state.linear_acceleration.y = float(msg.data[1]) + self.imu_state.linear_acceleration.z = float(msg.data[2]) + # Deal with quaternion + r = Rotation.from_euler("z", float(msg.data[3]), degrees=True) + q = r.as_quat() + self.imu_state.orientation.x = q[0] + self.imu_state.orientation.y = q[1] + self.imu_state.orientation.z = q[2] + self.imu_state.orientation.w = q[3] + self.imu_state.header.stamp = msg.header.stamp + self.imu_pub_.publish(self.imu_state) + # REV Motors + case 53: # REV SPARK MAX feedback + motorId = round(float(msg.data[0])) + temp = float(msg.data[1]) / 10.0 + voltage = float(msg.data[2]) / 10.0 + current = float(msg.data[3]) / 10.0 + motor: RevMotorState | None = None + match motorId: + case 1: + motor = self.feedback_new_state.fl_motor + case 2: + motor = self.feedback_new_state.bl_motor + case 3: + motor = self.feedback_new_state.fr_motor + case 4: + motor = self.feedback_new_state.br_motor + case _: + self.get_logger().warning( + f"Ignoring REV motor feedback 53 with invalid motorId {motorId}" + ) + return + + if motor: + motor.temperature = temp + motor.voltage = voltage + motor.current = current + motor.header.stamp = msg.header.stamp + + self.feedback_new_pub_.publish(self.feedback_new_state) + # Board voltage + case 54: # Voltages batt, 12, 5, 3, all * 100 + self.feedback_new_state.board_voltage.vbatt = float(msg.data[0]) / 100.0 + self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0 + self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0 + self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0 + # Baro + case 56: # BMP temperature, altitude, pressure + self.baro_state.temperature = float(msg.data[0]) + self.baro_state.altitude = float(msg.data[1]) + self.baro_state.pressure = float(msg.data[2]) + self.baro_state.header.stamp = msg.header.stamp + self.baro_pub_.publish(self.baro_state) + # REV Motors (pos and vel) + case 58: # REV position and velocity + motorId = round(float(msg.data[0])) + position = float(msg.data[1]) + velocity = float(msg.data[2]) + joint_state_msg = ( + JointState() + ) # TODO: not sure if all motors should be in each message or not + joint_state_msg.position = [ + position * (2 * pi) / CORE_GEAR_RATIO + ] # revolutions to radians + joint_state_msg.velocity = [ + velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO + ] # RPM to rad/s + + motor: RevMotorState | None = None + + match motorId: + case 1: + motor = self.feedback_new_state.fl_motor + joint_state_msg.name = ["fl_wheel_joint"] + case 2: + motor = self.feedback_new_state.bl_motor + joint_state_msg.name = ["bl_wheel_joint"] + case 3: + motor = self.feedback_new_state.fr_motor + joint_state_msg.name = ["fr_wheel_joint"] + case 4: + motor = self.feedback_new_state.br_motor + joint_state_msg.name = ["br_wheel_joint"] + case _: + self.get_logger().warning( + f"Ignoring REV motor feedback 58 with invalid motorId {motorId}" + ) + return + + # make the fucking shit work + joint_state_msg.name.append("left_suspension_joint") + joint_state_msg.position.append(0.0) + joint_state_msg.velocity.append(0.0) + + joint_state_msg.header.stamp = msg.header.stamp + self.joint_state_pub_.publish(joint_state_msg) + case _: + return + + def publish_feedback(self): + # self.get_logger().info(f"[Core] {self.core_feedback}") + self.feedback_pub.publish(self.core_feedback) + + def ping_callback(self, request, response): + return response + + @staticmethod + def list_serial_ports(): + return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") + + def cleanup(self): + print("Cleaning up before terminating...") + try: + if self.ser.is_open: + self.ser.close() + except Exception as e: + exit(0) + + +def myexcepthook(type, value, tb): + print("Uncaught exception:", type, value) + if serial_pub: + serial_pub.cleanup() + + +def map_range( + value: float, in_min: float, in_max: float, out_min: float, out_max: float +): + return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min + + +def radps_to_rpm(radps: float): + return radps * 60 / (2 * pi) + + +def main(args=None): + rclpy.init(args=args) + sys.excepthook = myexcepthook + + global serial_pub + + serial_pub = SerialRelay() + serial_pub.run() + + +if __name__ == "__main__": + # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly + signal.signal( + signal.SIGTERM, lambda signum, frame: sys.exit(0) + ) # Catch termination signals and exit cleanly + main() diff --git a/src/headless_pkg/src/headless_node.py b/src/headless_pkg/src/headless_node.py index af1e0ec..d5138a5 100755 --- a/src/headless_pkg/src/headless_node.py +++ b/src/headless_pkg/src/headless_node.py @@ -6,22 +6,28 @@ from rclpy.duration import Duration import signal import time -import atexit import os import sys -import threading -import glob import pwd import grp from math import copysign -from std_msgs.msg import String -from geometry_msgs.msg import Twist, TwistStamped +from std_srvs.srv import Trigger from std_msgs.msg import Header +from geometry_msgs.msg import Twist, TwistStamped +from control_msgs.msg import JointJog from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreCtrlState +import warnings + +# Literally headless +warnings.filterwarnings( + "ignore", + message="Your system is avx2 capable but pygame was not built with support for it.", +) + import pygame os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display @@ -35,36 +41,44 @@ CORE_STOP_TWIST_MSG = Twist() # " ARM_STOP_MSG = ArmManual() # " BIO_STOP_MSG = BioControl() # " + control_qos = qos.QoSProfile( - # history=qos.QoSHistoryPolicy.KEEP_LAST, + history=qos.QoSHistoryPolicy.KEEP_LAST, depth=2, - # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, - # durability=qos.QoSDurabilityPolicy.VOLATILE, + reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, + durability=qos.QoSDurabilityPolicy.VOLATILE, # deadline=Duration(seconds=1), # lifespan=Duration(nanoseconds=500_000_000), # 500ms # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness_lease_duration=Duration(seconds=5), ) -CORE_MODE = "twist" # "twist" or "duty" + +STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05")) +ARM_DEADZONE = float(os.getenv("ARM_DEADZONE", "0.2")) class Headless(Node): + # Every non-fixed joint defined in Arm's URDF + # Used for JointState and JointJog messsages + all_joint_names = [ + "axis_0_joint", + "axis_1_joint", + "axis_2_joint", + "axis_3_joint", + "wrist_yaw_joint", + "wrist_roll_joint", + "ef_gripper_left_joint", + ] + def __init__(self): # Initialize pygame first pygame.init() pygame.joystick.init() - super().__init__("headless") - - self.declare_parameter("use_cmd_vel", False) - self.using_cmd_vel = self.get_parameter("use_cmd_vel").value - if self.using_cmd_vel: - self.get_logger().info("Using cmd_vel for core control") - global CORE_MODE - CORE_MODE = "twist" - else: - self.get_logger().info("Using astra_msgs/CoreControl for core control") + super().__init__("headless_node") + ################################################## + # Preamble # Wait for anchor to start pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug") @@ -116,43 +130,134 @@ class Headless(Node): break id += 1 - self.create_timer(0.15, self.send_controls) + ################################################## + # Parameters - self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) - self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2) - self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2) + self.declare_parameter("use_old_topics", True) + self.use_old_topics = ( + self.get_parameter("use_old_topics").get_parameter_value().bool_value + ) - self.core_twist_pub_ = self.create_publisher( - Twist, "/core/twist", qos_profile=control_qos + self.declare_parameter("use_cmd_vel", False) + self.using_cmd_vel = self.get_parameter("use_cmd_vel").value + + self.declare_parameter("use_bio", False) + self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value + + self.declare_parameter("use_arm_ik", False) + self.use_arm_ik = ( + self.get_parameter("use_arm_ik").get_parameter_value().bool_value ) - self.core_cmd_vel_pub_ = self.create_publisher( - TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos - ) - self.core_state_pub_ = self.create_publisher( - CoreCtrlState, "/core/control/state", qos_profile=control_qos + # NOTE: only applicable if use_old_topics == True + self.declare_parameter("use_new_arm_manual_scheme", True) + self.use_new_arm_manual_scheme = ( + self.get_parameter("use_new_arm_manual_scheme") + .get_parameter_value() + .bool_value ) + # Check parameter validity + if self.using_cmd_vel: + self.get_logger().info("Using cmd_vel for core control") + global CORE_MODE + CORE_MODE = "twist" + else: + self.get_logger().info("Using astra_msgs/CoreControl for core control") + + if self.use_arm_ik and self.use_old_topics: + self.get_logger().fatal("Old topics do not support arm IK control.") + sys.exit(1) + if not self.use_new_arm_manual_scheme and not self.use_old_topics: + self.get_logger().warn( + f"New arm manual does not support old control scheme. Defaulting to new scheme." + ) + self.ctrl_mode = "core" # Start in core mode self.core_brake_mode = False self.core_max_duty = 0.5 # Default max duty cycle (walking speed) + ################################################## + # Old Topics + + if self.use_old_topics: + self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) + self.arm_publisher = self.create_publisher( + ArmManual, "/arm/control/manual", 2 + ) + self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2) + + ################################################## + # New Topics + + if not self.use_old_topics: + self.core_twist_pub_ = self.create_publisher( + Twist, "/core/twist", qos_profile=control_qos + ) + self.core_cmd_vel_pub_ = self.create_publisher( + TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos + ) + self.core_state_pub_ = self.create_publisher( + CoreCtrlState, "/core/control/state", qos_profile=control_qos + ) + + self.arm_manual_pub_ = self.create_publisher( + JointJog, "/arm/manual_new", qos_profile=control_qos + ) + + self.arm_ik_twist_publisher = self.create_publisher( + TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos + ) + self.arm_ik_jointjog_publisher = self.create_publisher( + JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos + ) + + # TODO: add new bio topics + + ################################################## + # Timers + + self.create_timer(0.1, self.send_controls) + + ################################################## + # Services + + # If using IK control, we have to "start" the servo node to enable it to accept commands + self.servo_start_client = None + if self.use_arm_ik: + self.get_logger().info("Starting servo node for IK control...") + self.servo_start_client = self.create_client( + Trigger, "/servo_node/start_servo" + ) + timeout_counter = 0 + while not self.servo_start_client.wait_for_service(timeout_sec=1.0): + self.get_logger().info("Waiting for servo_node/start_servo service...") + timeout_counter += 1 + if timeout_counter >= 10: + self.get_logger().error( + "Servo's start service not available. IK control will not work." + ) + break + if self.servo_start_client.service_is_ready(): + self.servo_start_client.call_async(Trigger.Request()) + # Rumble when node is ready (returns False if rumble not supported) self.gamepad.rumble(0.7, 0.8, 150) # Added so you can tell when it starts running after changing the constant logging to debug from info self.get_logger().info("Defaulting to Core mode. Ready.") - def run(self): - # This thread makes all the update processes run in the background - thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) - thread.start() - - try: - while rclpy.ok(): - self.send_controls() - time.sleep(0.1) # Small delay to avoid CPU hogging - except KeyboardInterrupt: - sys.exit(0) + def stop_all(self): + if self.use_old_topics: + self.core_publisher.publish(CORE_STOP_MSG) + self.arm_publisher.publish(ARM_STOP_MSG) + self.bio_publisher.publish(BIO_STOP_MSG) + else: + self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG) + if self.use_arm_ik: + self.arm_ik_twist_publisher.publish(self.arm_ik_twist_stop_msg()) + else: + self.arm_manual_pub_.publish(self.arm_manual_stop_msg()) + # TODO: add bio here after implementing new topics def send_controls(self): """Read the gamepad state and publish control messages""" @@ -164,10 +269,8 @@ class Headless(Node): # Check if controller is still connected if pygame.joystick.get_count() != self.num_gamepads: print("Gamepad disconnected. Exiting...") - # Send one last zero control message - self.core_publisher.publish(CORE_STOP_MSG) - self.arm_publisher.publish(ARM_STOP_MSG) - self.bio_publisher.publish(BIO_STOP_MSG) + # Stop the rover if controller disconnected + self.stop_all() self.get_logger().info("Final stop commands sent. Shutting down.") # Clean up pygame.quit() @@ -183,20 +286,51 @@ class Headless(Node): new_ctrl_mode = "core" if new_ctrl_mode != self.ctrl_mode: + self.stop_all() self.gamepad.rumble(0.6, 0.7, 75) self.ctrl_mode = new_ctrl_mode self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") + if self.ctrl_mode == "arm" and self.use_bio: + self.get_logger().warning("NOTE: Using bio instead of arm.") - # CORE - if self.ctrl_mode == "core" and CORE_MODE == "duty": + # Actually send the controls + if self.ctrl_mode == "core": + self.send_core() + if self.use_old_topics: + if self.use_bio: + self.bio_publisher.publish(BIO_STOP_MSG) + else: + self.arm_publisher.publish(ARM_STOP_MSG) + # New topics shouldn't need to constantly send zeroes imo + else: + if self.use_bio: + self.send_bio() + else: + self.send_arm() + if self.use_old_topics: + self.core_publisher.publish(CORE_STOP_MSG) + # Ditto + + def send_core(self): + # Collect controller state + left_stick_x = stick_deadzone(self.gamepad.get_axis(0)) + left_stick_y = stick_deadzone(self.gamepad.get_axis(1)) + left_trigger = stick_deadzone(self.gamepad.get_axis(2)) + right_stick_x = stick_deadzone(self.gamepad.get_axis(3)) + right_stick_y = stick_deadzone(self.gamepad.get_axis(4)) + right_trigger = stick_deadzone(self.gamepad.get_axis(5)) + button_a = self.gamepad.get_button(0) + button_b = self.gamepad.get_button(1) + button_x = self.gamepad.get_button(2) + button_y = self.gamepad.get_button(3) + left_bumper = self.gamepad.get_button(4) + right_bumper = self.gamepad.get_button(5) + dpad_input = self.gamepad.get_hat(0) + + if self.use_old_topics: input = CoreControl() input.max_speed = 90 - # Collect controller state - left_stick_y = deadzone(self.gamepad.get_axis(1)) - right_stick_y = deadzone(self.gamepad.get_axis(4)) - right_trigger = deadzone(self.gamepad.get_axis(5)) - # Right wheels input.right_stick = float(round(-1 * right_stick_y, 2)) @@ -211,19 +345,10 @@ class Headless(Node): self.get_logger().info(f"[Ctrl] {output}") self.core_publisher.publish(input) - self.arm_publisher.publish(ARM_STOP_MSG) - # self.bio_publisher.publish(BIO_STOP_MSG) - elif self.ctrl_mode == "core" and CORE_MODE == "twist": + else: # New topics twist = Twist() - # Collect controller state - left_stick_y = deadzone(self.gamepad.get_axis(1)) - right_stick_x = deadzone(self.gamepad.get_axis(3)) - button_a = self.gamepad.get_button(0) - left_bumper = self.gamepad.get_button(4) - right_bumper = self.gamepad.get_button(5) - # Forward/back and Turn twist.linear.x = -1.0 * left_stick_y twist.angular.z = -1.0 * copysign( @@ -240,8 +365,6 @@ class Headless(Node): else: header = Header(stamp=self.get_clock().now().to_msg()) self.core_cmd_vel_pub_.publish(TwistStamped(header=header, twist=twist)) - self.arm_publisher.publish(ARM_STOP_MSG) - # self.bio_publisher.publish(BIO_STOP_MSG) self.get_logger().debug( f"[Core Ctrl] Linear: {round(twist.linear.x, 2)}, Angular: {round(twist.angular.z, 2)}" ) @@ -266,91 +389,310 @@ class Headless(Node): state_msg = CoreCtrlState() state_msg.brake_mode = bool(self.core_brake_mode) state_msg.max_duty = float(self.core_max_duty) + self.core_state_pub_.publish(state_msg) self.get_logger().info( f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}" ) - # ARM and BIO - if self.ctrl_mode == "arm": + def send_arm(self): + # Collect controller state + left_stick_x = stick_deadzone(self.gamepad.get_axis(0)) + left_stick_y = stick_deadzone(self.gamepad.get_axis(1)) + left_trigger = stick_deadzone(self.gamepad.get_axis(2)) + right_stick_x = stick_deadzone(self.gamepad.get_axis(3)) + right_stick_y = stick_deadzone(self.gamepad.get_axis(4)) + right_trigger = stick_deadzone(self.gamepad.get_axis(5)) + button_a = self.gamepad.get_button(0) + button_b = self.gamepad.get_button(1) + button_x = self.gamepad.get_button(2) + button_y = self.gamepad.get_button(3) + left_bumper = self.gamepad.get_button(4) + right_bumper = self.gamepad.get_button(5) + dpad_input = self.gamepad.get_hat(0) + + # OLD MANUAL + # ========== + + if not self.use_arm_ik and self.use_old_topics: arm_input = ArmManual() - # Collect controller state - left_stick_x = deadzone(self.gamepad.get_axis(0)) - left_stick_y = deadzone(self.gamepad.get_axis(1)) - left_trigger = deadzone(self.gamepad.get_axis(2)) - right_stick_x = deadzone(self.gamepad.get_axis(3)) - right_stick_y = deadzone(self.gamepad.get_axis(4)) - right_trigger = deadzone(self.gamepad.get_axis(5)) - right_bumper = self.gamepad.get_button(5) - dpad_input = self.gamepad.get_hat(0) + # OLD ARM MANUAL CONTROL SCHEME + if not self.use_new_arm_manual_scheme: + # EF Grippers + if left_trigger > 0 and right_trigger > 0: + arm_input.gripper = 0 + elif left_trigger > 0: + arm_input.gripper = -1 + elif right_trigger > 0: + arm_input.gripper = 1 - # EF Grippers - if left_trigger > 0 and right_trigger > 0: - arm_input.gripper = 0 - elif left_trigger > 0: - arm_input.gripper = -1 - elif right_trigger > 0: - arm_input.gripper = 1 + # Axis 0 + if dpad_input[0] == 1: + arm_input.axis0 = 1 + elif dpad_input[0] == -1: + arm_input.axis0 = -1 - # Axis 0 - if dpad_input[0] == 1: - arm_input.axis0 = 1 - elif dpad_input[0] == -1: - arm_input.axis0 = -1 + if right_bumper: # Control end effector - if right_bumper: # Control end effector + # Effector yaw + if left_stick_x > 0: + arm_input.effector_yaw = 1 + elif left_stick_x < 0: + arm_input.effector_yaw = -1 - # Effector yaw - if left_stick_x > 0: - arm_input.effector_yaw = 1 - elif left_stick_x < 0: - arm_input.effector_yaw = -1 + # Effector roll + if right_stick_x > 0: + arm_input.effector_roll = 1 + elif right_stick_x < 0: + arm_input.effector_roll = -1 - # Effector roll - if right_stick_x > 0: - arm_input.effector_roll = 1 - elif right_stick_x < 0: + else: # Control arm axis + + # Axis 1 + if abs(left_stick_x) > 0.15: + arm_input.axis1 = round(left_stick_x) + + # Axis 2 + if abs(left_stick_y) > 0.15: + arm_input.axis2 = -1 * round(left_stick_y) + + # Axis 3 + if abs(right_stick_y) > 0.15: + arm_input.axis3 = -1 * round(right_stick_y) + + # NEW ARM MANUAL CONTROL SCHEME + if self.use_new_arm_manual_scheme: + # Right stick: EF yaw and axis 3 + # Left stick: axis 1 and 2 + # D-pad: axis 0 and _ + # Triggers: EF grippers + # Bumpers: EF roll + # A: brake + # B: linear actuator in + # X: _ + # Y: linear actuator out + + # Right stick: EF yaw and axis 3 + arm_input.effector_yaw = stick_to_arm_direction(right_stick_x) + arm_input.axis3 = -1 * stick_to_arm_direction(right_stick_y) + + # Left stick: axis 1 and 2 + arm_input.axis1 = stick_to_arm_direction(left_stick_x) + arm_input.axis2 = -1 * stick_to_arm_direction(left_stick_y) + + # D-pad: axis 0 and _ + arm_input.axis0 = int(dpad_input[0]) + + # Triggers: EF Grippers + if left_trigger > 0 and right_trigger > 0: + arm_input.gripper = 0 + elif left_trigger > 0: + arm_input.gripper = -1 + elif right_trigger > 0: + arm_input.gripper = 1 + + # Bumpers: EF roll + if left_bumper > 0 and right_bumper > 0: + arm_input.effector_roll = 0 + elif left_bumper > 0: arm_input.effector_roll = -1 + elif right_bumper > 0: + arm_input.effector_roll = 1 - else: # Control arm axis + # A: brake + if button_a: + arm_input.brake = True - # Axis 1 - if abs(left_stick_x) > 0.15: - arm_input.axis1 = round(left_stick_x) + # Y: linear actuator + if button_y and not button_b: + arm_input.linear_actuator = 1 + elif button_b and not button_y: + arm_input.linear_actuator = -1 + else: + arm_input.linear_actuator = 0 - # Axis 2 - if abs(left_stick_y) > 0.15: - arm_input.axis2 = -1 * round(left_stick_y) + self.arm_publisher.publish(arm_input) - # Axis 3 - if abs(right_stick_y) > 0.15: - arm_input.axis3 = -1 * round(right_stick_y) + # NEW MANUAL + # ========== - # BIO + elif not self.use_arm_ik and not self.use_old_topics: + arm_input = JointJog() + arm_input.header.frame_id = "base_link" + arm_input.header.stamp = self.get_clock().now().to_msg() + arm_input.joint_names = self.all_joint_names + arm_input.velocities = [0.0] * len(self.all_joint_names) + + # Right stick: EF yaw and axis 3 + # Left stick: axis 1 and 2 + # D-pad: axis 0 and _ + # Triggers: EF grippers + # Bumpers: EF roll + # A: brake + # B: linear actuator in + # X: _ + # Y: linear actuator out + + # Right stick: EF yaw and axis 3 + arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float( + stick_to_arm_direction(right_stick_x) + ) + arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float( + stick_to_arm_direction(right_stick_y) + ) + + # Left stick: axis 1 and 2 + arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = float( + stick_to_arm_direction(left_stick_x) + ) + arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float( + stick_to_arm_direction(left_stick_y) + ) + + # D-pad: axis 0 and _ + arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = float( + dpad_input[0] + ) + + # Triggers: EF Grippers + if left_trigger > 0 and right_trigger > 0: + arm_input.velocities[ + self.all_joint_names.index("ef_gripper_left_joint") + ] = 0.0 + elif left_trigger > 0: + arm_input.velocities[ + self.all_joint_names.index("ef_gripper_left_joint") + ] = -1.0 + elif right_trigger > 0: + arm_input.velocities[ + self.all_joint_names.index("ef_gripper_left_joint") + ] = 1.0 + + # Bumpers: EF roll + arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = ( + right_bumper - left_bumper + ) + + # A: brake + # TODO: Brake mode + + # Y: linear actuator + # TODO: linear actuator + + self.arm_manual_pub_.publish(arm_input) + + # IK (ONLY NEW) + # ============= + + elif self.use_arm_ik: + arm_twist = TwistStamped() + arm_twist.header.frame_id = "base_link" + arm_twist.header.stamp = self.get_clock().now().to_msg() + arm_jointjog = JointJog() + arm_jointjog.header.frame_id = "base_link" + arm_jointjog.header.stamp = self.get_clock().now().to_msg() + + # Right stick: linear y and linear x + # Left stick: angular z and linear z + # D-pad: angular y and _ + # Triggers: EF grippers + # Bumpers: angular x + # A: brake + # B: IK mode + # X: manual mode + # Y: linear actuator + + # Right stick: linear y and linear x + arm_twist.twist.linear.y = float(right_stick_x) + arm_twist.twist.linear.x = float(right_stick_y) + + # Left stick: angular z and linear z + arm_twist.twist.angular.z = float(-1 * left_stick_x) + arm_twist.twist.linear.z = float(-1 * left_stick_y) + # D-pad: angular y and _ + arm_twist.twist.angular.y = ( + float(0) + if dpad_input[0] == 0 + else float(-1 * copysign(0.75, dpad_input[0])) + ) + + # Triggers: EF Grippers + if left_trigger > 0 or right_trigger > 0: + arm_jointjog.joint_names.append("ef_gripper_left_joint") # type: ignore + arm_jointjog.velocities.append(float(right_trigger - left_trigger)) + + # Bumpers: angular x + if left_bumper > 0 and right_bumper > 0: + arm_twist.twist.angular.x = float(0) + elif left_bumper > 0: + arm_twist.twist.angular.x = float(1) + elif right_bumper > 0: + arm_twist.twist.angular.x = float(-1) + + self.arm_ik_twist_publisher.publish(arm_twist) + # self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out + + def send_bio(self): + # Collect controller state + left_stick_x = stick_deadzone(self.gamepad.get_axis(0)) + left_stick_y = stick_deadzone(self.gamepad.get_axis(1)) + left_trigger = stick_deadzone(self.gamepad.get_axis(2)) + right_stick_x = stick_deadzone(self.gamepad.get_axis(3)) + right_stick_y = stick_deadzone(self.gamepad.get_axis(4)) + right_trigger = stick_deadzone(self.gamepad.get_axis(5)) + button_a = self.gamepad.get_button(0) + button_b = self.gamepad.get_button(1) + button_x = self.gamepad.get_button(2) + button_y = self.gamepad.get_button(3) + left_bumper = self.gamepad.get_button(4) + right_bumper = self.gamepad.get_button(5) + dpad_input = self.gamepad.get_hat(0) + + if self.use_old_topics: bio_input = BioControl( bio_arm=int(left_stick_y * -100), drill_arm=int(round(right_stick_y) * -100), ) # Drill motor (FAERIE) - if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0: + if left_trigger > 0 or right_trigger > 0: bio_input.drill = int( 30 * (right_trigger - left_trigger) ) # Max duty cycle 30% - self.core_publisher.publish(CORE_STOP_MSG) - self.arm_publisher.publish(arm_input) - # self.bio_publisher.publish(bio_input) + self.bio_publisher.publish(bio_input) + + else: + pass # TODO: implement new bio control topics + + def arm_manual_stop_msg(self): + return JointJog( + header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()), + joint_names=self.all_joint_names, + velocities=[0.0] * len(self.all_joint_names), + ) + + def arm_ik_twist_stop_msg(self): + return TwistStamped( + header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()) + ) -def deadzone(value: float, threshold=0.05) -> float: +def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float: """Apply a deadzone to a joystick input so the motors don't sound angry""" if abs(value) < threshold: return 0 return value +def stick_to_arm_direction(value: float, threshold=ARM_DEADZONE) -> int: + """Apply a larger deadzone to a stick input and make digital/binary instead of analog""" + if abs(value) < threshold: + return 0 + return int(copysign(1, value)) + + def is_user_in_group(group_name: str) -> bool: # Copied from https://zetcode.com/python/os-getgrouplist/ try: @@ -369,20 +711,26 @@ def is_user_in_group(group_name: str) -> bool: return False +def exit_handler(signum, frame): + print("Caught SIGTERM. Exiting...") + rclpy.try_shutdown() + sys.exit(0) + + def main(args=None): try: rclpy.init(args=args) + # Catch termination signals and exit cleanly + signal.signal(signal.SIGTERM, exit_handler) + node = Headless() rclpy.spin(node) except (KeyboardInterrupt, ExternalShutdownException): print("Caught shutdown signal. Exiting...") finally: - rclpy.shutdown() + rclpy.try_shutdown() if __name__ == "__main__": - signal.signal( - signal.SIGTERM, lambda signum, frame: sys.exit(0) - ) # Catch termination signals and exit cleanly main() diff --git a/src/latency_tester/src/embedded_ping.cpp b/src/latency_tester/src/embedded_ping.cpp index 37c301f..d1efc04 100644 --- a/src/latency_tester/src/embedded_ping.cpp +++ b/src/latency_tester/src/embedded_ping.cpp @@ -26,7 +26,7 @@ class LatencyTester : public rclcpp::Node { public: LatencyTester() - : Node("latency_tester"), count_(0), target_mcu_("core") + : Node("latency_tester"), count_(0) { publisher_ = this->create_publisher("/anchor/relay", 10); timer_ = this->create_wall_timer( @@ -35,6 +35,8 @@ public: "/anchor/debug", 10, std::bind(&LatencyTester::response_callback, this, std::placeholders::_1)); + + target_mcu_ = this->declare_parameter("target_mcu", "core"); } private: diff --git a/src/servo_arm_twist_pkg/CMakeLists.txt b/src/servo_arm_twist_pkg/CMakeLists.txt deleted file mode 100644 index 8719d44..0000000 --- a/src/servo_arm_twist_pkg/CMakeLists.txt +++ /dev/null @@ -1,80 +0,0 @@ -cmake_minimum_required(VERSION 3.22) -project(servo_arm_twist_pkg) - -# C++ Libraries ################################################# - -# Core C++ library for calculations and collision checking. -# Provides interface used by the component node. -set(SERVO_LIB_NAME servo_arm_twist_lib) - -# Pose Tracking -set(POSE_TRACKING pose_tracking) - -# Component Nodes (Shared libraries) ############################ -set(SERVO_COMPONENT_NODE servo_node) -set(SERVO_CONTROLLER_INPUT servo_controller_input) - -# Executable Nodes ############################################## -set(SERVO_NODE_MAIN_NAME servo_node_main) -set(POSE_TRACKING_DEMO_NAME servo_pose_tracking_demo) -set(FAKE_SERVO_CMDS_NAME fake_command_publisher) - -################################################################# - -# Common cmake code applied to all moveit packages -find_package(moveit_common REQUIRED) -moveit_package() - -set(THIS_PACKAGE_INCLUDE_DEPENDS - control_msgs - control_toolbox - geometry_msgs - moveit_core - moveit_msgs - moveit_ros_planning - pluginlib - rclcpp - rclcpp_components - sensor_msgs - std_msgs - std_srvs - tf2_eigen - trajectory_msgs -) - -find_package(ament_cmake REQUIRED) -find_package(eigen3_cmake_module REQUIRED) -find_package(Eigen3 REQUIRED) -foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS}) - find_package(${Dependency} REQUIRED) -endforeach() - -##################### -## Component Nodes ## -##################### - -# Add executable for using a controller -add_library(${SERVO_CONTROLLER_INPUT} SHARED src/joystick_twist.cpp) -ament_target_dependencies(${SERVO_CONTROLLER_INPUT} ${THIS_PACKAGE_INCLUDE_DEPENDS}) -rclcpp_components_register_nodes(${SERVO_CONTROLLER_INPUT} "servo_arm_twist_pkg::JoyToServoPub") - -############# -## Install ## -############# - -# Install Libraries -install( - TARGETS - ${SERVO_CONTROLLER_INPUT} - EXPORT export_${PROJECT_NAME} - LIBRARY DESTINATION lib - ARCHIVE DESTINATION lib - RUNTIME DESTINATION bin - INCLUDES DESTINATION include -) - -# Install Binaries -ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) -ament_export_dependencies(${THIS_PACKAGE_INCLUDE_DEPENDS}) - -ament_package() diff --git a/src/servo_arm_twist_pkg/README.md b/src/servo_arm_twist_pkg/README.md deleted file mode 100644 index 727ae50..0000000 --- a/src/servo_arm_twist_pkg/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Moveit Servo - -See the [Realtime Arm Servoing Tutorial](https://moveit.picknik.ai/main/doc/realtime_servo/realtime_servo_tutorial.html) for installation instructions, quick-start guide, an overview about `moveit_servo`, and to learn how to set it up on your robot. diff --git a/src/servo_arm_twist_pkg/package.xml b/src/servo_arm_twist_pkg/package.xml deleted file mode 100644 index c2d1601..0000000 --- a/src/servo_arm_twist_pkg/package.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - servo_arm_twist_pkg - 2.5.9 - Provides real-time manipulator Cartesian and joint servoing. - Blake Anderson - Andy Zelenak - Tyler Weaver - Henning Kayser - - BSD 3-Clause - - https://ros-planning.github.io/moveit_tutorials - - Brian O'Neil - Andy Zelenak - Blake Anderson - Alexander Rössler - Tyler Weaver - Adam Pettinger - - ament_cmake - moveit_common - - control_msgs - control_toolbox - geometry_msgs - moveit_msgs - moveit_core - moveit_ros_planning_interface - pluginlib - sensor_msgs - std_msgs - std_srvs - tf2_eigen - trajectory_msgs - - gripper_controllers - joint_state_broadcaster - joint_trajectory_controller - joy - robot_state_publisher - tf2_ros - moveit_configs_utils - launch_param_builder - - ament_cmake_gtest - ament_lint_auto - ament_lint_common - controller_manager - ros_testing - - - ament_cmake - - - diff --git a/src/servo_arm_twist_pkg/src/joystick_twist.cpp b/src/servo_arm_twist_pkg/src/joystick_twist.cpp deleted file mode 100644 index 5a117d0..0000000 --- a/src/servo_arm_twist_pkg/src/joystick_twist.cpp +++ /dev/null @@ -1,271 +0,0 @@ -/********************************************************************* - * Software License Agreement (BSD License) - * - * Copyright (c) 2020, PickNik Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials provided - * with the distribution. - * * Neither the name of PickNik Inc. nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS - * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE - * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, - * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - *********************************************************************/ - -/* Title : joystick_servo_example.cpp - * Project : servo_arm_twist_pkg - * Created : 08/07/2020 - * Author : Adam Pettinger - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// We'll just set up parameters here -const std::string JOY_TOPIC = "/joy"; -const std::string TWIST_TOPIC = "/servo_node/delta_twist_cmds"; -const std::string JOINT_TOPIC = "/servo_node/delta_joint_cmds"; -const std::string EEF_FRAME_ID = "End_Effector"; -const std::string BASE_FRAME_ID = "base_link"; - -// Enums for button names -> axis/button array index -// For XBOX 1 controller -enum Axis -{ - LEFT_STICK_X = 0, - LEFT_STICK_Y = 1, - LEFT_TRIGGER = 2, - RIGHT_STICK_X = 3, - RIGHT_STICK_Y = 4, - RIGHT_TRIGGER = 5, - D_PAD_X = 6, - D_PAD_Y = 7 -}; -enum Button -{ - A = 0, - B = 1, - X = 2, - Y = 3, - LEFT_BUMPER = 4, - RIGHT_BUMPER = 5, - CHANGE_VIEW = 6, - MENU = 7, - HOME = 8, - LEFT_STICK_CLICK = 9, - RIGHT_STICK_CLICK = 10 -}; - -// Some axes have offsets (e.g. the default trigger position is 1.0 not 0) -// This will map the default values for the axes -std::map AXIS_DEFAULTS = { { LEFT_TRIGGER, 1.0 }, { RIGHT_TRIGGER, 1.0 } }; -std::map BUTTON_DEFAULTS; - -// To change controls or setup a new controller, all you should to do is change the above enums and the follow 2 -// functions -/** \brief // This converts a joystick axes and buttons array to a TwistStamped or JointJog message - * @param axes The vector of continuous controller joystick axes - * @param buttons The vector of discrete controller button values - * @param twist A TwistStamped message to update in prep for publishing - * @param joint A JointJog message to update in prep for publishing - * @return return true if you want to publish a Twist, false if you want to publish a JointJog - */ -bool convertJoyToCmd(const std::vector& axes, const std::vector& buttons, - std::unique_ptr& twist, - std::unique_ptr& joint) -{ - // // Give joint jogging priority because it is only buttons - // // If any joint jog command is requested, we are only publishing joint commands - // if (buttons[A] || buttons[B] || buttons[X] || buttons[Y] || axes[D_PAD_X] || axes[D_PAD_Y]) - // { - // // Map the D_PAD to the proximal joints - // joint->joint_names.push_back("panda_joint1"); - // joint->velocities.push_back(axes[D_PAD_X]); - // joint->joint_names.push_back("panda_joint2"); - // joint->velocities.push_back(axes[D_PAD_Y]); - - // // Map the diamond to the distal joints - // joint->joint_names.push_back("panda_joint7"); - // joint->velocities.push_back(buttons[B] - buttons[X]); - // joint->joint_names.push_back("panda_joint6"); - // joint->velocities.push_back(buttons[Y] - buttons[A]); - // return false; - // } - - // The bread and butter: map buttons to twist commands - twist->twist.linear.z = axes[RIGHT_STICK_Y]; - twist->twist.linear.y = axes[RIGHT_STICK_X]; - - double lin_x_right = -0.5 * (axes[RIGHT_TRIGGER] - AXIS_DEFAULTS.at(RIGHT_TRIGGER)); - double lin_x_left = 0.5 * (axes[LEFT_TRIGGER] - AXIS_DEFAULTS.at(LEFT_TRIGGER)); - twist->twist.linear.x = lin_x_right + lin_x_left; - - twist->twist.angular.y = axes[LEFT_STICK_Y]; - twist->twist.angular.x = axes[LEFT_STICK_X]; - - double roll_positive = buttons[RIGHT_BUMPER]; - double roll_negative = -1 * (buttons[LEFT_BUMPER]); - twist->twist.angular.z = roll_positive + roll_negative; - - return true; -} - -/** \brief // This should update the frame_to_publish_ as needed for changing command frame via controller - * @param frame_name Set the command frame to this - * @param buttons The vector of discrete controller button values - */ -void updateCmdFrame(std::string& frame_name, const std::vector& buttons) -{ - if (buttons[CHANGE_VIEW] && frame_name == EEF_FRAME_ID) - frame_name = BASE_FRAME_ID; - else if (buttons[MENU] && frame_name == BASE_FRAME_ID) - frame_name = EEF_FRAME_ID; -} - -namespace servo_arm_twist_pkg -{ -class JoyToServoPub : public rclcpp::Node -{ -public: - JoyToServoPub(const rclcpp::NodeOptions& options) - : Node("joy_to_twist_publisher", options), frame_to_publish_(BASE_FRAME_ID) - { - // Setup pub/sub - joy_sub_ = this->create_subscription( - JOY_TOPIC, rclcpp::SystemDefaultsQoS(), - [this](const sensor_msgs::msg::Joy::ConstSharedPtr& msg) { return joyCB(msg); }); - - twist_pub_ = this->create_publisher(TWIST_TOPIC, rclcpp::SystemDefaultsQoS()); - joint_pub_ = this->create_publisher(JOINT_TOPIC, rclcpp::SystemDefaultsQoS()); - // collision_pub_ = - // this->create_publisher("/planning_scene", rclcpp::SystemDefaultsQoS()); - - // Create a service client to start the ServoNode - servo_start_client_ = this->create_client("/servo_node/start_servo"); - servo_start_client_->wait_for_service(std::chrono::seconds(1)); - servo_start_client_->async_send_request(std::make_shared()); - - // // Load the collision scene asynchronously - // collision_pub_thread_ = std::thread([this]() { - // rclcpp::sleep_for(std::chrono::seconds(3)); - // // Create collision object, in the way of servoing - // moveit_msgs::msg::CollisionObject collision_object; - // collision_object.header.frame_id = "panda_link0"; - // collision_object.id = "box"; - - // shape_msgs::msg::SolidPrimitive table_1; - // table_1.type = table_1.BOX; - // table_1.dimensions = { 0.4, 0.6, 0.03 }; - - // geometry_msgs::msg::Pose table_1_pose; - // table_1_pose.position.x = 0.6; - // table_1_pose.position.y = 0.0; - // table_1_pose.position.z = 0.4; - - // shape_msgs::msg::SolidPrimitive table_2; - // table_2.type = table_2.BOX; - // table_2.dimensions = { 0.6, 0.4, 0.03 }; - - // geometry_msgs::msg::Pose table_2_pose; - // table_2_pose.position.x = 0.0; - // table_2_pose.position.y = 0.5; - // table_2_pose.position.z = 0.25; - - // collision_object.primitives.push_back(table_1); - // collision_object.primitive_poses.push_back(table_1_pose); - // collision_object.primitives.push_back(table_2); - // collision_object.primitive_poses.push_back(table_2_pose); - // collision_object.operation = collision_object.ADD; - - // moveit_msgs::msg::PlanningSceneWorld psw; - // psw.collision_objects.push_back(collision_object); - - // auto ps = std::make_unique(); - // ps->world = psw; - // ps->is_diff = true; - // collision_pub_->publish(std::move(ps)); - // }); - } - - // ~JoyToServoPub() override - // { - // if (collision_pub_thread_.joinable()) - // collision_pub_thread_.join(); - // } - - void joyCB(const sensor_msgs::msg::Joy::ConstSharedPtr& msg) - { - // Create the messages we might publish - auto twist_msg = std::make_unique(); - auto joint_msg = std::make_unique(); - - // This call updates the frame for twist commands - updateCmdFrame(frame_to_publish_, msg->buttons); - - // Convert the joystick message to Twist or JointJog and publish - if (convertJoyToCmd(msg->axes, msg->buttons, twist_msg, joint_msg)) - { - // publish the TwistStamped - twist_msg->header.frame_id = frame_to_publish_; - twist_msg->header.stamp = this->now(); - twist_pub_->publish(std::move(twist_msg)); - } - // else - // { - // // publish the JointJog - // joint_msg->header.stamp = this->now(); - // joint_msg->header.frame_id = "panda_link3"; - // joint_pub_->publish(std::move(joint_msg)); - // } - } - -private: - rclcpp::Subscription::SharedPtr joy_sub_; - rclcpp::Publisher::SharedPtr twist_pub_; - rclcpp::Publisher::SharedPtr joint_pub_; - rclcpp::Publisher::SharedPtr collision_pub_; - rclcpp::Client::SharedPtr servo_start_client_; - - std::string frame_to_publish_; - - // std::thread collision_pub_thread_; -}; // class JoyToServoPub - -} // namespace servo_arm_twist_pkg - -// Register the component with class_loader -#include -RCLCPP_COMPONENTS_REGISTER_NODE(servo_arm_twist_pkg::JoyToServoPub)