mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
feat: add speed and brake mode control
Speed mode is set with bumpers, brake mode is set with 'A' button. Implemented through /core/control/state topic with custom message CoreCtrlState. Also added Core measurements from Gabe, and did some general refactoring.
This commit is contained in:
@@ -13,22 +13,25 @@ import os
|
||||
import sys
|
||||
import threading
|
||||
import glob
|
||||
from math import sqrt
|
||||
|
||||
from std_msgs.msg import String
|
||||
from sensor_msgs.msg import Imu, NavSatFix
|
||||
from geometry_msgs.msg import TwistStamped, Twist
|
||||
from ros2_interfaces_pkg.msg import CoreFeedback
|
||||
from ros2_interfaces_pkg.msg import CoreControl
|
||||
from ros2_interfaces_pkg.msg import CoreControl, CoreFeedback
|
||||
from ros2_interfaces_pkg.msg import NewCoreFeedback, CoreCtrlState
|
||||
|
||||
|
||||
serial_pub = None
|
||||
thread = None
|
||||
|
||||
CORE_WHEELBASE = 0.84 # meters -- TODO: verify
|
||||
CORE_WHEEL_RADIUS = 0.015 # meters -- TODO: verify
|
||||
CORE_WHEELBASE = 0.836 # meters
|
||||
CORE_WHEEL_RADIUS = 0.171 # meters
|
||||
CORE_GEAR_RATIO = 100 # Clucky: 100:1, Testbed: 64:1
|
||||
|
||||
WALKING_SPEED = 0.5 # duty cycle
|
||||
FAST_SPEED = 0.9 # duty cycle
|
||||
SLOW_SPEED = 0.25 # duty cycle
|
||||
|
||||
control_qos = qos.QoSProfile(
|
||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=2,
|
||||
@@ -60,6 +63,8 @@ class SerialRelay(Node):
|
||||
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
|
||||
self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, qos_profile=control_qos)
|
||||
self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos)
|
||||
self.control_state_sub_ = self.create_subscription(CoreCtrlState, '/core/control/state', self.control_state_callback, qos_profile=control_qos)
|
||||
self.twist_speed_mode = 0 # -1 = slow, 0 = walk, 1 = fast
|
||||
|
||||
# Feedback
|
||||
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
|
||||
@@ -197,10 +202,10 @@ class SerialRelay(Node):
|
||||
self.send_cmd(command)
|
||||
|
||||
def twist_man_callback(self, msg: Twist):
|
||||
linear = msg.linear.x # [-1 1] for forward/back from left joy y
|
||||
angular = -msg.angular.z # [-1 1] for left/right from right joy x
|
||||
linear = msg.linear.x # [-1 1] for forward/back from left stick y
|
||||
angular = msg.angular.z # [-1 1] for left/right from right stick x
|
||||
|
||||
if (linear < 0): # reverse turning direction when going backwards
|
||||
if (linear < 0): # reverse turning direction when going backwards (WIP)
|
||||
angular *= -1
|
||||
|
||||
if abs(linear) > 1 or abs(angular) > 1:
|
||||
@@ -215,9 +220,28 @@ class SerialRelay(Node):
|
||||
duty_left /= scale
|
||||
duty_right /= scale
|
||||
|
||||
# Apply speed mode
|
||||
if self.twist_speed_mode == -1: # slow
|
||||
duty_left = map_range(duty_left, -1, 1, -SLOW_SPEED, SLOW_SPEED)
|
||||
duty_right = map_range(duty_right, -1, 1, -SLOW_SPEED, SLOW_SPEED)
|
||||
elif self.twist_speed_mode == 0: # walk
|
||||
duty_left = map_range(duty_left, -1, 1, -WALKING_SPEED, WALKING_SPEED)
|
||||
duty_right = map_range(duty_right, -1, 1, -WALKING_SPEED, WALKING_SPEED)
|
||||
elif self.twist_speed_mode == 1: # fast
|
||||
duty_left = map_range(duty_left, -1, 1, -FAST_SPEED, FAST_SPEED)
|
||||
duty_right = map_range(duty_right, -1, 1, -FAST_SPEED, FAST_SPEED)
|
||||
|
||||
command = f"can_relay_tovic,core,19,{duty_left},{duty_right}\n"
|
||||
self.send_cmd(command)
|
||||
|
||||
def control_state_callback(self, msg: CoreCtrlState):
|
||||
# Brake mode
|
||||
command = f"can_relay_tovic,core,18,{str(int(msg.brake_mode))}\n"
|
||||
self.send_cmd(command)
|
||||
|
||||
# Speed mode
|
||||
self.twist_speed_mode = msg.speed_mode # twist_man_callback will handle this
|
||||
|
||||
def send_cmd(self, msg: str):
|
||||
if self.launch_mode == 'anchor':
|
||||
#self.get_logger().info(f"[Core to Anchor Relay] {msg}")
|
||||
@@ -321,6 +345,9 @@ def myexcepthook(type, value, tb):
|
||||
if serial_pub:
|
||||
serial_pub.cleanup()
|
||||
|
||||
def map_range(value: float, in_min: float, in_max: float, out_min: float, out_max: float):
|
||||
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
|
||||
@@ -16,6 +16,7 @@ import glob
|
||||
from std_msgs.msg import String
|
||||
from geometry_msgs.msg import Twist
|
||||
from ros2_interfaces_pkg.msg import CoreControl, ArmManual
|
||||
from ros2_interfaces_pkg.msg import CoreCtrlState
|
||||
|
||||
import pygame
|
||||
|
||||
@@ -49,9 +50,6 @@ arm_stop_msg.gripper = 0
|
||||
arm_stop_msg.linear_actuator = 0
|
||||
arm_stop_msg.laser = 0
|
||||
|
||||
ctrl_mode = "core"
|
||||
CORE_MODE = "twist" # "twist" or "duty"
|
||||
|
||||
control_qos = qos.QoSProfile(
|
||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=2,
|
||||
@@ -63,6 +61,12 @@ control_qos = qos.QoSProfile(
|
||||
liveliness_lease_duration=Duration(seconds=5)
|
||||
)
|
||||
|
||||
ctrl_mode = "core"
|
||||
CORE_MODE = "twist" # "twist" or "duty"
|
||||
|
||||
core_brake_mode = False
|
||||
core_speed_mode = 0 # -1 = slow, 0 = walk, 1 = fast
|
||||
|
||||
|
||||
class Headless(Node):
|
||||
def __init__(self):
|
||||
@@ -93,6 +97,7 @@ class Headless(Node):
|
||||
self.core_publisher = self.create_publisher(CoreControl, '/core/control', 10)
|
||||
self.arm_publisher = self.create_publisher(ArmManual, '/arm/control/manual', 10)
|
||||
self.core_twist_pub_ = self.create_publisher(Twist, '/core/twist', qos_profile=control_qos)
|
||||
self.core_state_pub_ = self.create_publisher(CoreCtrlState, '/core/control/state', qos_profile=control_qos)
|
||||
|
||||
def run(self):
|
||||
# This thread makes all the update processes run in the background
|
||||
@@ -168,21 +173,42 @@ class Headless(Node):
|
||||
# Collect controller state
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
left_bumper = self.gamepad.get_button(4)
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
|
||||
# Forward/back and Turn
|
||||
input.linear.x = -left_stick_y
|
||||
input.angular.z = -right_stick_x
|
||||
|
||||
# Forward/back
|
||||
input.linear.x = float(round(-1 * left_stick_y, 2))
|
||||
|
||||
# Turn
|
||||
input.angular.z = float(round(right_stick_x, 2))
|
||||
|
||||
|
||||
# Debug
|
||||
output = f'Lin X: {input.linear.x}, Ang Z: {input.angular.z}'
|
||||
self.get_logger().info(f"[Ctrl] {output}")
|
||||
|
||||
# Publish
|
||||
self.core_twist_pub_.publish(input)
|
||||
self.arm_publisher.publish(arm_stop_msg)
|
||||
self.get_logger().info(f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}")
|
||||
|
||||
# State (brake and speed)
|
||||
global core_brake_mode
|
||||
global core_speed_mode
|
||||
|
||||
# Brake mode
|
||||
new_brake_mode = button_a
|
||||
# Speed mode
|
||||
if left_bumper:
|
||||
new_speed_mode = -1
|
||||
elif right_bumper:
|
||||
new_speed_mode = 1
|
||||
else:
|
||||
new_speed_mode = 0
|
||||
|
||||
# Only publish if needed
|
||||
if new_brake_mode != core_brake_mode or new_speed_mode != core_speed_mode:
|
||||
core_brake_mode = new_brake_mode
|
||||
core_speed_mode = new_speed_mode
|
||||
state_msg = CoreCtrlState()
|
||||
state_msg.brake_mode = core_brake_mode
|
||||
state_msg.speed_mode = core_speed_mode
|
||||
self.core_state_pub_.publish(state_msg)
|
||||
self.get_logger().info(f"[Core State] Brake: {core_brake_mode}, Speed: {core_speed_mode}")
|
||||
|
||||
|
||||
# ARM
|
||||
|
||||
Submodule src/ros2_interfaces_pkg updated: 7ae068d570...821cdddee0
Reference in New Issue
Block a user