feat: add Twist support to headless and core_pkg

Left stick y is forward/backward, right stick x is turn/angular. Still uses duty cycle. Published on /core/twist. On headless, needs CORE_MODE constant changed to "twist" to use new Twist command.

Also added TwistStamped to core_pkg for /cmd_vel. Will use velocity control. Needs wheel measurements from Core CAD. Will be used for Autonomy.
This commit is contained in:
David
2025-09-08 09:51:25 -05:00
parent 056ffd1eb6
commit 0c1a368499
2 changed files with 405 additions and 330 deletions

View File

@@ -1,329 +1,358 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos from rclpy import qos
from std_srvs.srv import Empty from rclpy.duration import Duration
from std_srvs.srv import Empty
import signal
import time import signal
import atexit import time
import atexit
import serial
import os import serial
import sys import os
import threading import sys
import glob import threading
import glob
from std_msgs.msg import String from math import sqrt
from ros2_interfaces_pkg.msg import CoreFeedback
from ros2_interfaces_pkg.msg import CoreControl from std_msgs.msg import String
from ros2_interfaces_pkg.msg import CoreFeedback
# NEW from ros2_interfaces_pkg.msg import CoreControl
from sensor_msgs.msg import Imu, NavSatFix
from geometry_msgs.msg import TwistStamped # NEW
from sensor_msgs.msg import Imu, NavSatFix
serial_pub = None from geometry_msgs.msg import TwistStamped, Twist
thread = None
serial_pub = None
control_qos = qos.QoSProfile( thread = None
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, CORE_WHEELBASE = 0.34 # meters -- TODO: verify
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, CORE_WHEEL_RADIUS = 0.075 # meters -- TODO: verify
durability=qos.QoSDurabilityPolicy.VOLATILE, CORE_GEAR_RATIO = 100 # Clucky: 100:1, Testbed: 64:1
deadline=1000,
lifespan=500, control_qos = qos.QoSProfile(
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, history=qos.QoSHistoryPolicy.KEEP_LAST,
liveliness_lease_duration=5000 depth=2,
) reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
class SerialRelay(Node): deadline=Duration(seconds=1),
def __init__(self): lifespan=Duration(nanoseconds=500_000_000), # 500ms
# Initalize node with name liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
super().__init__("core_node")#previously 'serial_publisher' liveliness_lease_duration=Duration(seconds=5)
)
# Get launch mode parameter
self.declare_parameter('launch_mode', 'core') class SerialRelay(Node):
self.launch_mode = self.get_parameter('launch_mode').value def __init__(self):
self.get_logger().info(f"core launch_mode is: {self.launch_mode}") # Initalize node with name
super().__init__("core_node")#previously 'serial_publisher'
# Create publishers
self.debug_pub = self.create_publisher(String, '/core/debug', 10) # Get launch mode parameter
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) self.declare_parameter('launch_mode', 'core')
# Create a subscriber self.launch_mode = self.get_parameter('launch_mode').value
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) self.get_logger().info(f"core launch_mode is: {self.launch_mode}")
# Create a publisher for telemetry # Create publishers
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
# Create a service server for pinging the rover # Create a subscriber
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
if self.launch_mode == 'anchor': # Create a publisher for telemetry
self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
# Create a service server for pinging the rover
self.core_feedback = CoreFeedback() self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
if self.launch_mode == 'anchor':
## NEW TOPICS self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10)
self.imu_pub_ = self.create_publisher(Imu, '/imu/data', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
self.imu_state = Imu()
self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10) self.core_feedback = CoreFeedback()
self.gps_state = NavSatFix()
self.twist_sub_ = self.create_subscription(TwistStamped, '/twist', self.twist_callback, qos_profile=control_qos) ## NEW TOPICS
self.imu_pub_ = self.create_publisher(Imu, '/imu/data', 10)
self.imu_state = Imu()
if self.launch_mode == 'core': self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', 10)
# Loop through all serial devices on the computer to check for the MCU self.gps_state = NavSatFix()
self.port = None
ports = SerialRelay.list_serial_ports() self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos)
for i in range(2): self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos)
for port in ports:
try: if self.launch_mode == 'core':
# connect and send a ping command # Loop through all serial devices on the computer to check for the MCU
ser = serial.Serial(port, 115200, timeout=1) self.port = None
#(f"Checking port {port}...") ports = SerialRelay.list_serial_ports()
ser.write(b"ping\n") for i in range(2):
response = ser.read_until("\n") for port in ports:
try:
# if pong is in response, then we are talking with the MCU # connect and send a ping command
if b"pong" in response: ser = serial.Serial(port, 115200, timeout=1)
self.port = port #(f"Checking port {port}...")
self.get_logger().info(f"Found MCU at {self.port}!") ser.write(b"ping\n")
self.get_logger().info(f"Enabling Relay Mode") response = ser.read_until("\n")
ser.write(b"can_relay_mode,on\n")
break # if pong is in response, then we are talking with the MCU
except: if b"pong" in response:
pass self.port = port
if self.port is not None: self.get_logger().info(f"Found MCU at {self.port}!")
break self.get_logger().info(f"Enabling Relay Mode")
ser.write(b"can_relay_mode,on\n")
if self.port is None: break
self.get_logger().info("Unable to find MCU...") except:
time.sleep(1) pass
sys.exit(1) if self.port is not None:
break
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup) if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
def run(self): sys.exit(1)
# This thread makes all the update processes run in the background
global thread self.ser = serial.Serial(self.port, 115200)
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) atexit.register(self.cleanup)
thread.start()
def run(self):
try: # This thread makes all the update processes run in the background
while rclpy.ok(): global thread
if self.launch_mode == 'core': thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
self.read_MCU() # Check the MCU for updates thread.start()
except KeyboardInterrupt:
sys.exit(0)
try:
def read_MCU(self): while rclpy.ok():
try: if self.launch_mode == 'core':
output = str(self.ser.readline(), "utf8") self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
if output: sys.exit(0)
# All output over debug temporarily
print(f"[MCU] {output}") def read_MCU(self):
msg = String() try:
msg.data = output output = str(self.ser.readline(), "utf8")
self.debug_pub.publish(msg)
return if output:
# Temporary # All output over debug temporarily
print(f"[MCU] {output}")
# packet = output.strip().split(',') msg = String()
msg.data = output
# if len(packet) >= 2 and packet[0] == "core" and packet[1] == "telemetry": self.debug_pub.publish(msg)
# feedback = CoreFeedback() return
# feedback.gpslat = float(packet[2]) # Temporary
# feedback.gpslon = float(packet[3])
# feedback.gpssat = float(packet[4]) # packet = output.strip().split(',')
# feedback.bnogyr.x = float(packet[5])
# feedback.bnogyr.y = float(packet[6]) # if len(packet) >= 2 and packet[0] == "core" and packet[1] == "telemetry":
# feedback.bnogyr.z = float(packet[7]) # feedback = CoreFeedback()
# feedback.bnoacc.x = float(packet[8]) # feedback.gpslat = float(packet[2])
# feedback.bnoacc.y = float(packet[9]) # feedback.gpslon = float(packet[3])
# feedback.bnoacc.z = float(packet[10]) # feedback.gpssat = float(packet[4])
# feedback.orient = float(packet[11]) # feedback.bnogyr.x = float(packet[5])
# feedback.bmptemp = float(packet[12]) # feedback.bnogyr.y = float(packet[6])
# feedback.bmppres = float(packet[13]) # feedback.bnogyr.z = float(packet[7])
# feedback.bmpalt = float(packet[14]) # feedback.bnoacc.x = float(packet[8])
# feedback.bnoacc.y = float(packet[9])
# self.telemetry_publisher.publish(feedback) # feedback.bnoacc.z = float(packet[10])
# else: # feedback.orient = float(packet[11])
# # print(f"[MCU] {output}", end="") # feedback.bmptemp = float(packet[12])
# # msg = String() # feedback.bmppres = float(packet[13])
# # msg.data = output # feedback.bmpalt = float(packet[14])
# # self.debug_pub.publish(msg)
# return # self.telemetry_publisher.publish(feedback)
except serial.SerialException as e: # else:
print(f"SerialException: {e}") # # print(f"[MCU] {output}", end="")
print("Closing serial port.") # # msg = String()
if self.ser.is_open: # # msg.data = output
self.ser.close() # # self.debug_pub.publish(msg)
self.exit(1) # return
except TypeError as e: except serial.SerialException as e:
print(f"TypeError: {e}") print(f"SerialException: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
self.exit(1) self.exit(1)
except Exception as e: except TypeError as e:
print(f"Exception: {e}") print(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
self.exit(1) self.exit(1)
except Exception as e:
def scale_duty(self, value: float, max_speed: float): print(f"Exception: {e}")
leftMin = -1 print("Closing serial port.")
leftMax = 1 if self.ser.is_open:
rightMin = -max_speed/100.0 self.ser.close()
rightMax = max_speed/100.0 self.exit(1)
def scale_duty(self, value: float, max_speed: float):
# Figure out how 'wide' each range is leftMin = -1
leftSpan = leftMax - leftMin leftMax = 1
rightSpan = rightMax - rightMin rightMin = -max_speed/100.0
rightMax = max_speed/100.0
# Convert the left range into a 0-1 range (float)
valueScaled = float(value - leftMin) / float(leftSpan)
# Figure out how 'wide' each range is
# Convert the 0-1 range into a value in the right range. leftSpan = leftMax - leftMin
return str(rightMin + (valueScaled * rightSpan)) rightSpan = rightMax - rightMin
def send_controls(self, msg: CoreControl): # Convert the left range into a 0-1 range (float)
#can_relay_tovic,core,19, left_stick, right_stick valueScaled = float(value - leftMin) / float(leftSpan)
if(msg.turn_to_enable):
command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n' # Convert the 0-1 range into a value in the right range.
else: return str(rightMin + (valueScaled * rightSpan))
command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
self.send_cmd(command) def send_controls(self, msg: CoreControl):
#can_relay_tovic,core,19, left_stick, right_stick
# Brake mode if(msg.turn_to_enable):
command = "can_relay_tovic,core,18," + str(int(msg.brake)) + '\n' command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n'
self.send_cmd(command) else:
command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
#print(f"[Sys] Relaying: {command}") self.send_cmd(command)
def twist_callback(self, msg: TwistStamped): # Brake mode
linear = msg.twist.linear.x command = "can_relay_tovic,core,18," + str(int(msg.brake)) + '\n'
angular = msg.twist.angular.z self.send_cmd(command)
# send that bitch straight to embedded (hope we're running through core rn...) #print(f"[Sys] Relaying: {command}")
command = f"joystick_ctrl,{angular},{linear}\n"
self.send_cmd(command) def cmd_vel_callback(self, msg: TwistStamped):
linear = msg.twist.linear.x
def send_cmd(self, msg: str): angular = msg.twist.angular.z
if self.launch_mode == 'anchor':
#self.get_logger().info(f"[Core to Anchor Relay] {msg}") vel_left_rads = (linear - (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
output = String()#Convert to std_msg string vel_right_rads = (linear + (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
output.data = msg
self.anchor_pub.publish(output) vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
elif self.launch_mode == 'core': vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
self.get_logger().info(f"[Core to MCU] {msg.data}")
self.ser.write(bytes(msg, "utf8")) command = f"can_relay_tovic,core,20,{vel_left_rpm},{vel_right_rpm}\n"
self.send_cmd(command)
def anchor_feedback(self, msg: String):
output = msg.data def twist_man_callback(self, msg: Twist):
parts = str(output.strip()).split(",") linear = msg.linear.x # [-1 1] for forward/back from left joy y
#self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}") angular = -msg.angular.z # [-1 1] for left/right from right joy x
if output.startswith("can_relay_fromvic,core,48"): #GNSS Lattitude
self.core_feedback.gps_lat = float(parts[3]) if abs(linear) > 1 or abs(angular) > 1:
self.gps_state.latitude = float(parts[3]) # if speed is greater than 1, then there is a problem
elif output.startswith("can_relay_fromvic,core,49"):#GNSS Longitude # make it look like a problem and don't just run away lmao
self.core_feedback.gps_long = float(parts[3]) drive_speed = 0.25
self.gps_state.longitude = float(parts[3])
elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count duty_left = linear - angular
self.core_feedback.gps_sats = round(float(parts[3])) duty_right = linear + angular
self.core_feedback.gps_alt = round(float(parts[4]), 2) scale = max(1, abs(duty_left), abs(duty_right))
self.gps_state.altitude = float(parts[3]) duty_left /= scale
self.gps_state.altitude = round(float(parts[4]), 2) duty_right /= scale
elif output.startswith("can_relay_fromvic,core,51"): #Gyro x,y,z
self.core_feedback.bno_gyro.x = float(parts[3]) command = f"can_relay_tovic,core,19,{duty_left},{duty_right}\n"
self.core_feedback.bno_gyro.y = float(parts[4]) self.send_cmd(command)
self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.imu_calib = round(float(parts[6])) def send_cmd(self, msg: str):
elif output.startswith("can_relay_fromvic,core,52"): #Accel x,y,z, heading if self.launch_mode == 'anchor':
self.core_feedback.bno_accel.x = float(parts[3]) #self.get_logger().info(f"[Core to Anchor Relay] {msg}")
self.core_feedback.bno_accel.y = float(parts[4]) output = String()#Convert to std_msg string
self.core_feedback.bno_accel.z = float(parts[5]) output.data = msg
self.core_feedback.orientation = float(parts[6]) self.anchor_pub.publish(output)
elif output.startswith("can_relay_fromvic,core,53"): #Rev motor feedback elif self.launch_mode == 'core':
motorId = round(float(parts[3])) self.get_logger().info(f"[Core to MCU] {msg.data}")
temp = float(parts[4]) / 10.0 self.ser.write(bytes(msg, "utf8"))
voltage = float(parts[5]) / 10.0
current = float(parts[6]) / 10.0 def anchor_feedback(self, msg: String):
if motorId == 1: output = msg.data
self.core_feedback.fl_temp = temp parts = str(output.strip()).split(",")
self.core_feedback.fl_voltage = voltage #self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}")
self.core_feedback.fl_current = current if output.startswith("can_relay_fromvic,core,48"): #GNSS Lattitude
elif motorId == 2: self.core_feedback.gps_lat = float(parts[3])
self.core_feedback.bl_temp = temp self.gps_state.latitude = float(parts[3])
self.core_feedback.bl_voltage = voltage elif output.startswith("can_relay_fromvic,core,49"):#GNSS Longitude
self.core_feedback.bl_current = current self.core_feedback.gps_long = float(parts[3])
elif motorId == 3: self.gps_state.longitude = float(parts[3])
self.core_feedback.fr_temp = temp elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count
self.core_feedback.fr_voltage = voltage self.core_feedback.gps_sats = round(float(parts[3]))
self.core_feedback.fr_current = current self.core_feedback.gps_alt = round(float(parts[4]), 2)
elif motorId == 4: self.gps_state.altitude = float(parts[3])
self.core_feedback.br_temp = temp self.gps_state.altitude = round(float(parts[4]), 2)
self.core_feedback.br_voltage = voltage elif output.startswith("can_relay_fromvic,core,51"): #Gyro x,y,z
self.core_feedback.br_current = current self.core_feedback.bno_gyro.x = float(parts[3])
elif output.startswith("can_relay_fromvic,core,54"): #bat, 12, 5, 3, Voltage readings * 100 self.core_feedback.bno_gyro.y = float(parts[4])
self.core_feedback.bat_voltage = float(parts[3]) / 100.0 self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.voltage_12 = float(parts[4]) / 100.0 self.core_feedback.imu_calib = round(float(parts[6]))
self.core_feedback.voltage_5 = float(parts[5]) / 100.0 elif output.startswith("can_relay_fromvic,core,52"): #Accel x,y,z, heading
self.core_feedback.voltage_3 = float(parts[6]) / 100.0 self.core_feedback.bno_accel.x = float(parts[3])
elif output.startswith("can_relay_fromvic,core,56"): #BMP Temp, Altitude, Pressure self.core_feedback.bno_accel.y = float(parts[4])
self.core_feedback.bmp_temp = float(parts[3]) self.core_feedback.bno_accel.z = float(parts[5])
self.core_feedback.bmp_alt = float(parts[4]) self.core_feedback.orientation = float(parts[6])
self.core_feedback.bmp_pres = float(parts[5]) elif output.startswith("can_relay_fromvic,core,53"): #Rev motor feedback
else: motorId = round(float(parts[3]))
return temp = float(parts[4]) / 10.0
#self.get_logger().info(f"[Core Anchor] {msg}") voltage = float(parts[5]) / 10.0
current = float(parts[6]) / 10.0
def publish_feedback(self): if motorId == 1:
#self.get_logger().info(f"[Core] {self.core_feedback}") self.core_feedback.fl_temp = temp
self.feedback_pub.publish(self.core_feedback) self.core_feedback.fl_voltage = voltage
self.core_feedback.fl_current = current
def ping_callback(self, request, response): elif motorId == 2:
return response self.core_feedback.bl_temp = temp
self.core_feedback.bl_voltage = voltage
self.core_feedback.bl_current = current
@staticmethod elif motorId == 3:
def list_serial_ports(): self.core_feedback.fr_temp = temp
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") self.core_feedback.fr_voltage = voltage
self.core_feedback.fr_current = current
def cleanup(self): elif motorId == 4:
print("Cleaning up before terminating...") self.core_feedback.br_temp = temp
try: self.core_feedback.br_voltage = voltage
if self.ser.is_open: self.core_feedback.br_current = current
self.ser.close() elif output.startswith("can_relay_fromvic,core,54"): #bat, 12, 5, 3, Voltage readings * 100
except Exception as e: self.core_feedback.bat_voltage = float(parts[3]) / 100.0
exit(0) self.core_feedback.voltage_12 = float(parts[4]) / 100.0
self.core_feedback.voltage_5 = float(parts[5]) / 100.0
def myexcepthook(type, value, tb): self.core_feedback.voltage_3 = float(parts[6]) / 100.0
print("Uncaught exception:", type, value) elif output.startswith("can_relay_fromvic,core,56"): #BMP Temp, Altitude, Pressure
if serial_pub: self.core_feedback.bmp_temp = float(parts[3])
serial_pub.cleanup() self.core_feedback.bmp_alt = float(parts[4])
self.core_feedback.bmp_pres = float(parts[5])
else:
return
def main(args=None): #self.get_logger().info(f"[Core Anchor] {msg}")
rclpy.init(args=args)
sys.excepthook = myexcepthook def publish_feedback(self):
#self.get_logger().info(f"[Core] {self.core_feedback}")
global serial_pub self.feedback_pub.publish(self.core_feedback)
serial_pub = SerialRelay() def ping_callback(self, request, response):
serial_pub.run() return response
if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly @staticmethod
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly def list_serial_ports():
main() return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -1,4 +1,6 @@
import rclpy import rclpy
from rclpy import qos
from rclpy.duration import Duration
from rclpy.node import Node from rclpy.node import Node
import pygame import pygame
@@ -14,6 +16,7 @@ import os
import importlib import importlib
from std_msgs.msg import String from std_msgs.msg import String
from ros2_interfaces_pkg.msg import CoreControl, ArmManual from ros2_interfaces_pkg.msg import CoreControl, ArmManual
from geometry_msgs.msg import Twist
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
@@ -27,6 +30,14 @@ core_stop_msg.left_stick = 0.0
core_stop_msg.right_stick = 0.0 core_stop_msg.right_stick = 0.0
core_stop_msg.max_speed = 0 core_stop_msg.max_speed = 0
core_stop_twist_msg = Twist()
core_stop_twist_msg.linear.x = 0.0
core_stop_twist_msg.linear.y = 0.0
core_stop_twist_msg.linear.z = 0.0
core_stop_twist_msg.angular.x = 0.0
core_stop_twist_msg.angular.y = 0.0
core_stop_twist_msg.angular.z = 0.0
arm_stop_msg = ArmManual() arm_stop_msg = ArmManual()
arm_stop_msg.axis0 = 0 arm_stop_msg.axis0 = 0
arm_stop_msg.axis1 = 0 arm_stop_msg.axis1 = 0
@@ -39,6 +50,18 @@ arm_stop_msg.linear_actuator = 0
arm_stop_msg.laser = 0 arm_stop_msg.laser = 0
ctrl_mode = "core" ctrl_mode = "core"
CORE_MODE = "duty" # "twist" or "duty"
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5)
)
class Headless(Node): class Headless(Node):
@@ -69,6 +92,7 @@ class Headless(Node):
self.create_timer(0.15, self.send_controls) self.create_timer(0.15, self.send_controls)
self.core_publisher = self.create_publisher(CoreControl, '/core/control', 10) self.core_publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.arm_publisher = self.create_publisher(ArmManual, '/arm/control/manual', 10) self.arm_publisher = self.create_publisher(ArmManual, '/arm/control/manual', 10)
self.core_twist_pub_ = self.create_publisher(Twist, '/core/twist', qos_profile=control_qos)
def run(self): def run(self):
# This thread makes all the update processes run in the background # This thread makes all the update processes run in the background
@@ -111,7 +135,7 @@ class Headless(Node):
# CORE # CORE
if ctrl_mode == "core": if ctrl_mode == "core" and CORE_MODE == "duty":
input = CoreControl() input = CoreControl()
input.max_speed = max_speed input.max_speed = max_speed
@@ -137,6 +161,28 @@ class Headless(Node):
self.core_publisher.publish(input) self.core_publisher.publish(input)
self.arm_publisher.publish(arm_stop_msg) self.arm_publisher.publish(arm_stop_msg)
elif ctrl_mode == "core" and CORE_MODE == "twist":
input = Twist()
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_x = deadzone(self.gamepad.get_axis(3))
# Forward/back
input.linear.x = float(round(-1 * left_stick_y, 2))
# Turn
input.angular.z = float(round(right_stick_x, 2))
# Debug
output = f'Lin X: {input.linear.x}, Ang Z: {input.angular.z}'
self.get_logger().info(f"[Ctrl] {output}")
self.core_twist_pub_.publish(input)
self.arm_publisher.publish(arm_stop_msg)
# ARM # ARM