clean stuff up a bit to prep for CAN

This commit is contained in:
ryleu
2026-02-15 17:23:18 -06:00
parent 5c0194c543
commit b388275bba
5 changed files with 102 additions and 75 deletions

1
.envrc
View File

@@ -1 +1,2 @@
use flake
[[ -d install ]] && source install/setup.$(echo $0 | grep -oE '[^/]+$')

View File

@@ -28,6 +28,7 @@
(python313.withPackages (
p: with p; [
pyserial
python-can
pygame
scipy
crccheck

View File

@@ -1,10 +1,16 @@
from abc import ABC, abstractmethod
import serial
import serial.tools.list_ports
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan
# CAN
import can
import can.interfaces.socketcan
# Serial
import serial
import serial.tools.list_ports
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
@@ -33,6 +39,9 @@ class DeviceClosedException(Exception):
class Connector(ABC):
logger: RcutilsLogger
def __init__(self, logger: RcutilsLogger):
self.logger = logger
@abstractmethod
def read(self) -> VicCAN | None:
pass
@@ -51,74 +60,8 @@ class SerialConnector(Connector):
serial_interface: serial.Serial
override: bool
def _get_name(self, port: str) -> str | None:
"""
Get the name of the MCU (if it works)
returns: str name of the MCU, None if it doesn't work
"""
# attempt to open the serial port
serial_interface: serial.Serial
try:
self.logger.info(f"asking {port} for its name")
serial_interface = serial.Serial(port, BAUD_RATE, timeout=1)
serial_interface.write(
b"can_relay_mode,on\n"
)
for i in range(4):
self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name")
response = serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
self.logger.info(f"we are talking to {args[1]}")
return args[1]
break
except UnicodeDecodeError as e:
self.logger.info(
f"ignoring UnicodeDecodeError when asking for MCU name: {e}"
)
if serial_interface.is_open:
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def _find_ports(self) -> list[str]:
"""
Finds all valid ports but does not test them
returns: all valid ports
"""
comports = serial.tools.list_ports.comports()
valid_ports = list(
map( # get just device strings
lambda p: p.device,
filter( # make sure we have a known device
lambda p: (p.vid, p.pid) in KNOWN_USBS
and p.device is not None,
comports,
),
)
)
self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]")
return valid_ports
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except Exception as e:
self.logger.error(e)
def __init__(self, logger: RcutilsLogger):
self.logger = logger
super().__init__(logger)
ports = self._find_ports()
@@ -143,6 +86,61 @@ class SerialConnector(Connector):
# if we fail at this point, it should crash because we've already tested the port
self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1)
def _find_ports(self) -> list[str]:
"""
Finds all valid ports but does not test them
returns: all valid ports
"""
comports = serial.tools.list_ports.comports()
valid_ports = list(
map( # get just device strings
lambda p: p.device,
filter( # make sure we have a known device
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
comports,
),
)
)
self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]")
return valid_ports
def _get_name(self, port: str) -> str | None:
"""
Get the name of the MCU (if it works)
returns: str name of the MCU, None if it doesn't work
"""
# attempt to open the serial port
serial_interface: serial.Serial
try:
self.logger.info(f"asking {port} for its name")
serial_interface = serial.Serial(port, BAUD_RATE, timeout=1)
serial_interface.write(b"can_relay_mode,on\n")
for i in range(4):
self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name")
response = serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
self.logger.info(f"we are talking to {args[1]}")
return args[1]
break
except UnicodeDecodeError as e:
self.logger.info(
f"ignoring UnicodeDecodeError when asking for MCU name: {e}"
)
if serial_interface.is_open:
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def read(self) -> VicCAN | None:
try:
raw = str(self.serial_interface.readline(), "utf8")
@@ -166,15 +164,23 @@ class SerialConnector(Connector):
output = f"can_relay_tovic,{msg.mcu_name},{msg.command_id},{data}\n"
self.serial_interface.write(bytes(output, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except Exception as e:
self.logger.error(e)
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger):
pass
super().__init__(logger)
class MockConnector(Connector):
def __init__(self, _: RcutilsLogger):
pass
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def read(self) -> VicCAN | None:
return None

View File

@@ -1,7 +1,25 @@
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(msg: str, mcu_name, logger: RcutilsLogger):
def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger):
"""
Converts the serial string VicCAN format to a ROS2 VicCAN message.
Does not fill out the Header of the message.
On a failure, it will log at a debug level why it failed.
Parameters:
* msg: str
- The message in serial VicCAN format
* mcu_name: str
- The name of the MCU (e.g. core, citadel, arm)
* logger: RcutilsLogger
- A logger retrieved from node.get_logger()
Returns:
* VicCAN | None
- The VicCAN message on a success or None on a failure
"""
parts: list[str] = msg.split(",")
# don't need an extra check because len of .split output is always >= 1

View File

@@ -3,13 +3,14 @@
<package format="3">
<name>anchor_pkg</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<description>Anchor -- ROS and CAN relay node</description>
<maintainer email="rjm0037@uah.edu">Riley</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-serial</depend>
<depend>python3-can</depend>
<build_depend>black</build_depend>