general cleanup before i start work on CANceiver

This commit is contained in:
ryleu
2026-01-22 00:31:46 -05:00
parent 4a98c3d435
commit 5e2bbe23a3
2 changed files with 88 additions and 145 deletions

View File

@@ -1,20 +1,16 @@
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty
import signal
import time
import atexit
import glob
import signal
import threading
from typing import cast
import rclpy
import serial
import serial.tools.list_ports
import os
import sys
import threading
import glob
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from std_msgs.msg import Header, String
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
KNOWN_USBS = [
@@ -46,15 +42,17 @@ class Anchor(Node):
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
serial_port: str | None = None # e.g., "/dev/ttyUSB0"
def __init__(self):
# Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher'
super().__init__("anchor_node")
self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
self.declare_parameter("serial_port", None)
# Serial port override
if port_override := os.getenv("PORT_OVERRIDE"):
self.serial_port = port_override
if port_override := self.get_parameter("serial_port").value:
self.serial_port = cast(str, port_override) # Cast to make the linter happy
##################################################
# Serial MCU Discovery
@@ -62,104 +60,64 @@ class Anchor(Node):
# If there was not a port override, look for a MCU over USB for Serial.
if self.serial_port is None:
comports = serial.tools.list_ports.comports()
real_ports = list(
recog_ports = list(
filter(
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
# Filter for ports we know that aren't invalid
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
comports,
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
# Guards
if len(recog_ports) > 1: # If we found too many
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
exit(1)
if len(recog_ports) == 0:
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
f"Found no recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
exit(1)
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None:
self.get_logger().warning("Falling back to legacy MCU discovery...")
ports = Anchor.list_serial_ports()
for _ in range(4):
if self.serial_port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1)
sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
# Everything is ok
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
# String, location of device file; e.g., '/dev/ttyACM0'
self.serial_port = found_port.device
# Attempt to get name of connected MCU
self.serial_interface.write(
b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
# Found a Serial port, try to open it; above code has not officially opened a Serial port
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(self.serial_port, 115200, timeout=1)
# Attempt to get name of connected MCU
self.serial_interface.write(b"can_relay_mode,on\n") # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4): # Sometimes it takes a sec
response = self.serial_interface.read_until(bytes("\n", "utf8"))
if b"can_relay_ready" in response:
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
args: list[str] = response.decode("utf8").strip().split(",")
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
continue # ignore malformed responses
if len(args) == 2:
mcu_name = args[1]
break
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
except serial.SerialException as e:
self.get_logger().fatal(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().fatal(e.strerror)
exit(1)
# Close serial port on exit
atexit.register(self.cleanup)
@@ -205,54 +163,38 @@ class Anchor(Node):
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
output: str | None = None
try:
output = str(self.serial_interface.readline(), "utf8")
if output:
self.relay_fromvic(output)
# All output over debug temporarily
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
# self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
self.get_logger().fatal(f"SerialException: {e}")
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
self.get_logger().fatal(f"TypeError: {e}")
exit(1)
except Exception as e:
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
self.get_logger().error(f"Exception: {e}")
if output:
self.relay_fromvic(output)
msg = String()
msg.data = output
# All output over debug
self.debug_pub.publish(msg)
# Send the message to the right place
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
return
def on_mock_fromvic(self, msg: String):
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
@@ -360,6 +302,6 @@ def main(args=None):
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
signal.SIGTERM, lambda signum, frame: exit(0)
) # Catch termination signals and exit cleanly
main()