add a mock mode and fix a logic error

This commit is contained in:
ryleu
2026-02-14 23:16:34 -06:00
parent 18fce2c19b
commit 4459886fc1
3 changed files with 108 additions and 42 deletions

View File

@@ -1,11 +1,20 @@
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import signal
import atexit
from connector import Connector, SerialConnector, CANConnector
from .connector import (
Connector,
MockConnector,
SerialConnector,
CANConnector,
NoValidDeviceException,
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import sys
import threading
@@ -37,25 +46,61 @@ class Anchor(Node):
- For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
connector: Connector
def __init__(self):
# Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher'
super().__init__("anchor_node")
logger = self.get_logger()
self.declare_parameter(
"connector",
"auto",
ParameterDescriptor(
name="connector",
description="Declares which MCU connector should be used. Defaults to 'auto'.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be 'serial', 'can', 'mock', or 'auto'.",
),
)
# Determine which connector to use. Options are Mock, Serial, and CAN
connector_select = (
self.get_parameter("connector").get_parameter_value().string_value
)
match connector_select:
case "serial":
logger.info("using serial connector")
self.connector = SerialConnector(self.get_logger())
case "can":
logger.info("using CAN connector")
self.connector = CANConnector(self.get_logger())
case "mock":
logger.info("using mock connector")
self.connector = MockConnector(self.get_logger())
case "auto":
logger.info("automatically determining connector")
try:
logger.info("trying CAN connector")
self.connector = CANConnector(self.get_logger())
except (NoValidDeviceException, NoWorkingDeviceException):
logger.info("CAN connector failed, trying serial connector")
self.connector = SerialConnector(self.get_logger())
case _:
self.get_logger().fatal(
f"invalid value for connector parameter: {connector_select}"
)
exit(1)
# Close serial port on exit
# Close devices on exit
atexit.register(self.cleanup)
##################################################
# ROS2 Topic Setup
# Pub/sub with VicCAN
# Publishers
self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20
)
@@ -69,9 +114,13 @@ class Anchor(Node):
VicCAN, "/anchor/from_vic/bio", 20
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.connector.write, 20
)
self.mock_mcu_sub_ = self.create_subscription(
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
)
def cleanup(self):
self.connector.cleanup()
@@ -96,6 +145,15 @@ class Anchor(Node):
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
def on_mock_fromvic(self, msg: String):
viccan = string_to_viccan(
msg.data,
"mock",
self.get_logger(),
)
if viccan:
self.relay_fromvic(viccan)
def main(args=None):
try:

View File

@@ -3,6 +3,7 @@ import serial
import serial.tools.list_ports
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
@@ -111,15 +112,9 @@ class SerialConnector(Connector):
except Exception as e:
self.logger.error(e)
def __init__(self, logger: RcutilsLogger, port_override: str | None = None):
def __init__(self, logger: RcutilsLogger):
self.logger = logger
self.override = bool(port_override)
ports: list[str]
if port_override:
ports = [port_override]
else:
ports = self._find_ports()
if len(ports) <= 0:
@@ -150,35 +145,12 @@ class SerialConnector(Connector):
if not raw:
return None
parts = raw.split(",")
# don't need an extra check because len of .split output is always >= 1
if parts[0] != "can_relay_fromvic":
self.logger.debug(f"got non-CAN data from {self.mcu_name}: {raw}")
return None
elif len(parts) < 3:
self.logger.debug(
f"got garbage (not enough parts) CAN data from {self.mcu_name}: {raw}"
)
return None
elif len(parts) > 7:
self.logger.debug(
f"got garbage (too many parts) CAN data from {self.mcu_name}: {raw}"
)
return None
return VicCAN(
mcu_name=parts[1],
command_id=parts[2],
data=[float(x) for x in parts[3:]],
)
return string_to_viccan(raw, self.mcu_name, self.logger)
except serial.SerialException as e:
self.logger.error(f"SerialException: {e}")
self._close_port()
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except TypeError as e:
self.logger.error(f"TypeError: {e}")
self._close_port()
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except Exception:
pass # pretty much no other error matters
@@ -191,4 +163,16 @@ class SerialConnector(Connector):
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger):
pass
class MockConnector(Connector):
def __init__(self, _: RcutilsLogger):
pass
def read(self) -> VicCAN | None:
return None
def write(self, msg: VicCAN):
print(msg)

View File

@@ -0,0 +1,24 @@
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(msg: str, mcu_name, logger: RcutilsLogger):
parts: list[str] = msg.split(",")
# don't need an extra check because len of .split output is always >= 1
if parts[0] != "can_relay_fromvic":
logger.info(f"got non-CAN data from {mcu_name}: {msg}")
return None
elif len(parts) < 3:
logger.info(
f"got garbage (not enough parts) CAN data from {mcu_name}: {msg}"
)
return None
elif len(parts) > 7:
logger.info(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}")
return None
return VicCAN(
mcu_name=parts[1],
command_id=int(parts[2]),
data=[float(x) for x in parts[3:]],
)