From 13419e97c96bc373bc0d3765f005af9f05da93ae Mon Sep 17 00:00:00 2001 From: David Date: Wed, 4 Feb 2026 04:26:50 -0600 Subject: [PATCH] refactor: (arm) cleanup old standalone serial code Removed literally 100 lines --- src/arm_pkg/arm_pkg/arm_node.py | 141 ++++---------------------------- 1 file changed, 18 insertions(+), 123 deletions(-) diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index 10c7fdc..6c3e5fe 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -25,7 +25,6 @@ import math # liveliness_lease_duration=5000 # ) -serial_pub = None thread = None # Used to verify the length of an incoming VicCAN feedback message @@ -44,31 +43,27 @@ viccan_digit_msg_len_dict = { } -class SerialRelay(Node): +class ArmNode(Node): def __init__(self): super().__init__("arm_node") - # Get launch mode -- anchor vs arm - self.declare_parameter("launch_mode", "arm") - self.launch_mode = self.get_parameter("launch_mode").value - self.get_logger().info(f"arm launch_mode is: {self.launch_mode}") + self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output ################################################## # Topics # Anchor topics - if self.launch_mode == "anchor": - self.anchor_fromvic_sub_ = self.create_subscription( - VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20 - ) - self.anchor_tovic_pub_ = self.create_publisher( - VicCAN, "/anchor/to_vic/relay", 20 - ) + self.anchor_fromvic_sub_ = self.create_subscription( + VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20 + ) + self.anchor_tovic_pub_ = self.create_publisher( + VicCAN, "/anchor/to_vic/relay", 20 + ) - self.anchor_sub = self.create_subscription( - String, "/anchor/arm/feedback", self.anchor_feedback, 10 - ) - self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) + self.anchor_sub = self.create_subscription( + String, "/anchor/arm/feedback", self.anchor_feedback, 10 + ) + self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) # Control @@ -110,7 +105,6 @@ class SerialRelay(Node): # Old # Create publishers - self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10) self.socket_pub = self.create_publisher( SocketFeedback, "/arm/feedback/socket", 10 ) @@ -124,88 +118,16 @@ class SerialRelay(Node): ArmManual, "/arm/control/manual", self.send_manual, 10 ) - ################################################## - # Find microcontroller -- non-anchor only - - # Search for ports IF in 'arm' (standalone) and not 'anchor' mode - if self.launch_mode == "arm": - # Loop through all serial devices on the computer to check for the MCU - self.port = None - ports = SerialRelay.list_serial_ports() - for _ in range(4): - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, 115200, timeout=1) - # print(f"Checking port {port}...") - ser.write(b"ping\n") - response = ser.read_until("\n") # type: ignore - - # if pong is in response, then we are talking with the MCU - if b"pong" in response: - self.port = port - self.get_logger().info(f"Found MCU at {self.port}!") - break - except: - pass - if self.port is not None: - break - - if self.port is None: - self.get_logger().info( - "Unable to find MCU... please make sure it is connected." - ) - time.sleep(1) - sys.exit(1) - - self.ser = serial.Serial(self.port, 115200) - atexit.register(self.cleanup) - def run(self): global thread thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread.start() - # if in arm mode, will need to read from the MCU - try: while rclpy.ok(): - if self.launch_mode == "arm": - if self.ser.in_waiting: - self.read_mcu() - else: - time.sleep(0.1) + pass except KeyboardInterrupt: pass - finally: - self.cleanup() - - # Currently will just spit out all values over the /arm/feedback/debug topic as strings - def read_mcu(self): - try: - output = str(self.ser.readline(), "utf8") - if output: - # self.get_logger().info(f"[MCU] {output}") - msg = String() - msg.data = output - self.debug_pub.publish(msg) - except serial.SerialException: - self.get_logger().info("SerialException caught... closing serial port.") - if self.ser.is_open: - self.ser.close() - pass - except TypeError as e: - self.get_logger().info(f"TypeError: {e}") - print("Closing serial port.") - if self.ser.is_open: - self.ser.close() - pass - except Exception as e: - print(f"Exception: {e}") - print("Closing serial port.") - if self.ser.is_open: - self.ser.close() - pass def joint_command_callback(self, msg: JointState): if len(msg.position) < 7 and len(msg.velocity) < 7: @@ -256,15 +178,8 @@ class SerialRelay(Node): return def send_cmd(self, msg: str): - if ( - self.launch_mode == "anchor" - ): # if in anchor mode, send to anchor node to relay - output = String() - output.data = msg - self.anchor_pub.publish(output) - elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly - self.get_logger().info(f"[Arm to MCU] {msg}") - self.ser.write(bytes(msg, "utf8")) + output = String(data=msg) + self.anchor_pub.publish(output) def anchor_feedback(self, msg: String): output = msg.data @@ -473,33 +388,13 @@ class SerialRelay(Node): self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_current = current - @staticmethod - def list_serial_ports(): - return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") - # return glob.glob("/dev/tty[A-Za-z]*") - - def cleanup(self): - print("Cleaning up...") - try: - if self.ser.is_open: - self.ser.close() - except Exception as e: - exit(0) - - -def myexcepthook(type, value, tb): - print("Uncaught exception:", type, value) - if serial_pub: - serial_pub.cleanup() - def main(args=None): rclpy.init(args=args) - sys.excepthook = myexcepthook - global serial_pub - serial_pub = SerialRelay() - serial_pub.run() + arm_node = ArmNode() + arm_node.run() + rclpy.try_shutdown() if __name__ == "__main__":