diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py index 033639f..862243a 100644 --- a/src/core_pkg/core_pkg/core_node.py +++ b/src/core_pkg/core_pkg/core_node.py @@ -16,12 +16,11 @@ import glob from math import sqrt from std_msgs.msg import String +from sensor_msgs.msg import Imu, NavSatFix +from geometry_msgs.msg import TwistStamped, Twist from ros2_interfaces_pkg.msg import CoreFeedback from ros2_interfaces_pkg.msg import CoreControl -# NEW -from sensor_msgs.msg import Imu, NavSatFix -from geometry_msgs.msg import TwistStamped, Twist serial_pub = None thread = None @@ -41,44 +40,42 @@ control_qos = qos.QoSProfile( liveliness_lease_duration=Duration(seconds=5) ) + class SerialRelay(Node): def __init__(self): # Initalize node with name - super().__init__("core_node")#previously 'serial_publisher' + super().__init__("core_node") - # Get launch mode parameter + # Launch mode -- anchor vs core self.declare_parameter('launch_mode', 'core') self.launch_mode = self.get_parameter('launch_mode').value - self.get_logger().info(f"core launch_mode is: {self.launch_mode}") - - # Create publishers - self.debug_pub = self.create_publisher(String, '/core/debug', 10) - self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) - # Create a subscriber - self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) - - # Create a publisher for telemetry - self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) - - # Create a service server for pinging the rover - self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) + self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") + # Anchor if self.launch_mode == 'anchor': self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) + # Controls + self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) + self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos) + self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos) + + # Feedback + self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) self.core_feedback = CoreFeedback() - - - ## NEW TOPICS + self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) self.imu_pub_ = self.create_publisher(Imu, '/imu/data', 10) self.imu_state = Imu() self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10) self.gps_state = NavSatFix() - self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos) - self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos) + # Debug + self.debug_pub = self.create_publisher(String, '/core/debug', 10) + self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) + + # Core (non-anchor) specific if self.launch_mode == 'core': # Loop through all serial devices on the computer to check for the MCU self.port = None @@ -90,7 +87,7 @@ class SerialRelay(Node): ser = serial.Serial(port, 115200, timeout=1) #(f"Checking port {port}...") ser.write(b"ping\n") - response = ser.read_until("\n") + response = ser.read_until("\n") # type: ignore # if pong is in response, then we are talking with the MCU if b"pong" in response: @@ -111,6 +108,7 @@ class SerialRelay(Node): self.ser = serial.Serial(self.port, 115200) atexit.register(self.cleanup) + # end __init__() def run(self): @@ -118,8 +116,7 @@ class SerialRelay(Node): global thread thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread.start() - - + try: while rclpy.ok(): if self.launch_mode == 'core': @@ -127,7 +124,7 @@ class SerialRelay(Node): except KeyboardInterrupt: sys.exit(0) - def read_MCU(self): + def read_MCU(self): # NON-ANCHOR SPECIFIC try: output = str(self.ser.readline(), "utf8") @@ -138,59 +135,31 @@ class SerialRelay(Node): msg.data = output self.debug_pub.publish(msg) return - # Temporary - - # packet = output.strip().split(',') - - # if len(packet) >= 2 and packet[0] == "core" and packet[1] == "telemetry": - # feedback = CoreFeedback() - # feedback.gpslat = float(packet[2]) - # feedback.gpslon = float(packet[3]) - # feedback.gpssat = float(packet[4]) - # feedback.bnogyr.x = float(packet[5]) - # feedback.bnogyr.y = float(packet[6]) - # feedback.bnogyr.z = float(packet[7]) - # feedback.bnoacc.x = float(packet[8]) - # feedback.bnoacc.y = float(packet[9]) - # feedback.bnoacc.z = float(packet[10]) - # feedback.orient = float(packet[11]) - # feedback.bmptemp = float(packet[12]) - # feedback.bmppres = float(packet[13]) - # feedback.bmpalt = float(packet[14]) - - # self.telemetry_publisher.publish(feedback) - # else: - # # print(f"[MCU] {output}", end="") - # # msg = String() - # # msg.data = output - # # self.debug_pub.publish(msg) - # return except serial.SerialException as e: print(f"SerialException: {e}") print("Closing serial port.") if self.ser.is_open: self.ser.close() - self.exit(1) + sys.exit(1) except TypeError as e: print(f"TypeError: {e}") print("Closing serial port.") if self.ser.is_open: self.ser.close() - self.exit(1) + sys.exit(1) except Exception as e: print(f"Exception: {e}") print("Closing serial port.") if self.ser.is_open: self.ser.close() - self.exit(1) - + sys.exit(1) + def scale_duty(self, value: float, max_speed: float): leftMin = -1 leftMax = 1 rightMin = -max_speed/100.0 rightMax = max_speed/100.0 - # Figure out how 'wide' each range is leftSpan = leftMax - leftMin rightSpan = rightMax - rightMin @@ -202,7 +171,6 @@ class SerialRelay(Node): return str(rightMin + (valueScaled * rightSpan)) def send_controls(self, msg: CoreControl): - #can_relay_tovic,core,19, left_stick, right_stick if(msg.turn_to_enable): command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n' else: @@ -260,32 +228,38 @@ class SerialRelay(Node): self.get_logger().info(f"[Core to MCU] {msg}") self.ser.write(bytes(msg, "utf8")) + def anchor_feedback(self, msg: String): output = msg.data parts = str(output.strip()).split(",") - #self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}") - if output.startswith("can_relay_fromvic,core,48"): #GNSS Lattitude + # GNSS Lattitude + if output.startswith("can_relay_fromvic,core,48"): self.core_feedback.gps_lat = float(parts[3]) self.gps_state.latitude = float(parts[3]) - elif output.startswith("can_relay_fromvic,core,49"):#GNSS Longitude + # GNSS Longitude + elif output.startswith("can_relay_fromvic,core,49"): self.core_feedback.gps_long = float(parts[3]) self.gps_state.longitude = float(parts[3]) - elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count + # GNSS Satellite count + elif output.startswith("can_relay_fromvic,core,50"): self.core_feedback.gps_sats = round(float(parts[3])) self.core_feedback.gps_alt = round(float(parts[4]), 2) self.gps_state.altitude = float(parts[3]) self.gps_state.altitude = round(float(parts[4]), 2) - elif output.startswith("can_relay_fromvic,core,51"): #Gyro x,y,z + # Gyro x, y, z + elif output.startswith("can_relay_fromvic,core,51"): self.core_feedback.bno_gyro.x = float(parts[3]) self.core_feedback.bno_gyro.y = float(parts[4]) self.core_feedback.bno_gyro.z = float(parts[5]) self.core_feedback.imu_calib = round(float(parts[6])) - elif output.startswith("can_relay_fromvic,core,52"): #Accel x,y,z, heading + # Accel x, y, z, heading + elif output.startswith("can_relay_fromvic,core,52"): self.core_feedback.bno_accel.x = float(parts[3]) self.core_feedback.bno_accel.y = float(parts[4]) self.core_feedback.bno_accel.z = float(parts[5]) self.core_feedback.orientation = float(parts[6]) - elif output.startswith("can_relay_fromvic,core,53"): #Rev motor feedback + # REV Sparkmax feedback + elif output.startswith("can_relay_fromvic,core,53"): motorId = round(float(parts[3])) temp = float(parts[4]) / 10.0 voltage = float(parts[5]) / 10.0 @@ -306,12 +280,14 @@ class SerialRelay(Node): self.core_feedback.br_temp = temp self.core_feedback.br_voltage = voltage self.core_feedback.br_current = current - elif output.startswith("can_relay_fromvic,core,54"): #bat, 12, 5, 3, Voltage readings * 100 + # Voltages batt, 12, 5, 3, all * 100 + elif output.startswith("can_relay_fromvic,core,54"): self.core_feedback.bat_voltage = float(parts[3]) / 100.0 self.core_feedback.voltage_12 = float(parts[4]) / 100.0 self.core_feedback.voltage_5 = float(parts[5]) / 100.0 self.core_feedback.voltage_3 = float(parts[6]) / 100.0 - elif output.startswith("can_relay_fromvic,core,56"): #BMP Temp, Altitude, Pressure + # BMP Temp, Altitude, Pressure + elif output.startswith("can_relay_fromvic,core,56"): self.core_feedback.bmp_temp = float(parts[3]) self.core_feedback.bmp_alt = float(parts[4]) self.core_feedback.bmp_pres = float(parts[5]) @@ -339,11 +315,11 @@ class SerialRelay(Node): except Exception as e: exit(0) + def myexcepthook(type, value, tb): print("Uncaught exception:", type, value) if serial_pub: serial_pub.cleanup() - def main(args=None): @@ -359,4 +335,3 @@ if __name__ == '__main__': #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly main() - diff --git a/src/headless_pkg/src/headless_node.py b/src/headless_pkg/src/headless_node.py index 35259f2..930f257 100755 --- a/src/headless_pkg/src/headless_node.py +++ b/src/headless_pkg/src/headless_node.py @@ -1,23 +1,23 @@ import rclpy +from rclpy.node import Node from rclpy import qos from rclpy.duration import Duration -from rclpy.node import Node - -import pygame +import signal import time +import atexit import serial +import os import sys import threading import glob -import os -import importlib from std_msgs.msg import String -from ros2_interfaces_pkg.msg import CoreControl, ArmManual from geometry_msgs.msg import Twist +from ros2_interfaces_pkg.msg import CoreControl, ArmManual +import pygame os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() @@ -50,7 +50,7 @@ arm_stop_msg.linear_actuator = 0 arm_stop_msg.laser = 0 ctrl_mode = "core" -CORE_MODE = "duty" # "twist" or "duty" +CORE_MODE = "twist" # "twist" or "duty" control_qos = qos.QoSProfile( history=qos.QoSHistoryPolicy.KEEP_LAST, @@ -260,4 +260,5 @@ def main(args=None): rclpy.shutdown() if __name__ == '__main__': + signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly main()