feat: (anchor) add new Serial finder code

Uses vendor and product ids to find a microcontroller, and detects its name after connecting. Upon failure, falls back to Areeb's code--just in case.
Also renamed `self.ser` to `self.serial_interface` and `self.port` to `self.serial_port` for clarity.
This commit is contained in:
David
2025-11-10 23:24:14 -06:00
parent b84ca6757d
commit 5e7776631d

View File

@@ -8,6 +8,7 @@ import time
import atexit import atexit
import serial import serial
import serial.tools.list_ports
import os import os
import sys import sys
import threading import threading
@@ -16,6 +17,13 @@ import glob
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN from astra_msgs.msg import VicCAN
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
class Anchor(Node): class Anchor(Node):
""" """
@@ -42,38 +50,116 @@ class Anchor(Node):
# Initalize node with name # Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher' super().__init__("anchor_node") # previously 'serial_publisher'
# Loop through all serial devices on the computer to check for the MCU self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
self.port = None
# Serial port override
if port_override := os.getenv("PORT_OVERRIDE"): if port_override := os.getenv("PORT_OVERRIDE"):
self.port = port_override self.serial_port = port_override
ports = Anchor.list_serial_ports()
for i in range(4):
if self.port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU ##################################################
if b"pong" in response: # Serial MCU Discovery
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is None: # If there was not a port override, look for a MCU over USB for Serial.
self.get_logger().info("Unable to find MCU...") if self.serial_port is None:
comports = serial.tools.list_ports.comports()
real_ports = list(
filter(
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
comports,
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
self.serial_port = found_port.device
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# time.sleep(1)
# sys.exit(1)
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
# time.sleep(1)
# sys.exit(1)
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# time.sleep(1)
# sys.exit(1)
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None:
self.get_logger().warning("Falling back to legacy MCU discovery...")
ports = Anchor.list_serial_ports()
for _ in range(4):
if self.serial_port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1) time.sleep(1)
sys.exit(1) sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
self.ser = serial.Serial(self.port, 115200) # Attempt to get name of connected MCU
self.get_logger().info(f"Enabling Relay Mode") self.serial_interface.write(
self.ser.write(b"can_relay_mode,on\n") b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
# Close serial port on exit # Close serial port on exit
atexit.register(self.cleanup) atexit.register(self.cleanup)
@@ -120,7 +206,7 @@ class Anchor(Node):
def read_MCU(self): def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics""" """Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
try: try:
output = str(self.ser.readline(), "utf8") output = str(self.serial_interface.readline(), "utf8")
if output: if output:
self.relay_fromvic(output) self.relay_fromvic(output)
@@ -146,14 +232,20 @@ class Anchor(Node):
except serial.SerialException as e: except serial.SerialException as e:
print(f"SerialException: {e}") print(f"SerialException: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: try:
self.ser.close() if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1) exit(1)
except TypeError as e: except TypeError as e:
print(f"TypeError: {e}") print(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: try:
self.ser.close() if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1) exit(1)
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
@@ -174,7 +266,7 @@ class Anchor(Node):
output += f",{round(num, 7)}" # limit to 7 decimal places output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n" output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}") # self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.ser.write(bytes(output, "utf8")) self.serial_interface.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str): def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic""" """Relay a string message from the MCU to the appropriate VicCAN topic"""
@@ -236,7 +328,7 @@ class Anchor(Node):
"""Relay a raw string message to the MCU for debugging""" """Relay a raw string message to the MCU for debugging"""
message = msg.data message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}") # self.get_logger().info(f"Sending command to MCU: {msg}")
self.ser.write(bytes(message, "utf8")) self.serial_interface.write(bytes(message, "utf8"))
@staticmethod @staticmethod
def list_serial_ports(): def list_serial_ports():
@@ -244,8 +336,8 @@ class Anchor(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up before terminating...") print("Cleaning up before terminating...")
if self.ser.is_open: if self.serial_interface.is_open:
self.ser.close() self.serial_interface.close()
def main(args=None): def main(args=None):