mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-04-20 11:51:16 -05:00
Compare commits
2 Commits
62fd1b110d
...
743744edaa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
743744edaa | ||
|
|
292b3a742d |
@@ -156,7 +156,7 @@ class ArmNode(Node):
|
||||
# Grab velocities from message
|
||||
velocities = [
|
||||
(
|
||||
msg.velocities[msg.joint_names.index(joint_name)]
|
||||
msg.velocities[msg.joint_names.index(joint_name)] # type: ignore
|
||||
if joint_name in msg.joint_names
|
||||
else 0.0
|
||||
)
|
||||
|
||||
@@ -6,23 +6,28 @@ from rclpy.duration import Duration
|
||||
|
||||
import signal
|
||||
import time
|
||||
import atexit
|
||||
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import glob
|
||||
import pwd
|
||||
import grp
|
||||
from math import copysign
|
||||
|
||||
from std_srvs.srv import Trigger
|
||||
from std_msgs.msg import String
|
||||
from std_msgs.msg import Header
|
||||
from geometry_msgs.msg import Twist, TwistStamped
|
||||
from control_msgs.msg import JointJog
|
||||
from astra_msgs.msg import CoreControl, ArmManual, BioControl
|
||||
from astra_msgs.msg import CoreCtrlState
|
||||
|
||||
import warnings
|
||||
|
||||
# Literally headless
|
||||
warnings.filterwarnings(
|
||||
"ignore",
|
||||
message="Your system is avx2 capable but pygame was not built with support for it.",
|
||||
)
|
||||
|
||||
import pygame
|
||||
|
||||
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
|
||||
@@ -34,6 +39,7 @@ os.environ["SDL_AUDIODRIVER"] = (
|
||||
CORE_STOP_MSG = CoreControl() # All zeros by default
|
||||
CORE_STOP_TWIST_MSG = Twist() # "
|
||||
ARM_STOP_MSG = ArmManual() # "
|
||||
ARM_IK_STOP_MSG = TwistStamped() # "
|
||||
BIO_STOP_MSG = BioControl() # "
|
||||
|
||||
control_qos = qos.QoSProfile(
|
||||
@@ -49,6 +55,7 @@ control_qos = qos.QoSProfile(
|
||||
|
||||
|
||||
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
|
||||
ARM_DEADZONE = float(os.getenv("ARM_DEADZONE", "0.2"))
|
||||
|
||||
|
||||
class Headless(Node):
|
||||
@@ -70,6 +77,14 @@ class Headless(Node):
|
||||
pygame.joystick.init()
|
||||
super().__init__("headless")
|
||||
|
||||
# TODO: move the STOP_MSGs somewhere better
|
||||
global ARM_STOP_JOG_MSG
|
||||
ARM_STOP_JOG_MSG = JointJog(
|
||||
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
|
||||
joint_names=self.all_joint_names,
|
||||
velocities=[0.0] * len(self.all_joint_names),
|
||||
)
|
||||
|
||||
##################################################
|
||||
# Preamble
|
||||
|
||||
@@ -131,24 +146,29 @@ class Headless(Node):
|
||||
self.get_parameter("use_old_topics").get_parameter_value().bool_value
|
||||
)
|
||||
|
||||
self.declare_parameter("arm_mode", "manual")
|
||||
self.arm_mode = (
|
||||
self.get_parameter("arm_mode").get_parameter_value().string_value
|
||||
self.declare_parameter("use_bio", False)
|
||||
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
|
||||
|
||||
self.declare_parameter("use_arm_ik", False)
|
||||
self.use_arm_ik = (
|
||||
self.get_parameter("use_arm_ik").get_parameter_value().bool_value
|
||||
)
|
||||
|
||||
self.declare_parameter("arm_manual_scheme", "old")
|
||||
self.arm_manual_scheme = (
|
||||
self.get_parameter("arm_manual_scheme").get_parameter_value().string_value
|
||||
# NOTE: only applicable if use_old_topics == True
|
||||
self.declare_parameter("use_new_arm_manual_scheme", True)
|
||||
self.use_new_arm_manual_scheme = (
|
||||
self.get_parameter("use_new_arm_manual_scheme")
|
||||
.get_parameter_value()
|
||||
.bool_value
|
||||
)
|
||||
|
||||
# Check parameter validity
|
||||
if self.arm_mode not in ["manual", "ik"]:
|
||||
if self.use_arm_ik and self.use_old_topics:
|
||||
self.get_logger().fatal("Old topics do not support arm IK control.")
|
||||
sys.exit(1)
|
||||
if not self.use_new_arm_manual_scheme and not self.use_old_topics:
|
||||
self.get_logger().warn(
|
||||
f"Invalid value '{self.arm_mode}' for arm_mode parameter. Defaulting to 'manual' (per-axis control)."
|
||||
)
|
||||
if self.arm_manual_scheme not in ["old", "new"]:
|
||||
self.get_logger().warn(
|
||||
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
|
||||
f"New arm manual does not support old control scheme. Defaulting to new scheme."
|
||||
)
|
||||
|
||||
self.ctrl_mode = "core" # Start in core mode
|
||||
@@ -168,23 +188,26 @@ class Headless(Node):
|
||||
##################################################
|
||||
# New Topics
|
||||
|
||||
self.core_twist_pub_ = self.create_publisher(
|
||||
Twist, "/core/twist", qos_profile=control_qos
|
||||
)
|
||||
self.core_state_pub_ = self.create_publisher(
|
||||
CoreCtrlState, "/core/control/state", qos_profile=control_qos
|
||||
)
|
||||
if not self.use_old_topics:
|
||||
self.core_twist_pub_ = self.create_publisher(
|
||||
Twist, "/core/twist", qos_profile=control_qos
|
||||
)
|
||||
self.core_state_pub_ = self.create_publisher(
|
||||
CoreCtrlState, "/core/control/state", qos_profile=control_qos
|
||||
)
|
||||
|
||||
self.arm_manual_pub_ = self.create_publisher(
|
||||
JointJog, "/arm/manual_new", qos_profile=control_qos
|
||||
)
|
||||
self.arm_manual_pub_ = self.create_publisher(
|
||||
JointJog, "/arm/manual_new", qos_profile=control_qos
|
||||
)
|
||||
|
||||
self.arm_ik_twist_publisher = self.create_publisher(
|
||||
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
|
||||
)
|
||||
self.arm_ik_jointjog_publisher = self.create_publisher(
|
||||
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
|
||||
)
|
||||
self.arm_ik_twist_publisher = self.create_publisher(
|
||||
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
|
||||
)
|
||||
self.arm_ik_jointjog_publisher = self.create_publisher(
|
||||
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
|
||||
)
|
||||
|
||||
# TODO: add new bio topics
|
||||
|
||||
##################################################
|
||||
# Timers
|
||||
@@ -196,7 +219,7 @@ class Headless(Node):
|
||||
|
||||
# If using IK control, we have to "start" the servo node to enable it to accept commands
|
||||
self.servo_start_client = None
|
||||
if self.arm_mode == "ik":
|
||||
if self.use_arm_ik:
|
||||
self.get_logger().info("Starting servo node for IK control...")
|
||||
self.servo_start_client = self.create_client(
|
||||
Trigger, "/servo_node/start_servo"
|
||||
@@ -223,6 +246,11 @@ class Headless(Node):
|
||||
self.bio_publisher.publish(BIO_STOP_MSG)
|
||||
else:
|
||||
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
|
||||
if self.use_arm_ik:
|
||||
self.arm_ik_twist_publisher.publish(ARM_IK_STOP_MSG)
|
||||
else:
|
||||
self.arm_manual_pub_.publish(ARM_STOP_JOG_MSG)
|
||||
# TODO: add bio here after implementing new topics
|
||||
|
||||
def send_controls(self):
|
||||
"""Read the gamepad state and publish control messages"""
|
||||
@@ -255,26 +283,35 @@ class Headless(Node):
|
||||
self.gamepad.rumble(0.6, 0.7, 75)
|
||||
self.ctrl_mode = new_ctrl_mode
|
||||
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
|
||||
if self.ctrl_mode == "arm" and self.use_bio:
|
||||
self.get_logger().warning("NOTE: Using bio instead of arm.")
|
||||
|
||||
# Actually send the controls
|
||||
if self.ctrl_mode == "core":
|
||||
self.send_core()
|
||||
if self.use_old_topics:
|
||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||
if self.use_bio:
|
||||
self.bio_publisher.publish(BIO_STOP_MSG)
|
||||
else:
|
||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||
# New topics shouldn't need to constantly send zeroes imo
|
||||
else:
|
||||
self.send_arm()
|
||||
# self.send_bio()
|
||||
if self.use_bio:
|
||||
self.send_bio()
|
||||
else:
|
||||
self.send_arm()
|
||||
if self.use_old_topics:
|
||||
self.core_publisher.publish(CORE_STOP_MSG)
|
||||
# Ditto
|
||||
|
||||
def send_core(self):
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
@@ -345,12 +382,12 @@ class Headless(Node):
|
||||
|
||||
def send_arm(self):
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
@@ -360,11 +397,13 @@ class Headless(Node):
|
||||
dpad_input = self.gamepad.get_hat(0)
|
||||
|
||||
# OLD MANUAL
|
||||
if self.arm_mode == "manual" and self.use_old_topics:
|
||||
# ==========
|
||||
|
||||
if not self.use_arm_ik and self.use_old_topics:
|
||||
arm_input = ArmManual()
|
||||
|
||||
# OLD ARM CONTROL SCHEME
|
||||
if self.arm_manual_scheme == "old":
|
||||
# OLD ARM MANUAL CONTROL SCHEME
|
||||
if not self.use_new_arm_manual_scheme:
|
||||
# EF Grippers
|
||||
if left_trigger > 0 and right_trigger > 0:
|
||||
arm_input.gripper = 0
|
||||
@@ -407,8 +446,8 @@ class Headless(Node):
|
||||
if abs(right_stick_y) > 0.15:
|
||||
arm_input.axis3 = -1 * round(right_stick_y)
|
||||
|
||||
# NEW ARM CONTROL SCHEME
|
||||
if self.arm_manual_scheme == "new":
|
||||
# NEW ARM MANUAL CONTROL SCHEME
|
||||
if self.use_new_arm_manual_scheme:
|
||||
# Right stick: EF yaw and axis 3
|
||||
# Left stick: axis 1 and 2
|
||||
# D-pad: axis 0 and _
|
||||
@@ -419,28 +458,16 @@ class Headless(Node):
|
||||
# X: _
|
||||
# Y: linear actuator out
|
||||
|
||||
ARM_THRESHOLD = 0.2
|
||||
|
||||
# Right stick: EF yaw and axis 3
|
||||
arm_input.effector_yaw = (
|
||||
0 if abs(right_stick_x) < ARM_THRESHOLD else int(copysign(1, right_stick_x))
|
||||
)
|
||||
arm_input.axis3 = (
|
||||
0 if abs(right_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, right_stick_y))
|
||||
)
|
||||
arm_input.effector_yaw = stick_to_arm_direction(right_stick_x)
|
||||
arm_input.axis3 = -1 * stick_to_arm_direction(right_stick_y)
|
||||
|
||||
# Left stick: axis 1 and 2
|
||||
arm_input.axis1 = (
|
||||
0 if abs(left_stick_x) < ARM_THRESHOLD else int(copysign(1, left_stick_x))
|
||||
)
|
||||
arm_input.axis2 = (
|
||||
0 if abs(left_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, left_stick_y))
|
||||
)
|
||||
arm_input.axis1 = stick_to_arm_direction(left_stick_x)
|
||||
arm_input.axis2 = -1 * stick_to_arm_direction(left_stick_y)
|
||||
|
||||
# D-pad: axis 0 and _
|
||||
arm_input.axis0 = (
|
||||
0 if dpad_input[0] == 0 else int(copysign(1, dpad_input[0]))
|
||||
)
|
||||
arm_input.axis0 = int(dpad_input[0])
|
||||
|
||||
# Triggers: EF Grippers
|
||||
if left_trigger > 0 and right_trigger > 0:
|
||||
@@ -473,7 +500,9 @@ class Headless(Node):
|
||||
self.arm_publisher.publish(arm_input)
|
||||
|
||||
# NEW MANUAL
|
||||
elif self.arm_mode == "manual" and not self.use_old_topics:
|
||||
# ==========
|
||||
|
||||
elif not self.use_arm_ik and not self.use_old_topics:
|
||||
arm_input = JointJog()
|
||||
arm_input.header.frame_id = "base_link"
|
||||
arm_input.header.stamp = self.get_clock().now().to_msg()
|
||||
@@ -493,36 +522,44 @@ class Headless(Node):
|
||||
ARM_THRESHOLD = 0.2
|
||||
|
||||
# Right stick: EF yaw and axis 3
|
||||
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = (
|
||||
float(copysign(1, right_stick_x)) if abs(right_stick_x) >= ARM_THRESHOLD else 0.0
|
||||
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
|
||||
stick_to_arm_direction(right_stick_x)
|
||||
)
|
||||
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = (
|
||||
float(-1 * copysign(1, right_stick_y)) if abs(right_stick_y) >= ARM_THRESHOLD else 0.0
|
||||
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
|
||||
-1 * stick_to_arm_direction(right_stick_y)
|
||||
)
|
||||
|
||||
# Left stick: axis 1 and 2
|
||||
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = (
|
||||
float(copysign(1, left_stick_x)) if abs(left_stick_x) >= ARM_THRESHOLD else 0.0
|
||||
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = float(
|
||||
stick_to_arm_direction(left_stick_x)
|
||||
)
|
||||
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = (
|
||||
float(-1 * copysign(1, left_stick_y)) if abs(left_stick_y) >= ARM_THRESHOLD else 0.0
|
||||
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
|
||||
-1 * stick_to_arm_direction(left_stick_y)
|
||||
)
|
||||
|
||||
# D-pad: axis 0 and _
|
||||
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = (
|
||||
float(copysign(1, dpad_input[0])) if dpad_input[0] != 0 else 0.0
|
||||
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = float(
|
||||
dpad_input[0]
|
||||
)
|
||||
|
||||
# Triggers: EF Grippers
|
||||
if left_trigger > 0 and right_trigger > 0:
|
||||
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 0.0
|
||||
arm_input.velocities[
|
||||
self.all_joint_names.index("ef_gripper_left_joint")
|
||||
] = 0.0
|
||||
elif left_trigger > 0:
|
||||
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = -1.0
|
||||
arm_input.velocities[
|
||||
self.all_joint_names.index("ef_gripper_left_joint")
|
||||
] = -1.0
|
||||
elif right_trigger > 0:
|
||||
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 1.0
|
||||
arm_input.velocities[
|
||||
self.all_joint_names.index("ef_gripper_left_joint")
|
||||
] = 1.0
|
||||
|
||||
# Bumpers: EF roll
|
||||
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = right_bumper - left_bumper
|
||||
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = (
|
||||
right_bumper - left_bumper
|
||||
)
|
||||
|
||||
# A: brake
|
||||
# TODO: Brake mode
|
||||
@@ -532,8 +569,10 @@ class Headless(Node):
|
||||
|
||||
self.arm_manual_pub_.publish(arm_input)
|
||||
|
||||
# IK
|
||||
elif self.arm_mode == "ik":
|
||||
# IK (ONLY NEW)
|
||||
# =============
|
||||
|
||||
elif self.use_arm_ik:
|
||||
arm_twist = TwistStamped()
|
||||
arm_twist.header.frame_id = "base_link"
|
||||
arm_twist.header.stamp = self.get_clock().now().to_msg()
|
||||
@@ -567,7 +606,7 @@ class Headless(Node):
|
||||
|
||||
# Triggers: EF Grippers
|
||||
if left_trigger > 0 or right_trigger > 0:
|
||||
arm_jointjog.joint_names.append("ef_gripper_left_joint")
|
||||
arm_jointjog.joint_names.append("ef_gripper_left_joint") # type: ignore
|
||||
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
|
||||
|
||||
# Bumpers: angular x
|
||||
@@ -583,12 +622,12 @@ class Headless(Node):
|
||||
|
||||
def send_bio(self):
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
@@ -604,21 +643,31 @@ class Headless(Node):
|
||||
)
|
||||
|
||||
# Drill motor (FAERIE)
|
||||
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
|
||||
if left_trigger > 0 or right_trigger > 0:
|
||||
bio_input.drill = int(
|
||||
30 * (right_trigger - left_trigger)
|
||||
) # Max duty cycle 30%
|
||||
|
||||
self.bio_publisher.publish(bio_input)
|
||||
|
||||
else:
|
||||
pass # TODO: implement new bio control topics
|
||||
|
||||
def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
||||
|
||||
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
||||
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
|
||||
if abs(value) < threshold:
|
||||
return 0
|
||||
return value
|
||||
|
||||
|
||||
def stick_to_arm_direction(value: float, threshold=ARM_DEADZONE) -> int:
|
||||
"""Apply a larger deadzone to a stick input and make digital/binary instead of analog"""
|
||||
if abs(value) < threshold:
|
||||
return 0
|
||||
return int(copysign(1, value))
|
||||
|
||||
|
||||
def is_user_in_group(group_name: str) -> bool:
|
||||
# Copied from https://zetcode.com/python/os-getgrouplist/
|
||||
try:
|
||||
@@ -637,10 +686,19 @@ def is_user_in_group(group_name: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def exit_handler(signum, frame):
|
||||
print("Caught SIGTERM. Exiting...")
|
||||
rclpy.try_shutdown()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
try:
|
||||
rclpy.init(args=args)
|
||||
|
||||
# Catch termination signals and exit cleanly
|
||||
signal.signal(signal.SIGTERM, exit_handler)
|
||||
|
||||
node = Headless()
|
||||
rclpy.spin(node)
|
||||
except (KeyboardInterrupt, ExternalShutdownException):
|
||||
@@ -650,7 +708,4 @@ def main(args=None):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
signal.signal(
|
||||
signal.SIGTERM, lambda signum, frame: sys.exit(0)
|
||||
) # Catch termination signals and exit cleanly
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user