implement review comments

This commit is contained in:
ryleu
2026-03-21 18:14:44 -05:00
parent 7a3c4af1ce
commit 941e196316
3 changed files with 220 additions and 115 deletions

View File

@@ -14,12 +14,11 @@ from .connector import (
NoValidDeviceException,
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import sys
from .convert import string_to_viccan, viccan_to_string
import threading
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
from std_msgs.msg import String
class Anchor(Node):
@@ -33,12 +32,16 @@ class Anchor(Node):
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
* /anchor/to_vic/debug
- A string copy of the messages published to ./relay are published here
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Send raw strings to connectors. Does not work for connectors that require conversion (like CANConnector)
"""
connector: Connector
@@ -59,31 +62,96 @@ class Anchor(Node):
),
)
self.declare_parameter(
"can_override",
"",
ParameterDescriptor(
name="can_override",
description="Overrides which CAN channel will be used. Defaults to ''.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be a valid CAN network that shows up in `ip link show`.",
),
)
# ROS2 Topic Setup
# Publishers
self.fromvic_debug_pub_ = self.create_publisher( # only used by serial
String,
"/anchor/from_vic/debug",
20,
)
self.fromvic_core_pub_ = self.create_publisher(
VicCAN,
"/anchor/from_vic/core",
20,
)
self.fromvic_arm_pub_ = self.create_publisher(
VicCAN,
"/anchor/from_vic/arm",
20,
)
self.fromvic_bio_pub_ = self.create_publisher(
VicCAN,
"/anchor/from_vic/bio",
20,
)
# Debug publisher
self.tovic_debug_pub_ = self.create_publisher(
VicCAN,
"/anchor/to_vic/debug",
20,
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
VicCAN,
"/anchor/to_vic/relay",
self.write_connector,
20,
)
self.mock_mcu_sub_ = self.create_subscription(
String,
"/anchor/from_vic/mock_mcu",
self.on_mock_fromvic,
20,
)
self.tovic_string_sub_ = self.create_subscription(
String,
"/anchor/to_vic/relay_string",
self.connector.write_raw,
20,
)
# Determine which connector to use. Options are Mock, Serial, and CAN
connector_select = (
self.get_parameter("connector").get_parameter_value().string_value
)
can_override = (
self.get_parameter("can_override").get_parameter_value().string_value
)
match connector_select:
case "serial":
logger.info("using serial connector")
self.connector = SerialConnector(self.get_logger())
self.connector = SerialConnector(logger, self.get_clock())
case "can":
logger.info("using CAN connector")
self.connector = CANConnector(self.get_logger())
self.connector = CANConnector(logger, self.get_clock(), can_override)
case "mock":
logger.info("using mock connector")
self.connector = MockConnector(self.get_logger())
self.connector = MockConnector(logger, self.get_clock())
case "auto":
logger.info("automatically determining connector")
try:
logger.info("trying CAN connector")
self.connector = CANConnector(self.get_logger())
self.connector = CANConnector(
logger, self.get_clock(), can_override
)
except (NoValidDeviceException, NoWorkingDeviceException, TypeError):
logger.info("CAN connector failed, trying serial connector")
self.connector = SerialConnector(self.get_logger())
self.connector = SerialConnector(logger, self.get_clock())
case _:
self.get_logger().fatal(
logger.fatal(
f"invalid value for connector parameter: {connector_select}"
)
exit(1)
@@ -91,46 +159,26 @@ class Anchor(Node):
# Close devices on exit
atexit.register(self.cleanup)
# ROS2 Topic Setup
# Publishers
self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20
)
self.fromvic_core_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/core", 20
)
self.fromvic_arm_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/arm", 20
)
self.fromvic_bio_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/bio", 20
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.connector.write, 20
)
self.mock_mcu_sub_ = self.create_subscription(
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
)
def cleanup(self):
self.connector.cleanup()
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
output = self.connector.read()
def read_connector(self):
"""Check the connector for new data from the MCU, and publish string to appropriate topics"""
viccan, raw = self.connector.read()
if not output:
return
if raw:
self.fromvic_debug_pub_.publish(String(data=raw))
self.relay_fromvic(output)
if viccan:
self.relay_fromvic(viccan)
def write_connector(self, msg: VicCAN):
"""Write to the connector and send a copy to /anchor/to_vic/debug"""
self.connector.write(msg)
self.tovic_debug_pub_.publish(viccan_to_string(msg))
def relay_fromvic(self, msg: VicCAN):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
msg.header = Header(stamp=self.get_clock().now().to_msg(), frame_id="from_vic")
"""Relay a message from the MCU to the appropriate VicCAN topic"""
if msg.mcu_name == "core":
self.fromvic_core_pub_.publish(msg)
elif msg.mcu_name == "arm" or msg.mcu_name == "digit":
@@ -139,10 +187,12 @@ class Anchor(Node):
self.fromvic_bio_pub_.publish(msg)
def on_mock_fromvic(self, msg: String):
"""Relay a message as if it came from the MCU"""
viccan = string_to_viccan(
msg.data,
"mock",
self.get_logger(),
self.get_clock().now().to_msg(),
)
if viccan:
self.relay_fromvic(viccan)
@@ -158,16 +208,9 @@ def main(args=None):
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
while rclpy.ok():
anchor_node.read_MCU() # Check the MCU for updates
anchor_node.read_connector() # Check the connector for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from astra_msgs.msg import VicCAN
from rclpy.clock import Clock
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan
from .convert import string_to_viccan as _string_to_viccan, viccan_to_string
# CAN
import can
@@ -39,18 +40,36 @@ class DeviceClosedException(Exception):
class Connector(ABC):
logger: RcutilsLogger
clock: Clock
def __init__(self, logger: RcutilsLogger):
def string_to_viccan(self, msg: str, mcu_name: str):
"""function currying so that we do not need to pass logger and clock every time"""
return _string_to_viccan(
msg,
mcu_name,
self.logger,
self.clock.now().to_msg(),
)
def __init__(self, logger: RcutilsLogger, clock: Clock):
self.logger = logger
self.clock = clock
@abstractmethod
def read(self) -> VicCAN | None:
def read(self) -> tuple[VicCAN | None, str | None]:
"""
Must return a tuple of (VicCAN, debug message or string repr of VicCAN)
"""
pass
@abstractmethod
def write(self, msg: VicCAN):
pass
@abstractmethod
def write_raw(self, msg: str):
pass
def cleanup(self):
pass
@@ -61,8 +80,8 @@ class SerialConnector(Connector):
serial_interface: serial.Serial
override: bool
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def __init__(self, logger: RcutilsLogger, clock: Clock):
super().__init__(logger, clock)
ports = self._find_ports()
@@ -136,34 +155,36 @@ class SerialConnector(Connector):
)
if serial_interface.is_open:
# turn relay mode off if it failed to respond with its name
serial_interface.write(b"can_relay_mode,off\n")
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def read(self) -> VicCAN | None:
def read(self) -> tuple[VicCAN | None, str | None]:
try:
raw = str(self.serial_interface.readline(), "utf8")
if not raw:
return None
return (None, None)
return string_to_viccan(raw, self.mcu_name, self.logger)
return (
self.string_to_viccan(raw, self.mcu_name),
raw,
)
except serial.SerialException as e:
self.logger.error(f"SerialException: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except TypeError as e:
self.logger.error(f"TypeError: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except Exception:
pass # pretty much no other error matters
return (None, None) # pretty much no other error matters
def write(self, msg: VicCAN):
# go from [ w, x, y, z ] -> "w,x,y,z" & round to 7 digits max
data = ",".join([str(round(x, 7)) for x in msg.data])
output = f"can_relay_tovic,{msg.mcu_name},{msg.command_id},{data}\n"
self.serial_interface.write(bytes(output, "utf8"))
self.write_raw(viccan_to_string(msg))
def write_raw(self, msg: str):
self.serial_interface.write(bytes(msg, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
@@ -175,26 +196,35 @@ class SerialConnector(Connector):
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def __init__(self, logger: RcutilsLogger, clock: Clock, can_override: str):
super().__init__(logger, clock)
self.can_channel: str | None = None
self.can_bus: can.BusABC | None = None
if self.can_channel is None:
avail = can.interfaces.socketcan.SocketcanBus._detect_available_configs()
avail = can.interfaces.socketcan.SocketcanBus._detect_available_configs()
if len(avail) == 0:
raise NoValidDeviceException("no CAN interfaces found")
if (l := len(avail)) > 1:
channels = ", ".join(str(b.get("channel")) for b in avail)
raise MultipleValidDevicesException(
f"too many ({l}) CAN interfaces found: [{channels}]"
if len(avail) == 0:
raise NoValidDeviceException("no CAN interfaces found")
# filter to busses whose channel matches the can_override
if can_override:
avail = list(
filter(
lambda b: b.get("channel") == can_override,
avail,
)
)
bus = avail[0]
self.can_channel = str(bus.get("channel"))
self.logger.info(f"found CAN interface '{self.can_channel}'")
if (l := len(avail)) > 1:
channels = ", ".join(str(b.get("channel")) for b in avail)
raise MultipleValidDevicesException(
f"too many ({l}) CAN interfaces found: [{channels}]"
)
bus = avail[0]
self.can_channel = str(bus.get("channel"))
self.logger.info(f"found CAN interface '{self.can_channel}'")
try:
self.can_bus = can.Bus(
@@ -208,9 +238,9 @@ class CANConnector(Connector):
)
if self.can_channel and self.can_channel.startswith("v"):
self.logger.warn("using virtual CAN interface; this is likely vcan*")
self.logger.warn("likely using virtual CAN interface")
def read(self) -> VicCAN | None:
def read(self) -> tuple[VicCAN | None, str | None]:
if not self.can_bus:
raise DeviceClosedException("CAN bus not initialized")
@@ -221,7 +251,7 @@ class CANConnector(Connector):
raise DeviceClosedException("CAN bus closed unexpectedly")
if message is None:
return None
return (None, None)
arbitration_id = message.arbitration_id & 0x7FF
data_bytes = bytes(message.data)
@@ -244,7 +274,7 @@ class CANConnector(Connector):
self.logger.warn(
f"received CAN frame with unknown MCU key {mcu_key}; id=0x{arbitration_id:X}"
)
return None
return (None, None)
data: list[float] = []
@@ -256,7 +286,7 @@ class CANConnector(Connector):
self.logger.warn(
f"received double payload with insufficient length {len(data_bytes)}; dropping frame"
)
return None
return (None, None)
(value,) = struct.unpack("<d", data_bytes[:8])
data = [float(value)]
elif data_type_key == 1:
@@ -264,7 +294,7 @@ class CANConnector(Connector):
self.logger.warn(
f"received float32x2 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return None
return (None, None)
v1, v2 = struct.unpack("<ff", data_bytes[:8])
data = [float(v1), float(v2)]
elif data_type_key == 2:
@@ -272,17 +302,17 @@ class CANConnector(Connector):
self.logger.warn(
f"received int16x4 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return None
return (None, None)
i1, i2, i3, i4 = struct.unpack("<hhhh", data_bytes[:8])
data = [float(i1), float(i2), float(i3), float(i4)]
else:
self.logger.warn(
f"received CAN frame with unknown data_type_key {data_type_key}; id=0x{arbitration_id:X}"
)
return None
return (None, None)
except struct.error as e:
self.logger.error(f"error unpacking CAN payload: {e}")
return None
return (None, None)
viccan = VicCAN(
mcu_name=mcu_name,
@@ -295,7 +325,11 @@ class CANConnector(Connector):
f"decoded as VicCAN(mcu_name={viccan.mcu_name}, command_id={viccan.command_id}, data={viccan.data})"
)
return viccan
return (
viccan,
f"{viccan.mcu_name},{viccan.command_id},"
+ ",".join(map(str, list(viccan.data))),
)
def write(self, msg: VicCAN):
if not self.can_bus:
@@ -403,6 +437,9 @@ class CANConnector(Connector):
self.logger.error(f"CAN error while sending: {e}")
raise DeviceClosedException("CAN bus closed unexpectedly")
def write_raw(self, msg: str):
self.logger.warn(f"write_raw is not supported for CANConnector. msg: {msg}")
def cleanup(self):
try:
if self.can_bus is not None:
@@ -413,11 +450,15 @@ class CANConnector(Connector):
class MockConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def __init__(self, logger: RcutilsLogger, clock: Clock):
super().__init__(logger, clock)
# No hardware interface for MockConnector. Publish to `/anchor/from_vic/mock_mcu` instead.
def read(self) -> VicCAN | None:
return None
def read(self) -> tuple[VicCAN | None, str | None]:
return (None, None)
def write(self, msg: VicCAN):
print(msg)
pass
def write_raw(self, msg: str):
pass

View File

@@ -1,27 +1,19 @@
from astra_msgs.msg import VicCAN
from std_msgs.msg import Header
from builtin_interfaces.msg import Time
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger):
def string_to_viccan(
msg: str, mcu_name: str, logger: RcutilsLogger, time: Time
) -> VicCAN | None:
"""
Converts the serial string VicCAN format to a ROS2 VicCAN message.
Does not fill out the Header of the message.
On a failure, it will log at a debug level why it failed.
Parameters:
* msg: str
- The message in serial VicCAN format
* mcu_name: str
- The name of the MCU (e.g. core, citadel, arm)
* logger: RcutilsLogger
- A logger retrieved from node.get_logger()
Returns:
* VicCAN | None
- The VicCAN message on a success or None on a failure
On a failure, it will log at a debug level why it failed and return None.
"""
parts: list[str] = msg.split(",")
parts: list[str] = msg.strip().split(",")
# don't need an extra check because len of .split output is always >= 1
if parts[0] != "can_relay_fromvic":
@@ -34,8 +26,37 @@ def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger):
logger.debug(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}")
return None
return VicCAN(
mcu_name=parts[1],
command_id=int(parts[2]),
data=[float(x) for x in parts[3:]],
)
try:
command_id = int(parts[2])
except ValueError:
logger.debug(
f"got garbage (non-integer command id) CAN data from {mcu_name}: {msg}"
)
return None
if command_id not in range(64):
logger.debug(
f"got garbage (wrong command id {command_id}) CAN data from {mcu_name}: {msg}"
)
return None
try:
return VicCAN(
header=Header(
stamp=time,
frame_id="from_vic",
),
mcu_name=parts[1],
command_id=command_id,
data=[float(x) for x in parts[3:]],
)
except ValueError:
logger.debug(f"got garbage (non-numerical) CAN data from {mcu_name}: {msg}")
return None
def viccan_to_string(viccan: VicCAN) -> str:
"""Converts a ROS2 VicCAN message to the serial string VicCAN format."""
# go from [ w, x, y, z ] -> ",w,x,y,z" & round to 7 digits max
data = "".join([f",{round(val,7)}" for val in viccan.data])
return f"can_relay_tovic,{viccan.mcu_name}{data}\n"