From 4459886fc108f3eb5d0c2c6795dd40af480d1cd9 Mon Sep 17 00:00:00 2001 From: ryleu <69326171+ryleu@users.noreply.github.com> Date: Sat, 14 Feb 2026 23:16:34 -0600 Subject: [PATCH] add a mock mode and fix a logic error --- src/anchor_pkg/anchor_pkg/anchor_node.py | 76 +++++++++++++++++++++--- src/anchor_pkg/anchor_pkg/connector.py | 50 ++++++---------- src/anchor_pkg/anchor_pkg/convert.py | 24 ++++++++ 3 files changed, 108 insertions(+), 42 deletions(-) create mode 100644 src/anchor_pkg/anchor_pkg/convert.py diff --git a/src/anchor_pkg/anchor_pkg/anchor_node.py b/src/anchor_pkg/anchor_pkg/anchor_node.py index 59198f6..d29bca1 100644 --- a/src/anchor_pkg/anchor_pkg/anchor_node.py +++ b/src/anchor_pkg/anchor_pkg/anchor_node.py @@ -1,11 +1,20 @@ import rclpy from rclpy.node import Node from rclpy.executors import ExternalShutdownException +from rcl_interfaces.msg import ParameterDescriptor, ParameterType import signal import atexit -from connector import Connector, SerialConnector, CANConnector +from .connector import ( + Connector, + MockConnector, + SerialConnector, + CANConnector, + NoValidDeviceException, + NoWorkingDeviceException, +) +from .convert import string_to_viccan import sys import threading @@ -37,25 +46,61 @@ class Anchor(Node): - For testing without an actual MCU, publish strings here as if they came from an MCU * /anchor/to_vic/relay - Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU - * /anchor/to_vic/relay_string - - Publish raw strings to this topic to send directly to the MCU for debugging """ connector: Connector def __init__(self): - # Initalize node with name - super().__init__("anchor_node") # previously 'serial_publisher' + super().__init__("anchor_node") - self.connector = SerialConnector(self.get_logger()) + logger = self.get_logger() - # Close serial port on exit + self.declare_parameter( + "connector", + "auto", + ParameterDescriptor( + name="connector", + description="Declares which MCU connector should be used. Defaults to 'auto'.", + type=ParameterType.PARAMETER_STRING, + additional_constraints="Must be 'serial', 'can', 'mock', or 'auto'.", + ), + ) + + # Determine which connector to use. Options are Mock, Serial, and CAN + connector_select = ( + self.get_parameter("connector").get_parameter_value().string_value + ) + + match connector_select: + case "serial": + logger.info("using serial connector") + self.connector = SerialConnector(self.get_logger()) + case "can": + logger.info("using CAN connector") + self.connector = CANConnector(self.get_logger()) + case "mock": + logger.info("using mock connector") + self.connector = MockConnector(self.get_logger()) + case "auto": + logger.info("automatically determining connector") + try: + logger.info("trying CAN connector") + self.connector = CANConnector(self.get_logger()) + except (NoValidDeviceException, NoWorkingDeviceException): + logger.info("CAN connector failed, trying serial connector") + self.connector = SerialConnector(self.get_logger()) + case _: + self.get_logger().fatal( + f"invalid value for connector parameter: {connector_select}" + ) + exit(1) + + # Close devices on exit atexit.register(self.cleanup) - ################################################## # ROS2 Topic Setup - # Pub/sub with VicCAN + # Publishers self.fromvic_debug_pub_ = self.create_publisher( String, "/anchor/from_vic/debug", 20 ) @@ -69,9 +114,13 @@ class Anchor(Node): VicCAN, "/anchor/from_vic/bio", 20 ) + # Subscribers self.tovic_sub_ = self.create_subscription( VicCAN, "/anchor/to_vic/relay", self.connector.write, 20 ) + self.mock_mcu_sub_ = self.create_subscription( + String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20 + ) def cleanup(self): self.connector.cleanup() @@ -96,6 +145,15 @@ class Anchor(Node): elif msg.mcu_name == "citadel" or msg.mcu_name == "digit": self.fromvic_bio_pub_.publish(msg) + def on_mock_fromvic(self, msg: String): + viccan = string_to_viccan( + msg.data, + "mock", + self.get_logger(), + ) + if viccan: + self.relay_fromvic(viccan) + def main(args=None): try: diff --git a/src/anchor_pkg/anchor_pkg/connector.py b/src/anchor_pkg/anchor_pkg/connector.py index a19f30e..c70e0c5 100644 --- a/src/anchor_pkg/anchor_pkg/connector.py +++ b/src/anchor_pkg/anchor_pkg/connector.py @@ -3,6 +3,7 @@ import serial import serial.tools.list_ports from astra_msgs.msg import VicCAN from rclpy.impl.rcutils_logger import RcutilsLogger +from .convert import string_to_viccan KNOWN_USBS = [ (0x2E8A, 0x00C0), # Raspberry Pi Pico @@ -111,16 +112,10 @@ class SerialConnector(Connector): except Exception as e: self.logger.error(e) - def __init__(self, logger: RcutilsLogger, port_override: str | None = None): + def __init__(self, logger: RcutilsLogger): self.logger = logger - self.override = bool(port_override) - ports: list[str] - - if port_override: - ports = [port_override] - else: - ports = self._find_ports() + ports = self._find_ports() if len(ports) <= 0: raise NoValidDeviceException("no valid serial device found") @@ -150,35 +145,12 @@ class SerialConnector(Connector): if not raw: return None - parts = raw.split(",") - - # don't need an extra check because len of .split output is always >= 1 - if parts[0] != "can_relay_fromvic": - self.logger.debug(f"got non-CAN data from {self.mcu_name}: {raw}") - return None - elif len(parts) < 3: - self.logger.debug( - f"got garbage (not enough parts) CAN data from {self.mcu_name}: {raw}" - ) - return None - elif len(parts) > 7: - self.logger.debug( - f"got garbage (too many parts) CAN data from {self.mcu_name}: {raw}" - ) - return None - - return VicCAN( - mcu_name=parts[1], - command_id=parts[2], - data=[float(x) for x in parts[3:]], - ) + return string_to_viccan(raw, self.mcu_name, self.logger) except serial.SerialException as e: self.logger.error(f"SerialException: {e}") - self._close_port() raise DeviceClosedException(f"serial port {self.port} closed unexpectedly") except TypeError as e: self.logger.error(f"TypeError: {e}") - self._close_port() raise DeviceClosedException(f"serial port {self.port} closed unexpectedly") except Exception: pass # pretty much no other error matters @@ -191,4 +163,16 @@ class SerialConnector(Connector): class CANConnector(Connector): - pass + def __init__(self, logger: RcutilsLogger): + pass + + +class MockConnector(Connector): + def __init__(self, _: RcutilsLogger): + pass + + def read(self) -> VicCAN | None: + return None + + def write(self, msg: VicCAN): + print(msg) diff --git a/src/anchor_pkg/anchor_pkg/convert.py b/src/anchor_pkg/anchor_pkg/convert.py new file mode 100644 index 0000000..cd13178 --- /dev/null +++ b/src/anchor_pkg/anchor_pkg/convert.py @@ -0,0 +1,24 @@ +from astra_msgs.msg import VicCAN +from rclpy.impl.rcutils_logger import RcutilsLogger + +def string_to_viccan(msg: str, mcu_name, logger: RcutilsLogger): + parts: list[str] = msg.split(",") + + # don't need an extra check because len of .split output is always >= 1 + if parts[0] != "can_relay_fromvic": + logger.info(f"got non-CAN data from {mcu_name}: {msg}") + return None + elif len(parts) < 3: + logger.info( + f"got garbage (not enough parts) CAN data from {mcu_name}: {msg}" + ) + return None + elif len(parts) > 7: + logger.info(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}") + return None + + return VicCAN( + mcu_name=parts[1], + command_id=int(parts[2]), + data=[float(x) for x in parts[3:]], + )