feat: add VicCAN topics to Core, sync Core with PDR

This commit is contained in:
David
2025-09-28 19:30:40 -05:00
parent 7992acf60f
commit 75d1a841bb

View File

@@ -13,12 +13,13 @@ import os
import sys import sys
import threading import threading
import glob import glob
from scipy.spatial.transform import Rotation
from std_msgs.msg import String from std_msgs.msg import String
from sensor_msgs.msg import Imu, NavSatFix from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus
from geometry_msgs.msg import TwistStamped, Twist from geometry_msgs.msg import TwistStamped, Twist
from ros2_interfaces_pkg.msg import CoreControl, CoreFeedback from ros2_interfaces_pkg.msg import CoreControl, CoreFeedback
from ros2_interfaces_pkg.msg import NewCoreFeedback, CoreCtrlState from ros2_interfaces_pkg.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState
serial_pub = None serial_pub = None
@@ -54,38 +55,69 @@ class SerialRelay(Node):
self.launch_mode = self.get_parameter('launch_mode').value self.launch_mode = self.get_parameter('launch_mode').value
self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
##################################################
# Topics
# Anchor # Anchor
if self.launch_mode == 'anchor': if self.launch_mode == 'anchor':
self.anchor_fromvic_sub_ = self.create_subscription(VicCAN, '/anchor/from_vic/core', self.relay_fromvic, 20)
self.anchor_tovic_pub_ = self.create_publisher(VicCAN, '/anchor/to_vic/relay', 20)
self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
# Controls # Control
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
# autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2
self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos) self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos)
# manual twist -- [-1, 1] rather than real units
self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos) self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos)
# manual flags -- brake mode and speed mode
self.control_state_sub_ = self.create_subscription(CoreCtrlState, '/core/control/state', self.control_state_callback, qos_profile=control_qos) self.control_state_sub_ = self.create_subscription(CoreCtrlState, '/core/control/state', self.control_state_callback, qos_profile=control_qos)
self.twist_speed_mode = 0 # -1 = slow, 0 = walk, 1 = fast self.twist_speed_mode = 0 # -1 = slow, 0 = walk, 1 = fast
# Feedback # Feedback
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
self.core_feedback = CoreFeedback() # Consolidated and organized core feedback
self.feedback_new_pub_ = self.create_publisher(NewCoreFeedback, '/core/feedback_new', 10) self.feedback_new_pub_ = self.create_publisher(NewCoreFeedback, '/core/feedback_new', 10)
self.feedback_new_state = NewCoreFeedback() self.feedback_new_state = NewCoreFeedback()
self.feedback_new_state.fl_motor.id = 1 self.feedback_new_state.fl_motor.id = 1
self.feedback_new_state.bl_motor.id = 2 self.feedback_new_state.bl_motor.id = 2
self.feedback_new_state.fr_motor.id = 3 self.feedback_new_state.fr_motor.id = 3
self.feedback_new_state.br_motor.id = 4 self.feedback_new_state.br_motor.id = 4
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) # TODO: not sure about this
self.imu_pub_ = self.create_publisher(Imu, '/imu/data', 10) # IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(Imu, '/core/imu', 10)
self.imu_state = Imu() self.imu_state = Imu()
self.imu_state.header.frame_id = "core_bno055"
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10) self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10)
self.gps_state = NavSatFix() self.gps_state = NavSatFix()
self.gps_state.header.frame_id = "core_gps_antenna"
self.gps_state.status.service = NavSatStatus.SERVICE_GPS
self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX
self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(Barometer, '/core/baro', 10)
self.baro_state = Barometer()
self.baro_state.header.frame_id = "core_bmp388"
# Old
# /core/control
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
self.core_feedback = CoreFeedback()
# Debug # Debug
self.debug_pub = self.create_publisher(String, '/core/debug', 10) self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
##################################################
# Find microcontroller (Non-anchor only)
# Core (non-anchor) specific # Core (non-anchor) specific
if self.launch_mode == 'core': if self.launch_mode == 'core':
# Loop through all serial devices on the computer to check for the MCU # Loop through all serial devices on the computer to check for the MCU
@@ -262,40 +294,28 @@ class SerialRelay(Node):
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
# GNSS Lattitude # GNSS Latitude
if output.startswith("can_relay_fromvic,core,48"): if output.startswith("can_relay_fromvic,core,48"):
self.core_feedback.gps_lat = float(parts[3]) self.core_feedback.gps_lat = float(parts[3])
self.gps_state.latitude = float(parts[3]) # new
# GNSS Longitude # GNSS Longitude
elif output.startswith("can_relay_fromvic,core,49"): elif output.startswith("can_relay_fromvic,core,49"):
self.core_feedback.gps_long = float(parts[3]) self.core_feedback.gps_long = float(parts[3])
self.gps_state.longitude = float(parts[3]) # new
# GNSS Satellite count and altitude # GNSS Satellite count and altitude
elif output.startswith("can_relay_fromvic,core,50"): elif output.startswith("can_relay_fromvic,core,50"):
self.core_feedback.gps_sats = round(float(parts[3])) self.core_feedback.gps_sats = round(float(parts[3]))
self.core_feedback.gps_alt = round(float(parts[4]), 2) self.core_feedback.gps_alt = round(float(parts[4]), 2)
self.gps_state.altitude = float(parts[3]) # new
self.gps_state.altitude = round(float(parts[4]), 2) # new
# Gyro x, y, z, and imu calibration # Gyro x, y, z, and imu calibration
elif output.startswith("can_relay_fromvic,core,51"): elif output.startswith("can_relay_fromvic,core,51"):
self.core_feedback.bno_gyro.x = float(parts[3]) self.core_feedback.bno_gyro.x = float(parts[3])
self.core_feedback.bno_gyro.y = float(parts[4]) self.core_feedback.bno_gyro.y = float(parts[4])
self.core_feedback.bno_gyro.z = float(parts[5]) self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.imu_calib = round(float(parts[6])) self.core_feedback.imu_calib = round(float(parts[6]))
self.imu_state.angular_velocity.x = float(parts[3]) # new
self.imu_state.angular_velocity.y = float(parts[4]) # new
self.imu_state.angular_velocity.z = float(parts[5]) # new
self.feedback_new_state.imu_calib = round(float(parts[6])) # new
# Accel x, y, z, heading # Accel x, y, z, heading
elif output.startswith("can_relay_fromvic,core,52"): elif output.startswith("can_relay_fromvic,core,52"):
self.core_feedback.bno_accel.x = float(parts[3]) self.core_feedback.bno_accel.x = float(parts[3])
self.core_feedback.bno_accel.y = float(parts[4]) self.core_feedback.bno_accel.y = float(parts[4])
self.core_feedback.bno_accel.z = float(parts[5]) self.core_feedback.bno_accel.z = float(parts[5])
self.core_feedback.orientation = float(parts[6]) self.core_feedback.orientation = float(parts[6])
self.imu_state.linear_acceleration.x = float(parts[3]) # new
self.imu_state.linear_acceleration.y = float(parts[4]) # new
self.imu_state.linear_acceleration.z = float(parts[5]) # new
self.imu_state.orientation.z = float(parts[6]) # new -- ik this is not correct stfu
# REV Sparkmax feedback # REV Sparkmax feedback
elif output.startswith("can_relay_fromvic,core,53"): elif output.startswith("can_relay_fromvic,core,53"):
motorId = round(float(parts[3])) motorId = round(float(parts[3]))
@@ -306,79 +326,136 @@ class SerialRelay(Node):
self.core_feedback.fl_temp = temp self.core_feedback.fl_temp = temp
self.core_feedback.fl_voltage = voltage self.core_feedback.fl_voltage = voltage
self.core_feedback.fl_current = current self.core_feedback.fl_current = current
self.feedback_new_state.fl_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.fl_motor.temperature = temp # new
self.feedback_new_state.fl_motor.voltage = voltage # new
self.feedback_new_state.fl_motor.current = current # new
elif motorId == 2: elif motorId == 2:
self.core_feedback.bl_temp = temp self.core_feedback.bl_temp = temp
self.core_feedback.bl_voltage = voltage self.core_feedback.bl_voltage = voltage
self.core_feedback.bl_current = current self.core_feedback.bl_current = current
self.feedback_new_state.bl_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.bl_motor.temperature = temp # new
self.feedback_new_state.bl_motor.voltage = voltage # new
self.feedback_new_state.bl_motor.current = current # new
elif motorId == 3: elif motorId == 3:
self.core_feedback.fr_temp = temp self.core_feedback.fr_temp = temp
self.core_feedback.fr_voltage = voltage self.core_feedback.fr_voltage = voltage
self.core_feedback.fr_current = current self.core_feedback.fr_current = current
self.feedback_new_state.fr_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.fr_motor.temperature = temp # new
self.feedback_new_state.fr_motor.voltage = voltage # new
self.feedback_new_state.fr_motor.current = current # new
elif motorId == 4: elif motorId == 4:
self.core_feedback.br_temp = temp self.core_feedback.br_temp = temp
self.core_feedback.br_voltage = voltage self.core_feedback.br_voltage = voltage
self.core_feedback.br_current = current self.core_feedback.br_current = current
self.feedback_new_state.br_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.br_motor.temperature = temp # new
self.feedback_new_state.br_motor.voltage = voltage # new
self.feedback_new_state.br_motor.current = current # new
# Voltages batt, 12, 5, 3, all * 100 # Voltages batt, 12, 5, 3, all * 100
elif output.startswith("can_relay_fromvic,core,54"): elif output.startswith("can_relay_fromvic,core,54"):
self.core_feedback.bat_voltage = float(parts[3]) / 100.0 self.core_feedback.bat_voltage = float(parts[3]) / 100.0
self.core_feedback.voltage_12 = float(parts[4]) / 100.0 self.core_feedback.voltage_12 = float(parts[4]) / 100.0
self.core_feedback.voltage_5 = float(parts[5]) / 100.0 self.core_feedback.voltage_5 = float(parts[5]) / 100.0
self.core_feedback.voltage_3 = float(parts[6]) / 100.0 self.core_feedback.voltage_3 = float(parts[6]) / 100.0
self.feedback_new_state.voltage_batt = float(parts[3]) / 100.0 # new
self.feedback_new_state.voltage_12 = float(parts[4]) / 100.0 # new
self.feedback_new_state.voltage_5 = float(parts[5]) / 100.0 # new
self.feedback_new_state.voltage_3 = float(parts[6]) / 100.0 # new
# BMP temperature, altitude, pressure # BMP temperature, altitude, pressure
elif output.startswith("can_relay_fromvic,core,56"): elif output.startswith("can_relay_fromvic,core,56"):
self.core_feedback.bmp_temp = float(parts[3]) self.core_feedback.bmp_temp = float(parts[3])
self.core_feedback.bmp_alt = float(parts[4]) self.core_feedback.bmp_alt = float(parts[4])
self.core_feedback.bmp_pres = float(parts[5]) self.core_feedback.bmp_pres = float(parts[5])
self.feedback_new_state.bmp_temp = float(parts[3]) # new
self.feedback_new_state.bmp_alt = float(parts[4]) # new
self.feedback_new_state.bmp_pres = float(parts[5]) # new
# REV position and velocity
elif output.startswith("can_relay_fromvic,core,58"):
motorId = round(float(parts[3]))
position = float(parts[4])
velocity = float(parts[5])
if motorId == 1:
self.feedback_new_state.fl_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.fl_motor.position = position # new
self.feedback_new_state.fl_motor.velocity = velocity # new
elif motorId == 2:
self.feedback_new_state.bl_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.bl_motor.position = position # new
self.feedback_new_state.bl_motor.velocity = velocity # new
elif motorId == 3:
self.feedback_new_state.fr_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.fr_motor.position = position # new
self.feedback_new_state.fr_motor.velocity = velocity # new
elif motorId == 4:
self.feedback_new_state.br_motor.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_state.br_motor.position = position # new
self.feedback_new_state.br_motor.velocity = velocity # new
else: else:
return return
self.feedback_new_state.header.stamp = self.get_clock().now().to_msg() self.feedback_new_state.header.stamp = self.get_clock().now().to_msg()
self.feedback_new_pub_.publish(self.feedback_new_state) self.feedback_new_pub_.publish(self.feedback_new_state)
#self.get_logger().info(f"[Core Anchor] {msg}") #self.get_logger().info(f"[Core Anchor] {msg}")
def relay_fromvic(self, msg: VicCAN):
# Assume that the message is coming from Core
# skill diff if not
# TODO: add len(msg.data) checks to each feedback message
# GNSS
if msg.command_id == 48: # GNSS Latitude
self.gps_state.latitude = float(msg.data[0])
elif msg.command_id == 49: # GNSS Longitude
self.gps_state.longitude = float(msg.data[0])
elif msg.command_id == 50: # GNSS Satellite count and altitude
self.gps_state.status.status = NavSatStatus.STATUS_FIX if int(msg.data[0]) >= 3 else NavSatStatus.STATUS_NO_FIX
self.gps_state.altitude = float(msg.data[1])
self.gps_state.header.stamp = msg.header.stamp
self.gps_pub_.publish(self.gps_state)
# IMU
elif msg.command_id == 51: # Gyro x, y, z, and imu calibration
self.feedback_new_state.imu_calib = round(float(msg.data[3]))
self.imu_state.angular_velocity.x = float(msg.data[0])
self.imu_state.angular_velocity.y = float(msg.data[1])
self.imu_state.angular_velocity.z = float(msg.data[2])
self.imu_state.header.stamp = msg.header.stamp
elif msg.command_id == 52: # Accel x, y, z, heading
self.imu_state.linear_acceleration.x = float(msg.data[0])
self.imu_state.linear_acceleration.y = float(msg.data[1])
self.imu_state.linear_acceleration.z = float(msg.data[2])
# Deal with quaternion
r = Rotation.from_euler('z', float(msg.data[3]), degrees=True)
q = r.as_quat()
self.imu_state.orientation.x = q[0]
self.imu_state.orientation.y = q[1]
self.imu_state.orientation.z = q[2]
self.imu_state.orientation.w = q[3]
self.imu_state.header.stamp = msg.header.stamp
self.imu_pub_.publish(self.imu_state)
# REV Motors
elif msg.command_id == 53: # REV SPARK MAX feedback
motorId = round(float(msg.data[0]))
temp = float(msg.data[1]) / 10.0
voltage = float(msg.data[2]) / 10.0
current = float(msg.data[3]) / 10.0
if motorId == 1:
self.feedback_new_state.fl_motor.temperature = temp
self.feedback_new_state.fl_motor.voltage = voltage
self.feedback_new_state.fl_motor.current = current
self.feedback_new_state.fl_motor.header.stamp = msg.header.stamp
elif motorId == 2:
self.feedback_new_state.bl_motor.temperature = temp
self.feedback_new_state.bl_motor.voltage = voltage
self.feedback_new_state.bl_motor.current = current
self.feedback_new_state.bl_motor.header.stamp = msg.header.stamp
elif motorId == 3:
self.feedback_new_state.fr_motor.temperature = temp
self.feedback_new_state.fr_motor.voltage = voltage
self.feedback_new_state.fr_motor.current = current
self.feedback_new_state.fr_motor.header.stamp = msg.header.stamp
elif motorId == 4:
self.feedback_new_state.br_motor.temperature = temp
self.feedback_new_state.br_motor.voltage = voltage
self.feedback_new_state.br_motor.current = current
self.feedback_new_state.br_motor.header.stamp = msg.header.stamp
self.feedback_new_pub_.publish(self.feedback_new_state)
# Board voltage
elif msg.command_id == 54: # Voltages batt, 12, 5, 3, all * 100
self.feedback_new_state.board_voltage.vbatt = float(msg.data[0]) / 100.0
self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0
self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0
self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0
# Baro
elif msg.command_id == 56: # BMP temperature, altitude, pressure
self.baro_state.temperature = float(msg.data[0])
self.baro_state.altitude = float(msg.data[1])
self.baro_state.pressure = float(msg.data[2])
self.baro_state.header.stamp = msg.header.stamp
self.baro_pub_.publish(self.baro_state)
# REV Motors (pos and vel)
elif msg.command_id == 58: # REV position and velocity
motorId = round(float(msg.data[0]))
position = float(msg.data[1])
velocity = float(msg.data[2])
if motorId == 1:
self.feedback_new_state.fl_motor.position = position
self.feedback_new_state.fl_motor.velocity = velocity
self.feedback_new_state.fl_motor.header.stamp = msg.header.stamp
elif motorId == 2:
self.feedback_new_state.bl_motor.position = position
self.feedback_new_state.bl_motor.velocity = velocity
self.feedback_new_state.bl_motor.header.stamp = msg.header.stamp
elif motorId == 3:
self.feedback_new_state.fr_motor.position = position
self.feedback_new_state.fr_motor.velocity = velocity
self.feedback_new_state.fr_motor.header.stamp = msg.header.stamp
elif motorId == 4:
self.feedback_new_state.br_motor.position = position
self.feedback_new_state.br_motor.velocity = velocity
self.feedback_new_state.br_motor.header.stamp = msg.header.stamp
else:
return
def publish_feedback(self): def publish_feedback(self):
#self.get_logger().info(f"[Core] {self.core_feedback}") #self.get_logger().info(f"[Core] {self.core_feedback}")
self.feedback_pub.publish(self.core_feedback) self.feedback_pub.publish(self.core_feedback)