style: clean up core and headless

This commit is contained in:
David
2025-09-09 20:12:46 -05:00
parent 4c972e6264
commit 93226203f1
2 changed files with 54 additions and 78 deletions

View File

@@ -16,12 +16,11 @@ import glob
from math import sqrt from math import sqrt
from std_msgs.msg import String from std_msgs.msg import String
from sensor_msgs.msg import Imu, NavSatFix
from geometry_msgs.msg import TwistStamped, Twist
from ros2_interfaces_pkg.msg import CoreFeedback from ros2_interfaces_pkg.msg import CoreFeedback
from ros2_interfaces_pkg.msg import CoreControl from ros2_interfaces_pkg.msg import CoreControl
# NEW
from sensor_msgs.msg import Imu, NavSatFix
from geometry_msgs.msg import TwistStamped, Twist
serial_pub = None serial_pub = None
thread = None thread = None
@@ -41,44 +40,42 @@ control_qos = qos.QoSProfile(
liveliness_lease_duration=Duration(seconds=5) liveliness_lease_duration=Duration(seconds=5)
) )
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initalize node with name # Initalize node with name
super().__init__("core_node")#previously 'serial_publisher' super().__init__("core_node")
# Get launch mode parameter # Launch mode -- anchor vs core
self.declare_parameter('launch_mode', 'core') self.declare_parameter('launch_mode', 'core')
self.launch_mode = self.get_parameter('launch_mode').value self.launch_mode = self.get_parameter('launch_mode').value
self.get_logger().info(f"core launch_mode is: {self.launch_mode}") self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
# Create publishers
self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
# Create a subscriber
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
# Create a publisher for telemetry
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
# Create a service server for pinging the rover
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
# Anchor
if self.launch_mode == 'anchor': if self.launch_mode == 'anchor':
self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
# Controls
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos)
self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos)
# Feedback
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
self.core_feedback = CoreFeedback() self.core_feedback = CoreFeedback()
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
## NEW TOPICS
self.imu_pub_ = self.create_publisher(Imu, '/imu/data', 10) self.imu_pub_ = self.create_publisher(Imu, '/imu/data', 10)
self.imu_state = Imu() self.imu_state = Imu()
self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10) self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10)
self.gps_state = NavSatFix() self.gps_state = NavSatFix()
self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos) # Debug
self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos) self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
# Core (non-anchor) specific
if self.launch_mode == 'core': if self.launch_mode == 'core':
# Loop through all serial devices on the computer to check for the MCU # Loop through all serial devices on the computer to check for the MCU
self.port = None self.port = None
@@ -90,7 +87,7 @@ class SerialRelay(Node):
ser = serial.Serial(port, 115200, timeout=1) ser = serial.Serial(port, 115200, timeout=1)
#(f"Checking port {port}...") #(f"Checking port {port}...")
ser.write(b"ping\n") ser.write(b"ping\n")
response = ser.read_until("\n") response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU # if pong is in response, then we are talking with the MCU
if b"pong" in response: if b"pong" in response:
@@ -111,6 +108,7 @@ class SerialRelay(Node):
self.ser = serial.Serial(self.port, 115200) self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup) atexit.register(self.cleanup)
# end __init__()
def run(self): def run(self):
@@ -118,8 +116,7 @@ class SerialRelay(Node):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start() thread.start()
try: try:
while rclpy.ok(): while rclpy.ok():
if self.launch_mode == 'core': if self.launch_mode == 'core':
@@ -127,7 +124,7 @@ class SerialRelay(Node):
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(0) sys.exit(0)
def read_MCU(self): def read_MCU(self): # NON-ANCHOR SPECIFIC
try: try:
output = str(self.ser.readline(), "utf8") output = str(self.ser.readline(), "utf8")
@@ -138,59 +135,31 @@ class SerialRelay(Node):
msg.data = output msg.data = output
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
return return
# Temporary
# packet = output.strip().split(',')
# if len(packet) >= 2 and packet[0] == "core" and packet[1] == "telemetry":
# feedback = CoreFeedback()
# feedback.gpslat = float(packet[2])
# feedback.gpslon = float(packet[3])
# feedback.gpssat = float(packet[4])
# feedback.bnogyr.x = float(packet[5])
# feedback.bnogyr.y = float(packet[6])
# feedback.bnogyr.z = float(packet[7])
# feedback.bnoacc.x = float(packet[8])
# feedback.bnoacc.y = float(packet[9])
# feedback.bnoacc.z = float(packet[10])
# feedback.orient = float(packet[11])
# feedback.bmptemp = float(packet[12])
# feedback.bmppres = float(packet[13])
# feedback.bmpalt = float(packet[14])
# self.telemetry_publisher.publish(feedback)
# else:
# # print(f"[MCU] {output}", end="")
# # msg = String()
# # msg.data = output
# # self.debug_pub.publish(msg)
# return
except serial.SerialException as e: except serial.SerialException as e:
print(f"SerialException: {e}") print(f"SerialException: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
self.exit(1) sys.exit(1)
except TypeError as e: except TypeError as e:
print(f"TypeError: {e}") print(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
self.exit(1) sys.exit(1)
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
self.exit(1) sys.exit(1)
def scale_duty(self, value: float, max_speed: float): def scale_duty(self, value: float, max_speed: float):
leftMin = -1 leftMin = -1
leftMax = 1 leftMax = 1
rightMin = -max_speed/100.0 rightMin = -max_speed/100.0
rightMax = max_speed/100.0 rightMax = max_speed/100.0
# Figure out how 'wide' each range is # Figure out how 'wide' each range is
leftSpan = leftMax - leftMin leftSpan = leftMax - leftMin
rightSpan = rightMax - rightMin rightSpan = rightMax - rightMin
@@ -202,7 +171,6 @@ class SerialRelay(Node):
return str(rightMin + (valueScaled * rightSpan)) return str(rightMin + (valueScaled * rightSpan))
def send_controls(self, msg: CoreControl): def send_controls(self, msg: CoreControl):
#can_relay_tovic,core,19, left_stick, right_stick
if(msg.turn_to_enable): if(msg.turn_to_enable):
command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n' command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n'
else: else:
@@ -260,32 +228,38 @@ class SerialRelay(Node):
self.get_logger().info(f"[Core to MCU] {msg}") self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
#self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}") # GNSS Lattitude
if output.startswith("can_relay_fromvic,core,48"): #GNSS Lattitude if output.startswith("can_relay_fromvic,core,48"):
self.core_feedback.gps_lat = float(parts[3]) self.core_feedback.gps_lat = float(parts[3])
self.gps_state.latitude = float(parts[3]) self.gps_state.latitude = float(parts[3])
elif output.startswith("can_relay_fromvic,core,49"):#GNSS Longitude # GNSS Longitude
elif output.startswith("can_relay_fromvic,core,49"):
self.core_feedback.gps_long = float(parts[3]) self.core_feedback.gps_long = float(parts[3])
self.gps_state.longitude = float(parts[3]) self.gps_state.longitude = float(parts[3])
elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count # GNSS Satellite count
elif output.startswith("can_relay_fromvic,core,50"):
self.core_feedback.gps_sats = round(float(parts[3])) self.core_feedback.gps_sats = round(float(parts[3]))
self.core_feedback.gps_alt = round(float(parts[4]), 2) self.core_feedback.gps_alt = round(float(parts[4]), 2)
self.gps_state.altitude = float(parts[3]) self.gps_state.altitude = float(parts[3])
self.gps_state.altitude = round(float(parts[4]), 2) self.gps_state.altitude = round(float(parts[4]), 2)
elif output.startswith("can_relay_fromvic,core,51"): #Gyro x,y,z # Gyro x, y, z
elif output.startswith("can_relay_fromvic,core,51"):
self.core_feedback.bno_gyro.x = float(parts[3]) self.core_feedback.bno_gyro.x = float(parts[3])
self.core_feedback.bno_gyro.y = float(parts[4]) self.core_feedback.bno_gyro.y = float(parts[4])
self.core_feedback.bno_gyro.z = float(parts[5]) self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.imu_calib = round(float(parts[6])) self.core_feedback.imu_calib = round(float(parts[6]))
elif output.startswith("can_relay_fromvic,core,52"): #Accel x,y,z, heading # Accel x, y, z, heading
elif output.startswith("can_relay_fromvic,core,52"):
self.core_feedback.bno_accel.x = float(parts[3]) self.core_feedback.bno_accel.x = float(parts[3])
self.core_feedback.bno_accel.y = float(parts[4]) self.core_feedback.bno_accel.y = float(parts[4])
self.core_feedback.bno_accel.z = float(parts[5]) self.core_feedback.bno_accel.z = float(parts[5])
self.core_feedback.orientation = float(parts[6]) self.core_feedback.orientation = float(parts[6])
elif output.startswith("can_relay_fromvic,core,53"): #Rev motor feedback # REV Sparkmax feedback
elif output.startswith("can_relay_fromvic,core,53"):
motorId = round(float(parts[3])) motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0 temp = float(parts[4]) / 10.0
voltage = float(parts[5]) / 10.0 voltage = float(parts[5]) / 10.0
@@ -306,12 +280,14 @@ class SerialRelay(Node):
self.core_feedback.br_temp = temp self.core_feedback.br_temp = temp
self.core_feedback.br_voltage = voltage self.core_feedback.br_voltage = voltage
self.core_feedback.br_current = current self.core_feedback.br_current = current
elif output.startswith("can_relay_fromvic,core,54"): #bat, 12, 5, 3, Voltage readings * 100 # Voltages batt, 12, 5, 3, all * 100
elif output.startswith("can_relay_fromvic,core,54"):
self.core_feedback.bat_voltage = float(parts[3]) / 100.0 self.core_feedback.bat_voltage = float(parts[3]) / 100.0
self.core_feedback.voltage_12 = float(parts[4]) / 100.0 self.core_feedback.voltage_12 = float(parts[4]) / 100.0
self.core_feedback.voltage_5 = float(parts[5]) / 100.0 self.core_feedback.voltage_5 = float(parts[5]) / 100.0
self.core_feedback.voltage_3 = float(parts[6]) / 100.0 self.core_feedback.voltage_3 = float(parts[6]) / 100.0
elif output.startswith("can_relay_fromvic,core,56"): #BMP Temp, Altitude, Pressure # BMP Temp, Altitude, Pressure
elif output.startswith("can_relay_fromvic,core,56"):
self.core_feedback.bmp_temp = float(parts[3]) self.core_feedback.bmp_temp = float(parts[3])
self.core_feedback.bmp_alt = float(parts[4]) self.core_feedback.bmp_alt = float(parts[4])
self.core_feedback.bmp_pres = float(parts[5]) self.core_feedback.bmp_pres = float(parts[5])
@@ -339,11 +315,11 @@ class SerialRelay(Node):
except Exception as e: except Exception as e:
exit(0) exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
if serial_pub: if serial_pub:
serial_pub.cleanup() serial_pub.cleanup()
def main(args=None): def main(args=None):
@@ -359,4 +335,3 @@ if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main() main()

View File

@@ -1,23 +1,23 @@
import rclpy import rclpy
from rclpy.node import Node
from rclpy import qos from rclpy import qos
from rclpy.duration import Duration from rclpy.duration import Duration
from rclpy.node import Node
import pygame
import signal
import time import time
import atexit
import serial import serial
import os
import sys import sys
import threading import threading
import glob import glob
import os
import importlib
from std_msgs.msg import String from std_msgs.msg import String
from ros2_interfaces_pkg.msg import CoreControl, ArmManual
from geometry_msgs.msg import Twist from geometry_msgs.msg import Twist
from ros2_interfaces_pkg.msg import CoreControl, ArmManual
import pygame
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init()
@@ -50,7 +50,7 @@ arm_stop_msg.linear_actuator = 0
arm_stop_msg.laser = 0 arm_stop_msg.laser = 0
ctrl_mode = "core" ctrl_mode = "core"
CORE_MODE = "duty" # "twist" or "duty" CORE_MODE = "twist" # "twist" or "duty"
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -260,4 +260,5 @@ def main(args=None):
rclpy.shutdown() rclpy.shutdown()
if __name__ == '__main__': if __name__ == '__main__':
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main() main()