diff --git a/.envrc b/.envrc index 3550a30..1306e13 100644 --- a/.envrc +++ b/.envrc @@ -1 +1,2 @@ use flake +[[ -d install ]] && source install/setup.$(echo $0 | grep -oE '[^/]+$') diff --git a/flake.nix b/flake.nix index f4d5bac..555e9db 100644 --- a/flake.nix +++ b/flake.nix @@ -28,6 +28,7 @@ (python313.withPackages ( p: with p; [ pyserial + python-can pygame scipy crccheck diff --git a/src/anchor_pkg/anchor_pkg/connector.py b/src/anchor_pkg/anchor_pkg/connector.py index da3577c..bdccf44 100644 --- a/src/anchor_pkg/anchor_pkg/connector.py +++ b/src/anchor_pkg/anchor_pkg/connector.py @@ -1,10 +1,16 @@ from abc import ABC, abstractmethod -import serial -import serial.tools.list_ports from astra_msgs.msg import VicCAN from rclpy.impl.rcutils_logger import RcutilsLogger from .convert import string_to_viccan +# CAN +import can +import can.interfaces.socketcan + +# Serial +import serial +import serial.tools.list_ports + KNOWN_USBS = [ (0x2E8A, 0x00C0), # Raspberry Pi Pico (0x1A86, 0x55D4), # Adafruit Feather ESP32 V2 @@ -33,6 +39,9 @@ class DeviceClosedException(Exception): class Connector(ABC): logger: RcutilsLogger + def __init__(self, logger: RcutilsLogger): + self.logger = logger + @abstractmethod def read(self) -> VicCAN | None: pass @@ -51,74 +60,8 @@ class SerialConnector(Connector): serial_interface: serial.Serial override: bool - def _get_name(self, port: str) -> str | None: - """ - Get the name of the MCU (if it works) - - returns: str name of the MCU, None if it doesn't work - """ - # attempt to open the serial port - serial_interface: serial.Serial - try: - self.logger.info(f"asking {port} for its name") - serial_interface = serial.Serial(port, BAUD_RATE, timeout=1) - - serial_interface.write( - b"can_relay_mode,on\n" - ) - - for i in range(4): - self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name") - response = serial_interface.read_until(bytes("\n", "utf8")) - try: - if b"can_relay_ready" in response: - args: list[str] = response.decode("utf8").strip().split(",") - if len(args) == 2: - self.logger.info(f"we are talking to {args[1]}") - return args[1] - break - except UnicodeDecodeError as e: - self.logger.info( - f"ignoring UnicodeDecodeError when asking for MCU name: {e}" - ) - - if serial_interface.is_open: - serial_interface.close() - except serial.SerialException as e: - self.logger.error(f"SerialException when asking for MCU name: {e}") - - return None - - def _find_ports(self) -> list[str]: - """ - Finds all valid ports but does not test them - - returns: all valid ports - """ - comports = serial.tools.list_ports.comports() - valid_ports = list( - map( # get just device strings - lambda p: p.device, - filter( # make sure we have a known device - lambda p: (p.vid, p.pid) in KNOWN_USBS - and p.device is not None, - comports, - ), - ) - ) - self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]") - return valid_ports - - def cleanup(self): - self.logger.info(f"closing serial port if open {self.port}") - try: - if self.serial_interface.is_open: - self.serial_interface.close() - except Exception as e: - self.logger.error(e) - def __init__(self, logger: RcutilsLogger): - self.logger = logger + super().__init__(logger) ports = self._find_ports() @@ -143,6 +86,61 @@ class SerialConnector(Connector): # if we fail at this point, it should crash because we've already tested the port self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1) + def _find_ports(self) -> list[str]: + """ + Finds all valid ports but does not test them + + returns: all valid ports + """ + comports = serial.tools.list_ports.comports() + valid_ports = list( + map( # get just device strings + lambda p: p.device, + filter( # make sure we have a known device + lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None, + comports, + ), + ) + ) + self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]") + return valid_ports + + def _get_name(self, port: str) -> str | None: + """ + Get the name of the MCU (if it works) + + returns: str name of the MCU, None if it doesn't work + """ + # attempt to open the serial port + serial_interface: serial.Serial + try: + self.logger.info(f"asking {port} for its name") + serial_interface = serial.Serial(port, BAUD_RATE, timeout=1) + + serial_interface.write(b"can_relay_mode,on\n") + + for i in range(4): + self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name") + response = serial_interface.read_until(bytes("\n", "utf8")) + try: + if b"can_relay_ready" in response: + args: list[str] = response.decode("utf8").strip().split(",") + if len(args) == 2: + self.logger.info(f"we are talking to {args[1]}") + return args[1] + break + except UnicodeDecodeError as e: + self.logger.info( + f"ignoring UnicodeDecodeError when asking for MCU name: {e}" + ) + + if serial_interface.is_open: + serial_interface.close() + except serial.SerialException as e: + self.logger.error(f"SerialException when asking for MCU name: {e}") + + return None + def read(self) -> VicCAN | None: try: raw = str(self.serial_interface.readline(), "utf8") @@ -166,15 +164,23 @@ class SerialConnector(Connector): output = f"can_relay_tovic,{msg.mcu_name},{msg.command_id},{data}\n" self.serial_interface.write(bytes(output, "utf8")) + def cleanup(self): + self.logger.info(f"closing serial port if open {self.port}") + try: + if self.serial_interface.is_open: + self.serial_interface.close() + except Exception as e: + self.logger.error(e) + class CANConnector(Connector): def __init__(self, logger: RcutilsLogger): - pass + super().__init__(logger) class MockConnector(Connector): - def __init__(self, _: RcutilsLogger): - pass + def __init__(self, logger: RcutilsLogger): + super().__init__(logger) def read(self) -> VicCAN | None: return None diff --git a/src/anchor_pkg/anchor_pkg/convert.py b/src/anchor_pkg/anchor_pkg/convert.py index d122226..83113b0 100644 --- a/src/anchor_pkg/anchor_pkg/convert.py +++ b/src/anchor_pkg/anchor_pkg/convert.py @@ -1,7 +1,25 @@ from astra_msgs.msg import VicCAN from rclpy.impl.rcutils_logger import RcutilsLogger -def string_to_viccan(msg: str, mcu_name, logger: RcutilsLogger): +def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger): + """ + Converts the serial string VicCAN format to a ROS2 VicCAN message. + Does not fill out the Header of the message. + On a failure, it will log at a debug level why it failed. + + Parameters: + * msg: str + - The message in serial VicCAN format + * mcu_name: str + - The name of the MCU (e.g. core, citadel, arm) + * logger: RcutilsLogger + - A logger retrieved from node.get_logger() + + Returns: + * VicCAN | None + - The VicCAN message on a success or None on a failure + """ + parts: list[str] = msg.split(",") # don't need an extra check because len of .split output is always >= 1 diff --git a/src/anchor_pkg/package.xml b/src/anchor_pkg/package.xml index 4903787..5bb527f 100644 --- a/src/anchor_pkg/package.xml +++ b/src/anchor_pkg/package.xml @@ -3,13 +3,14 @@ anchor_pkg 0.0.0 - TODO: Package description - tristan + Anchor -- ROS and CAN relay node + Riley AGPL-3.0-only rclpy common_interfaces python3-serial + python3-can black