mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-04-20 03:41:17 -05:00
cmd_vel headless
This commit is contained in:
@@ -17,7 +17,8 @@ import grp
|
||||
from math import copysign
|
||||
|
||||
from std_msgs.msg import String
|
||||
from geometry_msgs.msg import Twist
|
||||
from geometry_msgs.msg import Twist, TwistStamped
|
||||
from std_msgs.msg import Header
|
||||
from astra_msgs.msg import CoreControl, ArmManual, BioControl
|
||||
from astra_msgs.msg import CoreCtrlState
|
||||
|
||||
@@ -55,6 +56,16 @@ class Headless(Node):
|
||||
pygame.joystick.init()
|
||||
super().__init__("headless")
|
||||
|
||||
self.declare_parameter("use_cmd_vel", False)
|
||||
self.using_cmd_vel = self.get_parameter("use_cmd_vel").value
|
||||
if self.using_cmd_vel:
|
||||
self.get_logger().info("Using cmd_vel for core control")
|
||||
global CORE_MODE
|
||||
CORE_MODE = "twist"
|
||||
else:
|
||||
self.get_logger().info("Using astra_msgs/CoreControl for core control")
|
||||
|
||||
|
||||
# Wait for anchor to start
|
||||
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
|
||||
while len(pub_info) == 0:
|
||||
@@ -114,6 +125,9 @@ class Headless(Node):
|
||||
self.core_twist_pub_ = self.create_publisher(
|
||||
Twist, "/core/twist", qos_profile=control_qos
|
||||
)
|
||||
self.core_cmd_vel_pub_ = self.create_publisher(
|
||||
TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos
|
||||
)
|
||||
self.core_state_pub_ = self.create_publisher(
|
||||
CoreCtrlState, "/core/control/state", qos_profile=control_qos
|
||||
)
|
||||
@@ -125,6 +139,9 @@ class Headless(Node):
|
||||
# Rumble when node is ready (returns False if rumble not supported)
|
||||
self.gamepad.rumble(0.7, 0.8, 150)
|
||||
|
||||
# Added so you can tell when it starts running after changing the constant logging to debug from info
|
||||
self.get_logger().info("Defaulting to Core mode. Ready.")
|
||||
|
||||
def run(self):
|
||||
# This thread makes all the update processes run in the background
|
||||
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
|
||||
@@ -198,7 +215,7 @@ class Headless(Node):
|
||||
# self.bio_publisher.publish(BIO_STOP_MSG)
|
||||
|
||||
elif self.ctrl_mode == "core" and CORE_MODE == "twist":
|
||||
input = Twist()
|
||||
twist = Twist()
|
||||
|
||||
# Collect controller state
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
@@ -208,17 +225,25 @@ class Headless(Node):
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
|
||||
# Forward/back and Turn
|
||||
input.linear.x = -1.0 * left_stick_y
|
||||
input.angular.z = -1.0 * copysign(
|
||||
twist.linear.x = -1.0 * left_stick_y
|
||||
twist.angular.z = -1.0 * copysign(
|
||||
right_stick_x**2, right_stick_x
|
||||
) # Exponent for finer control (curve)
|
||||
|
||||
if self.using_cmd_vel:
|
||||
twist.linear.x *= 1.5
|
||||
twist.angular.z *= 0.5
|
||||
|
||||
# Publish
|
||||
self.core_twist_pub_.publish(input)
|
||||
if not self.using_cmd_vel:
|
||||
self.core_twist_pub_.publish(twist)
|
||||
else:
|
||||
header = Header(stamp=self.get_clock().now().to_msg())
|
||||
self.core_cmd_vel_pub_.publish(TwistStamped(header=header, twist=twist))
|
||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||
# self.bio_publisher.publish(BIO_STOP_MSG)
|
||||
self.get_logger().info(
|
||||
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
|
||||
self.get_logger().debug(
|
||||
f"[Core Ctrl] Linear: {round(twist.linear.x, 2)}, Angular: {round(twist.angular.z, 2)}"
|
||||
)
|
||||
|
||||
# Brake mode
|
||||
|
||||
Reference in New Issue
Block a user