mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
add core feedback functionality (ANCHOR)
This commit is contained in:
@@ -33,6 +33,9 @@ class SerialRelay(Node):
|
|||||||
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
|
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
|
||||||
# Create a subscriber
|
# Create a subscriber
|
||||||
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
|
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
|
||||||
|
|
||||||
|
# Create a publisher for telemetry
|
||||||
|
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
|
||||||
|
|
||||||
# Create a service server for pinging the rover
|
# Create a service server for pinging the rover
|
||||||
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
|
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
|
||||||
@@ -41,6 +44,8 @@ class SerialRelay(Node):
|
|||||||
self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10)
|
self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10)
|
||||||
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
|
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
|
||||||
|
|
||||||
|
self.core_feedback = CoreFeedback()
|
||||||
|
|
||||||
|
|
||||||
if self.launch_mode == 'core':
|
if self.launch_mode == 'core':
|
||||||
# Loop through all serial devices on the computer to check for the MCU
|
# Loop through all serial devices on the computer to check for the MCU
|
||||||
@@ -167,10 +172,6 @@ class SerialRelay(Node):
|
|||||||
def send_controls(self, msg):
|
def send_controls(self, msg):
|
||||||
#can_relay_tovic,core,19, left_stick, right_stick
|
#can_relay_tovic,core,19, left_stick, right_stick
|
||||||
command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
|
command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
|
||||||
#print(f"[Sys] {command}", end="")
|
|
||||||
|
|
||||||
#self.ser.write(bytes(command, "utf8"))# Send command to MCU
|
|
||||||
#self.get_logger().debug(f"wrote: {command}")
|
|
||||||
|
|
||||||
self.send_cmd(command)
|
self.send_cmd(command)
|
||||||
|
|
||||||
@@ -186,7 +187,43 @@ class SerialRelay(Node):
|
|||||||
self.ser.write(bytes(msg, "utf8"))
|
self.ser.write(bytes(msg, "utf8"))
|
||||||
|
|
||||||
def anchor_feedback(self, msg):
|
def anchor_feedback(self, msg):
|
||||||
self.get_logger().info(f"[Core Anchor] {msg}")
|
output = msg.data
|
||||||
|
parts = str(output.strip()).split(",")
|
||||||
|
#self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}")
|
||||||
|
if output.startswith("can_relay_fromvic,core,48"): #GNSS Lattitude
|
||||||
|
self.core_feedback.gps_lat = float(parts[3])
|
||||||
|
elif output.startswith("can_relay_fromvic,core,49"):#GNSS Longitude
|
||||||
|
self.core_feedback.gps_long = float(parts[3])
|
||||||
|
elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count
|
||||||
|
self.core_feedback.gps_sats = round(float(parts[3]))
|
||||||
|
elif output.startswith("can_relay_fromvic,core,51"):#Gyro x,y,z
|
||||||
|
self.core_feedback.bno_gyro.x = float(parts[3])
|
||||||
|
self.core_feedback.bno_gyro.y = float(parts[4])
|
||||||
|
self.core_feedback.bno_gyro.z = float(parts[5])
|
||||||
|
elif output.startswith("can_relay_fromvic,core,52"):#Accel x,y,z, heading *10
|
||||||
|
self.core_feedback.bno_accel.x = float(parts[3])
|
||||||
|
self.core_feedback.bno_accel.y = float(parts[4])
|
||||||
|
self.core_feedback.bno_accel.z = float(parts[5])
|
||||||
|
self.core_feedback.orientation = float(parts[6]) / 10.0
|
||||||
|
elif output.startswith("can_relay_fromvic,core,53"):#Rev motor feedback
|
||||||
|
pass
|
||||||
|
#self.updateMotorFeedback(output)
|
||||||
|
elif output.startswith("can_relay_fromvic,core,54"):#bat, 12, 5, 3, Voltage readings * 100
|
||||||
|
self.core_feedback.bat_voltage = float(parts[3]) / 100.0
|
||||||
|
self.core_feedback.voltage_12 = float(parts[4]) / 100.0
|
||||||
|
self.core_feedback.voltage_5 = float(parts[5]) / 100.0
|
||||||
|
self.core_feedback.voltage_3 = float(parts[6]) / 100.0
|
||||||
|
elif output.startswith("can_relay_fromvic,core,56"):#BMP Temp, Altitude, Pressure
|
||||||
|
self.core_feedback.bmp_temp = float(parts[3])
|
||||||
|
self.core_feedback.bmp_alt = float(parts[4])
|
||||||
|
self.core_feedback.bmp_pres = float(parts[5])
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
#self.get_logger().info(f"[Core Anchor] {msg}")
|
||||||
|
|
||||||
|
def publish_feedback(self):
|
||||||
|
#self.get_logger().info(f"[Core] {self.core_feedback}")
|
||||||
|
self.feedback_pub.publish(self.core_feedback)
|
||||||
|
|
||||||
def ping_callback(self, request, response):
|
def ping_callback(self, request, response):
|
||||||
return response
|
return response
|
||||||
|
|||||||
Reference in New Issue
Block a user