diff --git a/README.md b/README.md index 19d15ed..a8fba7e 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,33 @@ $ ros2 launch anchor_pkg rover.launch.py # Must be run on a computer connected $ ros2 run headless_pkg headless_full # Optionally run in a separate shell on the same or different computer. ``` +### Using the Mock Connector + +Anchor provides a mock connector meant for testing and scripting purposes. You can select the mock connector by running anchor with this command: + +```bash +$ ros2 launch anchor_pkg rover.launch.py connector:="mock" +``` + +To see all data that would be sent over the CAN network (and thus to the microcontrollers), use this command: + +```bash +$ ros2 topic echo /anchor/to_vic/debug +``` + +To send data to the mock connector (as if you were a ROS2 node), use the normal relay topic: + +```bash +$ ros2 topic pub /anchor/to_vic/relay astra_msgs/msg/VicCAN '{mcu_name: "core", command_id: 50, data: [0.0, 2.0, 0.0, 1.0]}' + +``` + +To send data to the mock connector (as if you were a microcontroller), publish to the dedicated topic: + +```bash +$ ros2 topic pub /anchor/from_vic/mock_mcu astra_msgs/msg/VicCAN '{mcu_name: "arm", command_id: 55, data: [0.0, 450.0, 900.0, 0.0]}' +``` + ### Testing Serial You can fake the presence of a Serial device (i.e., MCU) by using the following command: @@ -68,10 +95,31 @@ You can fake the presence of a Serial device (i.e., MCU) by using the following $ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT ``` -When you go to run anchor, use the `PORT_OVERRIDE` environment variable to point it to the fake serial port, like so: +When you go to run anchor, use the `serial_override` ROS2 parameter to point it to the fake serial port, like so: ```bash -$ PORT_OVERRIDE=/tmp/ttyACM9 ros2 launch anchor_pkg rover.launch.py +$ ros2 launch anchor_pkg rover.launch.py connector:=serial serial_override:=/tmp/ttyACM9 +``` + +### Testing CAN + +You can create a virtual CAN network by using the following commands to create and then enable it: + +```bash +sudo ip link add dev vcan0 type vcan +sudo ip link set vcan0 up +``` + +When you go to run anchor, use the `can_override` ROS2 parameter to point it to the virtual CAN network, like so: + +```bash +$ ros2 launch anchor_pkg rover.launch.py connector:=can can_override:=vcan0 +``` + +Once you're done, you should delete the virtual network so that anchor doesn't get confused if you plug in a real CAN adapter: + +```bash +$ sudo ip link delete vcan0 ``` ### Connecting the GuliKit Controller diff --git a/flake.lock b/flake.lock index 4c9cb33..5fd93a5 100644 --- a/flake.lock +++ b/flake.lock @@ -60,7 +60,8 @@ "nixpkgs": [ "nix-ros-overlay", "nixpkgs" - ] + ], + "treefmt-nix": "treefmt-nix" } }, "systems": { @@ -77,6 +78,26 @@ "repo": "default", "type": "github" } + }, + "treefmt-nix": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1773297127, + "narHash": "sha256-6E/yhXP7Oy/NbXtf1ktzmU8SdVqJQ09HC/48ebEGBpk=", + "owner": "numtide", + "repo": "treefmt-nix", + "rev": "71b125cd05fbfd78cab3e070b73544abe24c5016", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "treefmt-nix", + "type": "github" + } } }, "root": "root", diff --git a/flake.nix b/flake.nix index f4d5bac..baf6853 100644 --- a/flake.nix +++ b/flake.nix @@ -4,6 +4,11 @@ inputs = { nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop"; nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! + + treefmt-nix = { + url = "github:numtide/treefmt-nix"; + inputs.nixpkgs.follows = "nixpkgs"; + }; }; outputs = @@ -11,7 +16,8 @@ self, nix-ros-overlay, nixpkgs, - }: + ... + }@inputs: nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem ( system: let @@ -28,6 +34,7 @@ (python313.withPackages ( p: with p; [ pyserial + python-can pygame scipy crccheck @@ -61,7 +68,7 @@ moveit-msgs moveit-ros-planning moveit-ros-planning-interface - moveit-ros-visualization + moveit-ros-visualization moveit-configs-utils moveit-ros-move-group moveit-servo @@ -72,7 +79,7 @@ ompl joy ros2-controllers - chomp-motion-planner + chomp-motion-planner ]; } ) @@ -83,6 +90,8 @@ export QT_X11_NO_MITSHM=1 ''; }; + + formatter = (inputs.treefmt-nix.lib.evalModule pkgs ./treefmt.nix).config.build.wrapper; } ); diff --git a/src/anchor_pkg/anchor_pkg/anchor_node.py b/src/anchor_pkg/anchor_pkg/anchor_node.py index a2374fa..7c475a6 100644 --- a/src/anchor_pkg/anchor_pkg/anchor_node.py +++ b/src/anchor_pkg/anchor_pkg/anchor_node.py @@ -1,28 +1,24 @@ +from warnings import deprecated import rclpy from rclpy.node import Node from rclpy.executors import ExternalShutdownException -from std_srvs.srv import Empty +from rcl_interfaces.msg import ParameterDescriptor, ParameterType -import signal -import time import atexit -import serial -import serial.tools.list_ports -import os -import sys +from .connector import ( + Connector, + MockConnector, + SerialConnector, + CANConnector, + NoValidDeviceException, + NoWorkingDeviceException, +) +from .convert import string_to_viccan import threading -import glob -from std_msgs.msg import String, Header from astra_msgs.msg import VicCAN - -KNOWN_USBS = [ - (0x2E8A, 0x00C0), # Raspberry Pi Pico - (0x1A86, 0x55D4), # Adafruit Feather ESP32 V2 - (0x10C4, 0xEA60), # DOIT ESP32 Devkit V1 - (0x1A86, 0x55D3), # ESP32 S3 Development Board -] +from std_msgs.msg import String class Anchor(Node): @@ -36,308 +32,204 @@ class Anchor(Node): - VicCAN messages for Arm node * /anchor/from_vic/bio - VicCAN messages for Bio node + * /anchor/to_vic/debug + - A string copy of the messages published to ./relay are published here Subscribers: * /anchor/from_vic/mock_mcu - - For testing without an actual MCU, publish strings here as if they came from an MCU + - For testing without an actual MCU, publish ViCAN messages here as if they came from an MCU * /anchor/to_vic/relay - Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU * /anchor/to_vic/relay_string - - Publish raw strings to this topic to send directly to the MCU for debugging + - Send raw strings to connectors. Does not work for connectors that require conversion (like CANConnector) + * /anchor/relay + - Legacy method for talking to connectors. Takes String as input, but does not send the raw strings to connectors. + Instead, it converts them to VicCAN messages first. """ + connector: Connector + def __init__(self): - # Initalize node with name - super().__init__("anchor_node") # previously 'serial_publisher' + super().__init__("anchor_node") - self.serial_port: str | None = None # e.g., "/dev/ttyUSB0" + logger = self.get_logger() - # Serial port override - if port_override := os.getenv("PORT_OVERRIDE"): - self.serial_port = port_override + # ROS2 Parameter Setup - ################################################## - # Serial MCU Discovery + self.declare_parameter( + "connector", + "auto", + ParameterDescriptor( + name="connector", + description="Declares which MCU connector should be used. Defaults to 'auto'.", + type=ParameterType.PARAMETER_STRING, + additional_constraints="Must be 'serial', 'can', 'mock', or 'auto'.", + ), + ) - # If there was not a port override, look for a MCU over USB for Serial. - if self.serial_port is None: - comports = serial.tools.list_ports.comports() - real_ports = list( - filter( - lambda p: p.vid is not None - and p.pid is not None - and p.device is not None, - comports, + self.declare_parameter( + "can_override", + "", + ParameterDescriptor( + name="can_override", + description="Overrides which CAN channel will be used. Defaults to ''.", + type=ParameterType.PARAMETER_STRING, + additional_constraints="Must be a valid CAN network that shows up in `ip link show`.", + ), + ) + + self.declare_parameter( + "serial_override", + "", + ParameterDescriptor( + name="serial_override", + description="Overrides which serial port will be used. Defaults to ''.", + type=ParameterType.PARAMETER_STRING, + additional_constraints="Must be a valid path to a serial device file that shows up in `ls /dev/tty*`.", + ), + ) + + # Determine which connector to use. Options are Mock, Serial, and CAN + connector_select = ( + self.get_parameter("connector").get_parameter_value().string_value + ) + can_override = ( + self.get_parameter("can_override").get_parameter_value().string_value + ) + serial_override = ( + self.get_parameter("serial_override").get_parameter_value().string_value + ) + match connector_select: + case "serial": + logger.info("using serial connector") + self.connector = SerialConnector( + logger, self.get_clock(), serial_override ) - ) - recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports)) - - if len(recog_ports) == 1: # Found singular recognized MCU - found_port = recog_ports[0] - self.get_logger().info( - f"Selecting MCU '{found_port.description}' at {found_port.device}." + case "can": + logger.info("using CAN connector") + self.connector = CANConnector(logger, self.get_clock(), can_override) + case "mock": + logger.info("using mock connector") + self.connector = MockConnector(logger, self.get_clock()) + case "auto": + logger.info("automatically determining connector") + try: + logger.info("trying CAN connector") + self.connector = CANConnector( + logger, self.get_clock(), can_override + ) + except (NoValidDeviceException, NoWorkingDeviceException, TypeError): + logger.info("CAN connector failed, trying serial connector") + self.connector = SerialConnector( + logger, self.get_clock(), serial_override + ) + case _: + logger.fatal( + f"invalid value for connector parameter: {connector_select}" ) - self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0' - elif len(recog_ports) > 1: # Found multiple recognized MCUs - # Kinda jank log message - self.get_logger().error( - f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}" - ) - # Don't set self.serial_port; later if-statement will exit() - elif ( - len(recog_ports) == 0 and len(real_ports) > 0 - ): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU - self.get_logger().error( - f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}." - ) - # Don't set self.serial_port; later if-statement will exit() - else: # Found jack shit - self.get_logger().error("No valid Serial ports specified or found.") - # Don't set self.serial_port; later if-statement will exit() + exit(1) - # We still don't have a serial port; fall back to legacy discovery (Areeb's code) - # Loop through all serial devices on the computer to check for the MCU - if self.serial_port is None: - self.get_logger().warning("Falling back to legacy MCU discovery...") - ports = Anchor.list_serial_ports() - for _ in range(4): - if self.serial_port is not None: - break - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, 115200, timeout=1) - # (f"Checking port {port}...") - ser.write(b"ping\n") - response = ser.read_until(bytes("\n", "utf8")) - - # if pong is in response, then we are talking with the MCU - if b"pong" in response: - self.serial_port = port - self.get_logger().info(f"Found MCU at {self.serial_port}!") - break - except: - pass - - # If port is still None then we ain't finding no mcu - if self.serial_port is None: - self.get_logger().error("Unable to find MCU. Exiting...") - time.sleep(1) - sys.exit(1) - # Found a Serial port, try to open it; above code has not officially opened a Serial port - else: - self.get_logger().debug( - f"Attempting to open Serial port '{self.serial_port}'..." - ) - try: - self.serial_interface = serial.Serial( - self.serial_port, 115200, timeout=1 - ) - - # Attempt to get name of connected MCU - self.serial_interface.write( - b"can_relay_mode,on\n" - ) # can_relay_ready,[mcu] - mcu_name: str = "" - for _ in range(4): - response = self.serial_interface.read_until(bytes("\n", "utf8")) - try: - if b"can_relay_ready" in response: - args: list[str] = response.decode("utf8").strip().split(",") - if len(args) == 2: - mcu_name = args[1] - break - except UnicodeDecodeError: - pass # ignore malformed responses - self.get_logger().info( - f"MCU '{mcu_name}' is ready at '{self.serial_port}'." - ) - - except serial.SerialException as e: - self.get_logger().error( - f"Could not open Serial port '{self.serial_port}' for reason:" - ) - self.get_logger().error(e.strerror) - time.sleep(1) - sys.exit(1) - - # Close serial port on exit - atexit.register(self.cleanup) - - ################################################## # ROS2 Topic Setup - # New pub/sub with VicCAN - self.fromvic_debug_pub_ = self.create_publisher( - String, "/anchor/from_vic/debug", 20 + # Publishers + self.fromvic_debug_pub_ = self.create_publisher( # only used by serial + String, + "/anchor/from_vic/debug", + 20, ) self.fromvic_core_pub_ = self.create_publisher( - VicCAN, "/anchor/from_vic/core", 20 + VicCAN, + "/anchor/from_vic/core", + 20, ) self.fromvic_arm_pub_ = self.create_publisher( - VicCAN, "/anchor/from_vic/arm", 20 + VicCAN, + "/anchor/from_vic/arm", + 20, ) self.fromvic_bio_pub_ = self.create_publisher( - VicCAN, "/anchor/from_vic/bio", 20 + VicCAN, + "/anchor/from_vic/bio", + 20, + ) + # Debug publisher + self.tovic_debug_pub_ = self.create_publisher( + VicCAN, + "/anchor/to_vic/debug", + 20, ) - self.mock_mcu_sub_ = self.create_subscription( - String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20 - ) + # Subscribers self.tovic_sub_ = self.create_subscription( - VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20 + VicCAN, + "/anchor/to_vic/relay", + self.write_connector, + 20, ) - self.tovic_debug_sub_ = self.create_subscription( - String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20 + self.tovic_sub_legacy_ = self.create_subscription( + String, + "/anchor/relay", + self.write_connector_legacy, + 20, + ) + self.mock_mcu_sub_ = self.create_subscription( + String, + "/anchor/from_vic/mock_mcu", + self.relay_fromvic, + 20, + ) + self.tovic_string_sub_ = self.create_subscription( + String, + "/anchor/to_vic/relay_string", + self.connector.write_raw, + 20, ) - # Create publishers - self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10) - self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10) - self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10) - - self.debug_pub = self.create_publisher(String, "/anchor/debug", 10) - - # Create a subscriber - self.relay_sub = self.create_subscription( - String, "/anchor/relay", self.on_relay_tovic_string, 10 - ) - - def read_MCU(self): - """Check the USB serial port for new data from the MCU, and publish string to appropriate topics""" - try: - output = str(self.serial_interface.readline(), "utf8") - - if output: - self.relay_fromvic(output) - # All output over debug temporarily - # self.get_logger().info(f"[MCU] {output}") - msg = String() - msg.data = output - self.debug_pub.publish(msg) - if output.startswith("can_relay_fromvic,core"): - self.core_pub.publish(msg) - elif output.startswith("can_relay_fromvic,arm") or output.startswith( - "can_relay_fromvic,digit" - ): # digit for voltage readings - self.arm_pub.publish(msg) - if output.startswith("can_relay_fromvic,citadel") or output.startswith( - "can_relay_fromvic,digit" - ): # digit for SHT sensor - self.bio_pub.publish(msg) - # msg = String() - # msg.data = output - # self.debug_pub.publish(msg) - return - except serial.SerialException as e: - print(f"SerialException: {e}") - print("Closing serial port.") - try: - if self.serial_interface.is_open: - self.serial_interface.close() - except: - pass - exit(1) - except TypeError as e: - print(f"TypeError: {e}") - print("Closing serial port.") - try: - if self.serial_interface.is_open: - self.serial_interface.close() - except: - pass - exit(1) - except Exception as e: - print(f"Exception: {e}") - # print("Closing serial port.") - # if self.ser.is_open: - # self.ser.close() - # exit(1) - - def on_mock_fromvic(self, msg: String): - """For testing without an actual MCU, publish strings here as if they came from an MCU""" - # self.get_logger().info(f"Got command from mock MCU: {msg}") - self.relay_fromvic(msg.data) - - def on_relay_tovic_viccan(self, msg: VicCAN): - """Relay a VicCAN message to the MCU""" - output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}" - for num in msg.data: - output += f",{round(num, 7)}" # limit to 7 decimal places - output += "\n" - # self.get_logger().info(f"VicCAN relay to MCU: {output}") - self.serial_interface.write(bytes(output, "utf8")) - - def relay_fromvic(self, msg: str): - """Relay a string message from the MCU to the appropriate VicCAN topic""" - self.fromvic_debug_pub_.publish(String(data=msg)) - parts = msg.strip().split(",") - if len(parts) > 0 and parts[0] != "can_relay_fromvic": - self.get_logger().debug(f"Ignoring non-VicCAN message: '{msg.strip()}'") - return - - # String validation - malformed: bool = False - malformed_reason: str = "" - if len(parts) < 3 or len(parts) > 7: - malformed = True - malformed_reason = ( - f"invalid argument count (expected [3,7], got {len(parts)})" - ) - elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]: - malformed = True - malformed_reason = f"invalid mcu_name '{parts[1]}'" - elif not (parts[2].isnumeric()) or int(parts[2]) < 0: - malformed = True - malformed_reason = f"command_id '{parts[2]}' is not a non-negative integer" - else: - for x in parts[3:]: - try: - float(x) - except ValueError: - malformed = True - malformed_reason = f"data '{x}' is not a float" - break - - if malformed: - self.get_logger().warning( - f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}" - ) - return - - # Have valid VicCAN message - - output = VicCAN() - output.mcu_name = parts[1] - output.command_id = int(parts[2]) - if len(parts) > 3: - output.data = [float(x) for x in parts[3:]] - output.header = Header( - stamp=self.get_clock().now().to_msg(), frame_id="from_vic" - ) - - # self.get_logger().info(f"Relaying from MCU: {output}") - if output.mcu_name == "core": - self.fromvic_core_pub_.publish(output) - elif output.mcu_name == "arm" or output.mcu_name == "digit": - self.fromvic_arm_pub_.publish(output) - elif output.mcu_name == "citadel" or output.mcu_name == "digit": - self.fromvic_bio_pub_.publish(output) - - def on_relay_tovic_string(self, msg: String): - """Relay a raw string message to the MCU for debugging""" - message = msg.data - # self.get_logger().info(f"Sending command to MCU: {msg}") - self.serial_interface.write(bytes(message, "utf8")) - - @staticmethod - def list_serial_ports(): - return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") + # Close devices on exit + atexit.register(self.cleanup) def cleanup(self): - print("Cleaning up before terminating...") - if self.serial_interface.is_open: - self.serial_interface.close() + self.connector.cleanup() + + def read_connector(self): + """Check the connector for new data from the MCU, and publish string to appropriate topics""" + viccan, raw = self.connector.read() + + if raw: + self.fromvic_debug_pub_.publish(String(data=raw)) + + if viccan: + self.relay_fromvic(viccan) + + def write_connector(self, msg: VicCAN): + """Write to the connector and send a copy to /anchor/to_vic/debug""" + self.connector.write(msg) + self.tovic_debug_pub_.publish(msg) + + @deprecated( + "Use /anchor/to_vic/relay or /anchor/to_vic/relay_string instead of /anchor/relay" + ) + def write_connector_legacy(self, msg: String): + """Write to the connector by first attempting to convert String to VicCAN""" + # please do not reference this code. ~riley + for cmd in msg.data.split("\n"): + viccan = string_to_viccan( + cmd, + "anchor", + self.get_logger(), + self.get_clock().now().to_msg(), + ) + if viccan: + self.write_connector(viccan) + + def relay_fromvic(self, msg: VicCAN): + """Relay a message from the MCU to the appropriate VicCAN topic""" + if msg.mcu_name == "core": + self.fromvic_core_pub_.publish(msg) + elif msg.mcu_name == "arm" or msg.mcu_name == "digit": + self.fromvic_arm_pub_.publish(msg) + elif msg.mcu_name == "citadel" or msg.mcu_name == "digit": + self.fromvic_bio_pub_.publish(msg) def main(args=None): @@ -350,16 +242,9 @@ def main(args=None): rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate while rclpy.ok(): - anchor_node.read_MCU() # Check the MCU for updates + anchor_node.read_connector() # Check the connector for updates rate.sleep() except (KeyboardInterrupt, ExternalShutdownException): print("Caught shutdown signal, shutting down...") finally: rclpy.try_shutdown() - - -if __name__ == "__main__": - signal.signal( - signal.SIGTERM, lambda signum, frame: sys.exit(0) - ) # Catch termination signals and exit cleanly - main() diff --git a/src/anchor_pkg/anchor_pkg/connector.py b/src/anchor_pkg/anchor_pkg/connector.py new file mode 100644 index 0000000..0d4790b --- /dev/null +++ b/src/anchor_pkg/anchor_pkg/connector.py @@ -0,0 +1,438 @@ +from abc import ABC, abstractmethod +from astra_msgs.msg import VicCAN +from rclpy.clock import Clock +from rclpy.impl.rcutils_logger import RcutilsLogger +from .convert import string_to_viccan as _string_to_viccan, viccan_to_string + +# CAN +import can +import can.interfaces.socketcan +import struct + +# Serial +import serial +import serial.tools.list_ports + +KNOWN_USBS = [ + (0x2E8A, 0x00C0), # Raspberry Pi Pico + (0x1A86, 0x55D4), # Adafruit Feather ESP32 V2 + (0x10C4, 0xEA60), # DOIT ESP32 Devkit V1 + (0x1A86, 0x55D3), # ESP32 S3 Development Board +] +BAUD_RATE = 115200 + +MCU_IDS = [ + "broadcast", + "core", + "arm", + "digit", + "faerie", + "citadel", + "libs", +] + + +class NoValidDeviceException(Exception): + pass + + +class NoWorkingDeviceException(Exception): + pass + + +class MultipleValidDevicesException(Exception): + pass + + +class DeviceClosedException(Exception): + pass + + +class Connector(ABC): + logger: RcutilsLogger + clock: Clock + + def string_to_viccan(self, msg: str, mcu_name: str): + """function currying so that we do not need to pass logger and clock every time""" + return _string_to_viccan( + msg, + mcu_name, + self.logger, + self.clock.now().to_msg(), + ) + + def __init__(self, logger: RcutilsLogger, clock: Clock): + self.logger = logger + self.clock = clock + + @abstractmethod + def read(self) -> tuple[VicCAN | None, str | None]: + """ + Must return a tuple of (VicCAN, debug message or string repr of VicCAN) + """ + pass + + @abstractmethod + def write(self, msg: VicCAN): + pass + + @abstractmethod + def write_raw(self, msg: str): + pass + + def cleanup(self): + pass + + +class SerialConnector(Connector): + port: str + mcu_name: str + serial_interface: serial.Serial + + def __init__(self, logger: RcutilsLogger, clock: Clock, serial_override: str = ""): + super().__init__(logger, clock) + + ports = self._find_ports() + mcu_name: str | None = None + + if serial_override: + logger.warn( + f"using serial_override: `{serial_override}`! this will bypass several checks." + ) + ports = [serial_override] + mcu_name = "override" + + if len(ports) <= 0: + raise NoValidDeviceException("no valid serial device found") + if (l := len(ports)) > 1: + raise MultipleValidDevicesException( + f"too many ({l}) valid serial devices found" + ) + + # check each of our ports to make sure one of them is responding + port = ports[0] + # we might already have a name by now if we overrode earlier + mcu_name = mcu_name or self._get_name(port) + if not mcu_name: + raise NoWorkingDeviceException( + f"found {port}, but it did not respond with its name" + ) + + self.port = port + self.mcu_name = mcu_name + + # if we fail at this point, it should crash because we've already tested the port + self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1) + + def _find_ports(self) -> list[str]: + """ + Finds all valid ports but does not test them + + returns: all valid ports + """ + comports = serial.tools.list_ports.comports() + valid_ports = list( + map( # get just device strings + lambda p: p.device, + filter( # make sure we have a known device + lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None, + comports, + ), + ) + ) + self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]") + return valid_ports + + def _get_name(self, port: str) -> str | None: + """ + Get the name of the MCU (if it works) + + returns: str name of the MCU, None if it doesn't work + """ + # attempt to open the serial port + serial_interface: serial.Serial + try: + self.logger.info(f"asking {port} for its name") + serial_interface = serial.Serial(port, BAUD_RATE, timeout=1) + + serial_interface.write(b"can_relay_mode,on\n") + + for i in range(4): + self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name") + response = serial_interface.read_until(bytes("\n", "utf8")) + try: + if b"can_relay_ready" in response: + args: list[str] = response.decode("utf8").strip().split(",") + if len(args) == 2: + self.logger.info(f"we are talking to {args[1]}") + return args[1] + break + except UnicodeDecodeError as e: + self.logger.info( + f"ignoring UnicodeDecodeError when asking for MCU name: {e}" + ) + + if serial_interface.is_open: + # turn relay mode off if it failed to respond with its name + serial_interface.write(b"can_relay_mode,off\n") + serial_interface.close() + except serial.SerialException as e: + self.logger.error(f"SerialException when asking for MCU name: {e}") + + return None + + def read(self) -> tuple[VicCAN | None, str | None]: + try: + raw = str(self.serial_interface.readline(), "utf8") + + if not raw: + return (None, None) + + return ( + self.string_to_viccan(raw, self.mcu_name), + raw, + ) + except serial.SerialException as e: + self.logger.error(f"SerialException: {e}") + raise DeviceClosedException(f"serial port {self.port} closed unexpectedly") + except Exception: + return (None, None) # pretty much no other error matters + + def write(self, msg: VicCAN): + self.write_raw(viccan_to_string(msg)) + + def write_raw(self, msg: str): + self.serial_interface.write(bytes(msg, "utf8")) + + def cleanup(self): + self.logger.info(f"closing serial port if open {self.port}") + try: + if self.serial_interface.is_open: + self.serial_interface.close() + except Exception as e: + self.logger.error(e) + + +class CANConnector(Connector): + def __init__(self, logger: RcutilsLogger, clock: Clock, can_override: str): + super().__init__(logger, clock) + + self.can_channel: str | None = None + self.can_bus: can.BusABC | None = None + + avail = can.interfaces.socketcan.SocketcanBus._detect_available_configs() + + if len(avail) == 0: + raise NoValidDeviceException("no CAN interfaces found") + + # filter to busses whose channel matches the can_override + if can_override: + self.logger.info(f"overrode can interface with {can_override}") + avail = list( + filter( + lambda b: b.get("channel") == can_override, + avail, + ) + ) + + if (l := len(avail)) > 1: + channels = ", ".join(str(b.get("channel")) for b in avail) + raise MultipleValidDevicesException( + f"too many ({l}) CAN interfaces found: [{channels}]" + ) + + bus = avail[0] + self.can_channel = str(bus.get("channel")) + self.logger.info(f"found CAN interface '{self.can_channel}'") + + try: + self.can_bus = can.Bus( + interface="socketcan", + channel=self.can_channel, + bitrate=1_000_000, + ) + except can.CanError as e: + raise NoWorkingDeviceException( + f"could not open CAN channel '{self.can_channel}': {e}" + ) + + if self.can_channel and self.can_channel.startswith("v"): + self.logger.warn("CAN interface is likely virtual") + + def read(self) -> tuple[VicCAN | None, str | None]: + if not self.can_bus: + raise DeviceClosedException("CAN bus not initialized") + + try: + message = self.can_bus.recv(timeout=0.0) + except can.CanError as e: + self.logger.error(f"CAN error while receiving: {e}") + raise DeviceClosedException("CAN bus closed unexpectedly") + + if message is None: + return (None, None) + + arbitration_id = message.arbitration_id & 0x7FF + data_bytes = bytes(message.data) + + mcu_key = (arbitration_id >> 8) & 0b111 + data_type_key = (arbitration_id >> 6) & 0b11 + command = arbitration_id & 0x3F + + try: + mcu_name = MCU_IDS[mcu_key] + except IndexError: + self.logger.warn( + f"received CAN frame with unknown MCU key {mcu_key}; id=0x{arbitration_id:X}" + ) + return (None, None) + + data: list[float] = [] + + try: + if data_type_key == 3: + data = [] + elif data_type_key == 0: + if len(data_bytes) < 8: + self.logger.warn( + f"received double payload with insufficient length {len(data_bytes)}; dropping frame" + ) + return (None, None) + (value,) = struct.unpack(">d", data_bytes[:8]) + data = [float(value)] + elif data_type_key == 1: + if len(data_bytes) < 8: + self.logger.warn( + f"received float32x2 payload with insufficient length {len(data_bytes)}; dropping frame" + ) + return (None, None) + v1, v2 = struct.unpack(">ff", data_bytes[:8]) + data = [float(v1), float(v2)] + elif data_type_key == 2: + if len(data_bytes) < 8: + self.logger.warn( + f"received int16x4 payload with insufficient length {len(data_bytes)}; dropping frame" + ) + return (None, None) + i1, i2, i3, i4 = struct.unpack(">hhhh", data_bytes[:8]) + data = [float(i1), float(i2), float(i3), float(i4)] + else: + self.logger.warn( + f"received CAN frame with unknown data_type_key {data_type_key}; id=0x{arbitration_id:X}" + ) + return (None, None) + except struct.error as e: + self.logger.error(f"error unpacking CAN payload: {e}") + return (None, None) + + viccan = VicCAN( + mcu_name=mcu_name, + command_id=int(command), + data=data, + ) + + self.logger.debug( + f"received CAN frame id=0x{message.arbitration_id:X}, " + f"decoded as VicCAN(mcu_name={viccan.mcu_name}, command_id={viccan.command_id}, data={viccan.data})" + ) + + return ( + viccan, + f"{viccan.mcu_name},{viccan.command_id}," + + ",".join(map(str, list(viccan.data))), + ) + + def write(self, msg: VicCAN): + if not self.can_bus: + raise DeviceClosedException("CAN bus not initialized") + + # build 11-bit arbitration ID according to VicCAN spec: + # bits 10..8: targeted MCU key + # bits 7..6: data type key + # bits 5..0: command + + # map MCU name to 3-bit key. + try: + mcu_id = MCU_IDS.index((msg.mcu_name or "").lower()) + except ValueError: + self.logger.error( + f"unknown VicCAN mcu_name '{msg.mcu_name}' for CAN frame; dropping message" + ) + return + + # determine data type from length: + # 0: double x1, 1: float32 x2, 2: int16 x4, 3: empty + match data_len := len(msg.data): + case 0: + data_type = 3 + data = bytes() + case 1: + data_type = 0 + data = struct.pack(">d", *msg.data) + case 2: + data_type = 1 + data = struct.pack(">ff", *msg.data) + case 3 | 4: # 3 gets treated as 4 + data_type = 2 + if data_len == 3: + msg.data.append(0) + data = struct.pack(">hhhh", *[int(x) for x in msg.data]) + case _: + self.logger.error( + f"unexpected VicCAN data length: {data_len}; dropping message" + ) + return + + # command is limited to 6 bits. + command = int(msg.command_id) + if command < 0 or command > 0x3F: + self.logger.error( + f"invalid command_id for CAN frame: {command}; dropping message" + ) + return + + try: + can_message = can.Message( + arbitration_id=(mcu_id << 8) | (data_type << 6) | command, + data=data, + is_extended_id=False, + ) + except Exception as e: + self.logger.error(f"failed to construct CAN message: {e}") + return + + try: + self.can_bus.send(can_message) + self.logger.debug( + f"sent CAN frame id=0x{can_message.arbitration_id:X}, " + f"data={list(can_message.data)}" + ) + except can.CanError as e: + self.logger.error(f"CAN error while sending: {e}") + raise DeviceClosedException("CAN bus closed unexpectedly") + + def write_raw(self, msg: str): + self.logger.warn(f"write_raw is not supported for CANConnector. msg: {msg}") + + def cleanup(self): + try: + if self.can_bus is not None: + self.logger.info("shutting down CAN bus") + self.can_bus.shutdown() + except Exception as e: + self.logger.error(e) + + +class MockConnector(Connector): + def __init__(self, logger: RcutilsLogger, clock: Clock): + super().__init__(logger, clock) + # No hardware interface for MockConnector. Publish to `/anchor/from_vic/mock_mcu` instead. + + def read(self) -> tuple[VicCAN | None, str | None]: + return (None, None) + + def write(self, msg: VicCAN): + pass + + def write_raw(self, msg: str): + pass diff --git a/src/anchor_pkg/anchor_pkg/convert.py b/src/anchor_pkg/anchor_pkg/convert.py new file mode 100644 index 0000000..bef3301 --- /dev/null +++ b/src/anchor_pkg/anchor_pkg/convert.py @@ -0,0 +1,65 @@ +from astra_msgs.msg import VicCAN +from std_msgs.msg import Header +from builtin_interfaces.msg import Time +from rclpy.impl.rcutils_logger import RcutilsLogger + + +def string_to_viccan( + msg: str, mcu_name: str, logger: RcutilsLogger, time: Time +) -> VicCAN | None: + """ + Converts the serial string VicCAN format to a ROS2 VicCAN message. + Does not fill out the Header of the message. + On a failure, it will log at a debug level why it failed and return None. + """ + + parts: list[str] = msg.strip().split(",") + + # don't need an extra check because len of .split output is always >= 1 + if not parts[0].startswith("can_relay_"): + logger.debug(f"got non-CAN data from {mcu_name}: {msg}") + return None + elif len(parts) < 3: + logger.debug(f"got garbage (not enough parts) CAN data from {mcu_name}: {msg}") + return None + elif len(parts) > 7: + logger.debug(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}") + return None + + try: + command_id = int(parts[2]) + except ValueError: + logger.debug( + f"got garbage (non-integer command id) CAN data from {mcu_name}: {msg}" + ) + return None + + if command_id not in range(64): + logger.debug( + f"got garbage (wrong command id {command_id}) CAN data from {mcu_name}: {msg}" + ) + return None + + try: + return VicCAN( + header=Header( + stamp=time, + frame_id="from_vic", + ), + mcu_name=parts[1], + command_id=command_id, + data=[float(x) for x in parts[3:]], + ) + except ValueError: + logger.debug(f"got garbage (non-numerical) CAN data from {mcu_name}: {msg}") + return None + + +def viccan_to_string(viccan: VicCAN) -> str: + """Converts a ROS2 VicCAN message to the serial string VicCAN format.""" + # make sure we accept 3 digits and treat it as 4 + if len(viccan.data) == 3: + viccan.data.append(0) + # go from [ w, x, y, z ] -> ",w,x,y,z" & round to 7 digits max + data = "".join([f",{round(val,7)}" for val in viccan.data]) + return f"can_relay_tovic,{viccan.mcu_name},{viccan.command_id}{data}\n" diff --git a/src/anchor_pkg/launch/rover.launch.py b/src/anchor_pkg/launch/rover.launch.py index a4df87a..cfcb082 100644 --- a/src/anchor_pkg/launch/rover.launch.py +++ b/src/anchor_pkg/launch/rover.launch.py @@ -1,206 +1,174 @@ -#!/usr/bin/env python3 - from launch import LaunchDescription -from launch.actions import ( - DeclareLaunchArgument, - OpaqueFunction, - Shutdown, - IncludeLaunchDescription, -) -from launch.substitutions import ( - LaunchConfiguration, - ThisLaunchFileDir, - PathJoinSubstitution, -) -from launch.launch_description_sources import PythonLaunchDescriptionSource +from launch.actions import DeclareLaunchArgument, Shutdown, IncludeLaunchDescription from launch.conditions import IfCondition +from launch.substitutions import LaunchConfiguration, PathJoinSubstitution from launch_ros.actions import Node from launch_ros.substitutions import FindPackageShare - - -# Prevent making __pycache__ directories -from sys import dont_write_bytecode - -dont_write_bytecode = True - - -def launch_setup(context, *args, **kwargs): - # Retrieve the resolved value of the launch argument 'mode' - mode = LaunchConfiguration("mode").perform(context) - nodes = [] - - if mode == "anchor": - # Launch every node and pass "anchor" as the parameter - - nodes.append( - Node( - package="arm_pkg", - executable="arm", # change as needed - name="arm", - output="both", - parameters=[{"launch_mode": mode}], - on_exit=Shutdown(), - ) - ) - nodes.append( - Node( - package="core_pkg", - executable="core", # change as needed - name="core", - output="both", - parameters=[ - {"launch_mode": mode}, - { - "use_ros2_control": LaunchConfiguration( - "use_ros2_control", default=False - ) - }, - { - "rover_platform", - LaunchConfiguration("rover_platform", default="auto"), - }, - ], - on_exit=Shutdown(), - ) - ) - nodes.append( - Node( - package="core_pkg", - executable="ptz", # change as needed - name="ptz", - output="both", - condition=IfCondition(LaunchConfiguration("use_ptz", default="true")), - # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical - # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure - ) - ) - nodes.append( - Node( - package="bio_pkg", - executable="bio", # change as needed - name="bio", - output="both", - parameters=[{"launch_mode": mode}], - on_exit=Shutdown(), - ) - ) - nodes.append( - Node( - package="anchor_pkg", - executable="anchor", # change as needed - name="anchor", - output="both", - parameters=[{"launch_mode": mode}], - on_exit=Shutdown(), - ) - ) - elif mode in ["arm", "core", "bio", "ptz"]: - # Only launch the node corresponding to the provided mode. - if mode == "arm": - nodes.append( - Node( - package="arm_pkg", - executable="arm", - name="arm", - output="both", - parameters=[{"launch_mode": mode}], - on_exit=Shutdown(), - ) - ) - elif mode == "core": - nodes.append( - Node( - package="core_pkg", - executable="core", - name="core", - output="both", - parameters=[{"launch_mode": mode}], - on_exit=Shutdown(), - ) - ) - elif mode == "bio": - nodes.append( - Node( - package="bio_pkg", - executable="bio", - name="bio", - output="both", - parameters=[{"launch_mode": mode}], - on_exit=Shutdown(), - ) - ) - elif mode == "ptz": - nodes.append( - Node( - package="core_pkg", - executable="ptz", - name="ptz", - output="both", - on_exit=Shutdown(), # on fail, shutdown if this was the only node to be launched - ) - ) - else: - # If an invalid mode is provided, print an error. - print("Invalid mode provided. Choose one of: arm, core, bio, anchor, ptz.") - - return nodes +from launch.launch_description_sources import PythonLaunchDescriptionSource def generate_launch_description(): - declare_arg = DeclareLaunchArgument( - "mode", - default_value="anchor", - description="Launch mode: arm, core, bio, anchor, or ptz", + connector = LaunchConfiguration("connector") + serial_override = LaunchConfiguration("serial_override") + can_override = LaunchConfiguration("can_override") + use_ptz = LaunchConfiguration("use_ptz") + + ld = LaunchDescription() + + # arguments + ld.add_action( + DeclareLaunchArgument( + "connector", + default_value="auto", + description="Connector parameter for anchor node (default: 'auto')", + ) ) - ros2_control_arg = DeclareLaunchArgument( - "use_ros2_control", - default_value="false", - description="Whether to use DiffDriveController for driving instead of direct Twist", + ld.add_action( + DeclareLaunchArgument( + "serial_override", + default_value="", + description="Serial port override parameter for anchor node (default: '')", + ) ) - testbed_arg = DeclareLaunchArgument( - "rover_platform", - default_value="auto", - description="Choose the rover platform (either clucky or testbed). If left on auto, will defer to ROVER_PLATFORM environment variable.", - choices=["clucky", "testbed", "auto"], + ld.add_action( + DeclareLaunchArgument( + "can_override", + default_value="", + description="CAN network override parameter for anchor node (default: '')", + ) ) - rsp = IncludeLaunchDescription( - PythonLaunchDescriptionSource( - PathJoinSubstitution( - [ - FindPackageShare("core_description"), - "launch", - "robot_state_publisher.launch.py", - ] - ) - ), - condition=IfCondition(LaunchConfiguration("use_ros2_control")), - launch_arguments={("hardware_mode", "physical")}, + ld.add_action( + DeclareLaunchArgument( + "use_ptz", + default_value="true", # must be string for launch system + description="Whether to launch PTZ node (default: true)", + ) ) - controllers = IncludeLaunchDescription( - PythonLaunchDescriptionSource( - PathJoinSubstitution( - [ - FindPackageShare("core_description"), - "launch", - "spawn_controllers.launch.py", - ] - ) - ), - condition=IfCondition(LaunchConfiguration("use_ros2_control")), - launch_arguments={("hardware_mode", "physical")}, + ld.add_action( + DeclareLaunchArgument( + "use_ros2_control", + default_value="false", + description="Whether to use DiffDriveController for driving instead of direct Twist", + ) ) - return LaunchDescription( - [ - declare_arg, - ros2_control_arg, - testbed_arg, - rsp, - controllers, - OpaqueFunction(function=launch_setup), - ] + ld.add_action( + DeclareLaunchArgument( + "rover_platform", + default_value="auto", + description="Choose the rover platform (either clucky or testbed). If left on auto, will defer to ROVER_PLATFORM environment variable.", + choices=["clucky", "testbed", "auto"], + ) ) + + # nodes + ld.add_action( + Node( + package="anchor_pkg", + executable="anchor", + name="anchor", + output="both", + parameters=[ + { + "launch_mode": "anchor", + "connector": connector, + "serial_override": serial_override, + "can_override": can_override, + } + ], + on_exit=Shutdown(), + ) + ) + + ld.add_action( + Node( + package="arm_pkg", + executable="arm", + name="arm", + output="both", + parameters=[{"launch_mode": "anchor"}], + on_exit=Shutdown(), + ) + ) + + ld.add_action( + Node( + package="bio_pkg", + executable="bio", + name="bio", + output="both", + parameters=[{"launch_mode": "anchor"}], + on_exit=Shutdown(), + ) + ) + + ld.add_action( + Node( + package="core_pkg", + executable="core", + name="core", + output="both", + parameters=[ + {"launch_mode": "anchor"}, + { + "use_ros2_control": LaunchConfiguration( + "use_ros2_control", default=False + ) + }, + { + "rover_platform": LaunchConfiguration( + "rover_platform", default="auto" + ) + }, + ], + on_exit=Shutdown(), + ) + ) + + ld.add_action( + Node( + package="core_pkg", + executable="ptz", + name="ptz", + output="both", + condition=IfCondition(use_ptz), + ) + ) + + ld.add_action( + IncludeLaunchDescription( + PythonLaunchDescriptionSource( + PathJoinSubstitution( + [ + FindPackageShare("core_description"), + "launch", + "robot_state_publisher.launch.py", + ] + ) + ), + condition=IfCondition(LaunchConfiguration("use_ros2_control")), + launch_arguments={("hardware_mode", "physical")}, + ) + ) + + ld.add_action( + IncludeLaunchDescription( + PythonLaunchDescriptionSource( + PathJoinSubstitution( + [ + FindPackageShare("core_description"), + "launch", + "spawn_controllers.launch.py", + ] + ) + ), + condition=IfCondition(LaunchConfiguration("use_ros2_control")), + launch_arguments={("hardware_mode", "physical")}, + ) + ) + + return ld diff --git a/src/anchor_pkg/package.xml b/src/anchor_pkg/package.xml index 9addb40..7e11a0c 100644 --- a/src/anchor_pkg/package.xml +++ b/src/anchor_pkg/package.xml @@ -2,15 +2,16 @@ anchor_pkg - 1.0.0 + 0.0.0 ASTRA VicCAN driver package, using python-can and pyserial. - tristan + Riley AGPL-3.0-only rclpy common_interfaces python3-serial + python3-can core_pkg arm_pkg diff --git a/src/core_pkg/core_pkg/core_ptz.py b/src/core_pkg/core_pkg/core_ptz.py index eafcc74..3e853a4 100644 --- a/src/core_pkg/core_pkg/core_ptz.py +++ b/src/core_pkg/core_pkg/core_ptz.py @@ -262,7 +262,7 @@ class PtzNode(Node): f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}" ) self.debug_pub.publish(msg) - self.get_logger().info(message_text) + self.get_logger().debug(message_text) def run_async_func(self, coro): """Run an async function in the event loop.""" diff --git a/treefmt.nix b/treefmt.nix new file mode 100644 index 0000000..6bd693c --- /dev/null +++ b/treefmt.nix @@ -0,0 +1,8 @@ +{ pkgs, ... }: +{ + projectRootFile = "flake.nix"; + programs = { + nixfmt.enable = true; + black.enable = true; + }; +}