From 75d1a841bbf2784f8acdc2631eab368dfcface50 Mon Sep 17 00:00:00 2001 From: David Date: Sun, 28 Sep 2025 19:30:40 -0500 Subject: [PATCH] feat: add VicCAN topics to Core, sync Core with PDR --- src/core_pkg/core_pkg/core_node.py | 217 +++++++++++++++++++---------- 1 file changed, 147 insertions(+), 70 deletions(-) diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py index 24e6ac0..13fbb58 100644 --- a/src/core_pkg/core_pkg/core_node.py +++ b/src/core_pkg/core_pkg/core_node.py @@ -13,12 +13,13 @@ import os import sys import threading import glob +from scipy.spatial.transform import Rotation from std_msgs.msg import String -from sensor_msgs.msg import Imu, NavSatFix +from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus from geometry_msgs.msg import TwistStamped, Twist from ros2_interfaces_pkg.msg import CoreControl, CoreFeedback -from ros2_interfaces_pkg.msg import NewCoreFeedback, CoreCtrlState +from ros2_interfaces_pkg.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState serial_pub = None @@ -54,38 +55,69 @@ class SerialRelay(Node): self.launch_mode = self.get_parameter('launch_mode').value self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") + + ################################################## + # Topics + # Anchor if self.launch_mode == 'anchor': + self.anchor_fromvic_sub_ = self.create_subscription(VicCAN, '/anchor/from_vic/core', self.relay_fromvic, 20) + self.anchor_tovic_pub_ = self.create_publisher(VicCAN, '/anchor/to_vic/relay', 20) + self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) - # Controls - self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) + # Control + + # autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2 self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos) + # manual twist -- [-1, 1] rather than real units self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos) + # manual flags -- brake mode and speed mode self.control_state_sub_ = self.create_subscription(CoreCtrlState, '/core/control/state', self.control_state_callback, qos_profile=control_qos) self.twist_speed_mode = 0 # -1 = slow, 0 = walk, 1 = fast # Feedback - self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) - self.core_feedback = CoreFeedback() + + # Consolidated and organized core feedback self.feedback_new_pub_ = self.create_publisher(NewCoreFeedback, '/core/feedback_new', 10) self.feedback_new_state = NewCoreFeedback() self.feedback_new_state.fl_motor.id = 1 self.feedback_new_state.bl_motor.id = 2 self.feedback_new_state.fr_motor.id = 3 self.feedback_new_state.br_motor.id = 4 - self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) - self.imu_pub_ = self.create_publisher(Imu, '/imu/data', 10) + self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) # TODO: not sure about this + # IMU (embedded BNO-055) + self.imu_pub_ = self.create_publisher(Imu, '/core/imu', 10) self.imu_state = Imu() + self.imu_state.header.frame_id = "core_bno055" + # GPS (embedded u-blox M9N) self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10) self.gps_state = NavSatFix() + self.gps_state.header.frame_id = "core_gps_antenna" + self.gps_state.status.service = NavSatStatus.SERVICE_GPS + self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX + self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN + # Barometer (embedded BMP-388) + self.baro_pub_ = self.create_publisher(Barometer, '/core/baro', 10) + self.baro_state = Barometer() + self.baro_state.header.frame_id = "core_bmp388" + # Old + + # /core/control + self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff + # /core/feedback + self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) + self.core_feedback = CoreFeedback() # Debug self.debug_pub = self.create_publisher(String, '/core/debug', 10) self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) + ################################################## + # Find microcontroller (Non-anchor only) + # Core (non-anchor) specific if self.launch_mode == 'core': # Loop through all serial devices on the computer to check for the MCU @@ -206,11 +238,11 @@ class SerialRelay(Node): command = f"can_relay_tovic,core,20,{vel_left_rpm},{vel_right_rpm}\n" self.send_cmd(command) - + def twist_man_callback(self, msg: Twist): linear = msg.linear.x # [-1 1] for forward/back from left stick y angular = msg.angular.z # [-1 1] for left/right from right stick x - + if (linear < 0): # reverse turning direction when going backwards (WIP) angular *= -1 @@ -219,13 +251,13 @@ class SerialRelay(Node): # make it look like a problem and don't just run away lmao linear = 0.25 * (linear / abs(linear)) angular = 0.25 * (angular / abs(angular)) - + duty_left = linear - angular duty_right = linear + angular scale = max(1, abs(duty_left), abs(duty_right)) duty_left /= scale duty_right /= scale - + # Apply speed mode if self.twist_speed_mode == -1: # slow duty_left = map_range(duty_left, -1, 1, -SLOW_SPEED, SLOW_SPEED) @@ -239,7 +271,7 @@ class SerialRelay(Node): command = f"can_relay_tovic,core,19,{duty_left},{duty_right}\n" self.send_cmd(command) - + def control_state_callback(self, msg: CoreCtrlState): # Brake mode command = f"can_relay_tovic,core,18,{str(int(msg.brake_mode))}\n" @@ -262,40 +294,28 @@ class SerialRelay(Node): def anchor_feedback(self, msg: String): output = msg.data parts = str(output.strip()).split(",") - # GNSS Lattitude + # GNSS Latitude if output.startswith("can_relay_fromvic,core,48"): self.core_feedback.gps_lat = float(parts[3]) - self.gps_state.latitude = float(parts[3]) # new # GNSS Longitude elif output.startswith("can_relay_fromvic,core,49"): self.core_feedback.gps_long = float(parts[3]) - self.gps_state.longitude = float(parts[3]) # new # GNSS Satellite count and altitude elif output.startswith("can_relay_fromvic,core,50"): self.core_feedback.gps_sats = round(float(parts[3])) self.core_feedback.gps_alt = round(float(parts[4]), 2) - self.gps_state.altitude = float(parts[3]) # new - self.gps_state.altitude = round(float(parts[4]), 2) # new # Gyro x, y, z, and imu calibration elif output.startswith("can_relay_fromvic,core,51"): self.core_feedback.bno_gyro.x = float(parts[3]) self.core_feedback.bno_gyro.y = float(parts[4]) self.core_feedback.bno_gyro.z = float(parts[5]) self.core_feedback.imu_calib = round(float(parts[6])) - self.imu_state.angular_velocity.x = float(parts[3]) # new - self.imu_state.angular_velocity.y = float(parts[4]) # new - self.imu_state.angular_velocity.z = float(parts[5]) # new - self.feedback_new_state.imu_calib = round(float(parts[6])) # new # Accel x, y, z, heading elif output.startswith("can_relay_fromvic,core,52"): self.core_feedback.bno_accel.x = float(parts[3]) self.core_feedback.bno_accel.y = float(parts[4]) self.core_feedback.bno_accel.z = float(parts[5]) self.core_feedback.orientation = float(parts[6]) - self.imu_state.linear_acceleration.x = float(parts[3]) # new - self.imu_state.linear_acceleration.y = float(parts[4]) # new - self.imu_state.linear_acceleration.z = float(parts[5]) # new - self.imu_state.orientation.z = float(parts[6]) # new -- ik this is not correct stfu # REV Sparkmax feedback elif output.startswith("can_relay_fromvic,core,53"): motorId = round(float(parts[3])) @@ -306,79 +326,136 @@ class SerialRelay(Node): self.core_feedback.fl_temp = temp self.core_feedback.fl_voltage = voltage self.core_feedback.fl_current = current - self.feedback_new_state.fl_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.fl_motor.temperature = temp # new - self.feedback_new_state.fl_motor.voltage = voltage # new - self.feedback_new_state.fl_motor.current = current # new elif motorId == 2: self.core_feedback.bl_temp = temp self.core_feedback.bl_voltage = voltage self.core_feedback.bl_current = current - self.feedback_new_state.bl_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.bl_motor.temperature = temp # new - self.feedback_new_state.bl_motor.voltage = voltage # new - self.feedback_new_state.bl_motor.current = current # new elif motorId == 3: self.core_feedback.fr_temp = temp self.core_feedback.fr_voltage = voltage self.core_feedback.fr_current = current - self.feedback_new_state.fr_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.fr_motor.temperature = temp # new - self.feedback_new_state.fr_motor.voltage = voltage # new - self.feedback_new_state.fr_motor.current = current # new elif motorId == 4: self.core_feedback.br_temp = temp self.core_feedback.br_voltage = voltage self.core_feedback.br_current = current - self.feedback_new_state.br_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.br_motor.temperature = temp # new - self.feedback_new_state.br_motor.voltage = voltage # new - self.feedback_new_state.br_motor.current = current # new # Voltages batt, 12, 5, 3, all * 100 elif output.startswith("can_relay_fromvic,core,54"): self.core_feedback.bat_voltage = float(parts[3]) / 100.0 self.core_feedback.voltage_12 = float(parts[4]) / 100.0 self.core_feedback.voltage_5 = float(parts[5]) / 100.0 self.core_feedback.voltage_3 = float(parts[6]) / 100.0 - self.feedback_new_state.voltage_batt = float(parts[3]) / 100.0 # new - self.feedback_new_state.voltage_12 = float(parts[4]) / 100.0 # new - self.feedback_new_state.voltage_5 = float(parts[5]) / 100.0 # new - self.feedback_new_state.voltage_3 = float(parts[6]) / 100.0 # new # BMP temperature, altitude, pressure elif output.startswith("can_relay_fromvic,core,56"): self.core_feedback.bmp_temp = float(parts[3]) self.core_feedback.bmp_alt = float(parts[4]) self.core_feedback.bmp_pres = float(parts[5]) - self.feedback_new_state.bmp_temp = float(parts[3]) # new - self.feedback_new_state.bmp_alt = float(parts[4]) # new - self.feedback_new_state.bmp_pres = float(parts[5]) # new - # REV position and velocity - elif output.startswith("can_relay_fromvic,core,58"): - motorId = round(float(parts[3])) - position = float(parts[4]) - velocity = float(parts[5]) - if motorId == 1: - self.feedback_new_state.fl_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.fl_motor.position = position # new - self.feedback_new_state.fl_motor.velocity = velocity # new - elif motorId == 2: - self.feedback_new_state.bl_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.bl_motor.position = position # new - self.feedback_new_state.bl_motor.velocity = velocity # new - elif motorId == 3: - self.feedback_new_state.fr_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.fr_motor.position = position # new - self.feedback_new_state.fr_motor.velocity = velocity # new - elif motorId == 4: - self.feedback_new_state.br_motor.header.stamp = self.get_clock().now().to_msg() - self.feedback_new_state.br_motor.position = position # new - self.feedback_new_state.br_motor.velocity = velocity # new else: return self.feedback_new_state.header.stamp = self.get_clock().now().to_msg() self.feedback_new_pub_.publish(self.feedback_new_state) #self.get_logger().info(f"[Core Anchor] {msg}") + def relay_fromvic(self, msg: VicCAN): + # Assume that the message is coming from Core + # skill diff if not + + # TODO: add len(msg.data) checks to each feedback message + + # GNSS + if msg.command_id == 48: # GNSS Latitude + self.gps_state.latitude = float(msg.data[0]) + elif msg.command_id == 49: # GNSS Longitude + self.gps_state.longitude = float(msg.data[0]) + elif msg.command_id == 50: # GNSS Satellite count and altitude + self.gps_state.status.status = NavSatStatus.STATUS_FIX if int(msg.data[0]) >= 3 else NavSatStatus.STATUS_NO_FIX + self.gps_state.altitude = float(msg.data[1]) + self.gps_state.header.stamp = msg.header.stamp + self.gps_pub_.publish(self.gps_state) + # IMU + elif msg.command_id == 51: # Gyro x, y, z, and imu calibration + self.feedback_new_state.imu_calib = round(float(msg.data[3])) + self.imu_state.angular_velocity.x = float(msg.data[0]) + self.imu_state.angular_velocity.y = float(msg.data[1]) + self.imu_state.angular_velocity.z = float(msg.data[2]) + self.imu_state.header.stamp = msg.header.stamp + elif msg.command_id == 52: # Accel x, y, z, heading + self.imu_state.linear_acceleration.x = float(msg.data[0]) + self.imu_state.linear_acceleration.y = float(msg.data[1]) + self.imu_state.linear_acceleration.z = float(msg.data[2]) + # Deal with quaternion + r = Rotation.from_euler('z', float(msg.data[3]), degrees=True) + q = r.as_quat() + self.imu_state.orientation.x = q[0] + self.imu_state.orientation.y = q[1] + self.imu_state.orientation.z = q[2] + self.imu_state.orientation.w = q[3] + self.imu_state.header.stamp = msg.header.stamp + self.imu_pub_.publish(self.imu_state) + # REV Motors + elif msg.command_id == 53: # REV SPARK MAX feedback + motorId = round(float(msg.data[0])) + temp = float(msg.data[1]) / 10.0 + voltage = float(msg.data[2]) / 10.0 + current = float(msg.data[3]) / 10.0 + if motorId == 1: + self.feedback_new_state.fl_motor.temperature = temp + self.feedback_new_state.fl_motor.voltage = voltage + self.feedback_new_state.fl_motor.current = current + self.feedback_new_state.fl_motor.header.stamp = msg.header.stamp + elif motorId == 2: + self.feedback_new_state.bl_motor.temperature = temp + self.feedback_new_state.bl_motor.voltage = voltage + self.feedback_new_state.bl_motor.current = current + self.feedback_new_state.bl_motor.header.stamp = msg.header.stamp + elif motorId == 3: + self.feedback_new_state.fr_motor.temperature = temp + self.feedback_new_state.fr_motor.voltage = voltage + self.feedback_new_state.fr_motor.current = current + self.feedback_new_state.fr_motor.header.stamp = msg.header.stamp + elif motorId == 4: + self.feedback_new_state.br_motor.temperature = temp + self.feedback_new_state.br_motor.voltage = voltage + self.feedback_new_state.br_motor.current = current + self.feedback_new_state.br_motor.header.stamp = msg.header.stamp + self.feedback_new_pub_.publish(self.feedback_new_state) + # Board voltage + elif msg.command_id == 54: # Voltages batt, 12, 5, 3, all * 100 + self.feedback_new_state.board_voltage.vbatt = float(msg.data[0]) / 100.0 + self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0 + self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0 + self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0 + # Baro + elif msg.command_id == 56: # BMP temperature, altitude, pressure + self.baro_state.temperature = float(msg.data[0]) + self.baro_state.altitude = float(msg.data[1]) + self.baro_state.pressure = float(msg.data[2]) + self.baro_state.header.stamp = msg.header.stamp + self.baro_pub_.publish(self.baro_state) + # REV Motors (pos and vel) + elif msg.command_id == 58: # REV position and velocity + motorId = round(float(msg.data[0])) + position = float(msg.data[1]) + velocity = float(msg.data[2]) + if motorId == 1: + self.feedback_new_state.fl_motor.position = position + self.feedback_new_state.fl_motor.velocity = velocity + self.feedback_new_state.fl_motor.header.stamp = msg.header.stamp + elif motorId == 2: + self.feedback_new_state.bl_motor.position = position + self.feedback_new_state.bl_motor.velocity = velocity + self.feedback_new_state.bl_motor.header.stamp = msg.header.stamp + elif motorId == 3: + self.feedback_new_state.fr_motor.position = position + self.feedback_new_state.fr_motor.velocity = velocity + self.feedback_new_state.fr_motor.header.stamp = msg.header.stamp + elif motorId == 4: + self.feedback_new_state.br_motor.position = position + self.feedback_new_state.br_motor.velocity = velocity + self.feedback_new_state.br_motor.header.stamp = msg.header.stamp + else: + return + + def publish_feedback(self): #self.get_logger().info(f"[Core] {self.core_feedback}") self.feedback_pub.publish(self.core_feedback)