From caa5a637bb30039b8dab8ce3649327af4474311a Mon Sep 17 00:00:00 2001 From: David Date: Mon, 19 Jan 2026 11:54:00 -0600 Subject: [PATCH] feat: (headless) add arm IK control support Only Twist-based so far, no JointJog. need to figure out which frame to send commands in. Currently `base_link`. --- src/astra_descriptions | 2 +- src/headless_pkg/src/headless_node.py | 160 ++++++++++++++---- .../src/joystick_twist.cpp | 46 ++--- 3 files changed, 152 insertions(+), 56 deletions(-) diff --git a/src/astra_descriptions b/src/astra_descriptions index 29a3eea..078ec91 160000 --- a/src/astra_descriptions +++ b/src/astra_descriptions @@ -1 +1 @@ -Subproject commit 29a3eea9c53c97d7c01c092f4d727e14788066b3 +Subproject commit 078ec913d7dcb96b936c42a17dd1d5f9f167e111 diff --git a/src/headless_pkg/src/headless_node.py b/src/headless_pkg/src/headless_node.py index 8703bed..7228afc 100755 --- a/src/headless_pkg/src/headless_node.py +++ b/src/headless_pkg/src/headless_node.py @@ -16,8 +16,10 @@ import pwd import grp from math import copysign +from std_srvs.srv import Trigger from std_msgs.msg import String -from geometry_msgs.msg import Twist +from geometry_msgs.msg import Twist, TwistStamped +from control_msgs.msg import JointJog from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreCtrlState @@ -31,8 +33,8 @@ os.environ["SDL_AUDIODRIVER"] = ( CORE_STOP_MSG = CoreControl() # All zeros by default CORE_STOP_TWIST_MSG = Twist() # " -ARM_STOP_MSG = ArmManual() # " -BIO_STOP_MSG = BioControl() # " +ARM_STOP_MSG = ArmManual() # " +BIO_STOP_MSG = BioControl() # " control_qos = qos.QoSProfile( history=qos.QoSHistoryPolicy.KEEP_LAST, @@ -45,6 +47,13 @@ control_qos = qos.QoSProfile( liveliness_lease_duration=Duration(seconds=5), ) +arm_ik_qos = qos.QoSProfile( + history=qos.QoSHistoryPolicy.KEEP_LAST, + depth=1, + reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, + durability=qos.QoSDurabilityPolicy.VOLATILE, +) + CORE_MODE = "twist" # "twist" or "duty" STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05")) @@ -107,10 +116,16 @@ class Headless(Node): break id += 1 - self.create_timer(0.15, self.send_controls) + self.create_timer(0.1, self.send_controls) self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2) + self.arm_ik_twist_publisher = self.create_publisher( + TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos + ) + self.arm_ik_jointjog_publisher = self.create_publisher( + JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos + ) self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2) self.core_twist_pub_ = self.create_publisher( @@ -120,12 +135,40 @@ class Headless(Node): CoreCtrlState, "/core/control/state", qos_profile=control_qos ) - self.declare_parameter('arm_manual_scheme', "old") - self.arm_manual_scheme = self.get_parameter('arm_manual_scheme').value + self.declare_parameter("arm_mode", "manual") + self.arm_mode = self.get_parameter("arm_mode").value + + self.declare_parameter("arm_manual_scheme", "old") + self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value # Check parameter validity + if self.arm_mode not in ["manual", "ik"]: + self.get_logger().warn( + f"Invalid value '{self.arm_mode}' for arm_mode parameter. Defaulting to 'manual' (per-axis control)." + ) if self.arm_manual_scheme not in ["old", "new"]: - self.get_logger().warn(f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls).") + self.get_logger().warn( + f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)." + ) + + # If using IK control, we have to "start" the servo node to enable it to accept commands + self.servo_start_client = None + if self.arm_mode == "ik": + self.get_logger().info("Starting servo node for IK control...") + self.servo_start_client = self.create_client( + Trigger, "/servo_node/start_servo" + ) + timeout_counter = 0 + while not self.servo_start_client.wait_for_service(timeout_sec=1.0): + self.get_logger().info("Waiting for servo_node/start_servo service...") + timeout_counter += 1 + if timeout_counter >= 10: + self.get_logger().error( + "Servo's start service not available. IK control will not work." + ) + break + if self.servo_start_client.service_is_ready(): + self.servo_start_client.call_async(Trigger.Request()) self.ctrl_mode = "core" # Start in core mode self.core_brake_mode = False @@ -134,18 +177,6 @@ class Headless(Node): # Rumble when node is ready (returns False if rumble not supported) self.gamepad.rumble(0.7, 0.8, 150) - def run(self): - # This thread makes all the update processes run in the background - thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) - thread.start() - - try: - while rclpy.ok(): - self.send_controls() - time.sleep(0.1) # Small delay to avoid CPU hogging - except KeyboardInterrupt: - sys.exit(0) - def send_controls(self): """Read the gamepad state and publish control messages""" for event in pygame.event.get(): @@ -255,7 +286,7 @@ class Headless(Node): ) # ARM and BIO - if self.ctrl_mode == "arm": + if self.ctrl_mode == "arm" and self.arm_mode == "manual": arm_input = ArmManual() # Collect controller state @@ -315,7 +346,7 @@ class Headless(Node): # Axis 3 if abs(right_stick_y) > 0.15: arm_input.axis3 = -1 * round(right_stick_y) - + if self.arm_manual_scheme == "new": # Right stick: EF yaw and axis 3 # Left stick: axis 1 and 2 @@ -326,26 +357,26 @@ class Headless(Node): # B: _ # X: _ # Y: linear actuator - + # Right stick: EF yaw and axis 3 - arm_input.effector_yaw = 0 if right_stick_x == 0 else int( - copysign(1, right_stick_x) + arm_input.effector_yaw = ( + 0 if right_stick_x == 0 else int(copysign(1, right_stick_x)) ) - arm_input.axis3 = 0 if right_stick_y == 0 else int( - copysign(-1, right_stick_y) + arm_input.axis3 = ( + 0 if right_stick_y == 0 else int(copysign(-1, right_stick_y)) ) # Left stick: axis 1 and 2 - arm_input.axis1 = 0 if left_stick_x == 0 else int( - copysign(1, left_stick_x) + arm_input.axis1 = ( + 0 if left_stick_x == 0 else int(copysign(1, left_stick_x)) ) - arm_input.axis2 = 0 if left_stick_y == 0 else int( - copysign(-1, left_stick_y) + arm_input.axis2 = ( + 0 if left_stick_y == 0 else int(copysign(-1, left_stick_y)) ) # D-pad: axis 0 and _ - arm_input.axis0 = 0 if dpad_input[0] == 0 else int( - copysign(1, dpad_input[0]) + arm_input.axis0 = ( + 0 if dpad_input[0] == 0 else int(copysign(1, dpad_input[0])) ) # Triggers: EF Grippers @@ -386,6 +417,71 @@ class Headless(Node): self.arm_publisher.publish(arm_input) + if self.ctrl_mode == "arm" and self.arm_mode == "ik": + arm_twist = TwistStamped() + arm_twist.header.frame_id = "base_link" + arm_twist.header.stamp = self.get_clock().now().to_msg() + arm_jointjog = JointJog() + arm_jointjog.header.frame_id = "base_link" + arm_jointjog.header.stamp = self.get_clock().now().to_msg() + + # Collect controller state + left_stick_x = deadzone(self.gamepad.get_axis(0)) + left_stick_y = deadzone(self.gamepad.get_axis(1)) + left_trigger = deadzone(self.gamepad.get_axis(2)) + right_stick_x = deadzone(self.gamepad.get_axis(3)) + right_stick_y = deadzone(self.gamepad.get_axis(4)) + right_trigger = deadzone(self.gamepad.get_axis(5)) + button_a = self.gamepad.get_button(0) + button_b = self.gamepad.get_button(1) + button_x = self.gamepad.get_button(2) + button_y = self.gamepad.get_button(3) + left_bumper = self.gamepad.get_button(4) + right_bumper = self.gamepad.get_button(5) + dpad_input = self.gamepad.get_hat(0) + + # Right stick: linear y and linear x + # Left stick: angular z and linear z + # D-pad: angular y and _ + # Triggers: EF grippers + # Bumpers: angular x + # A: brake + # B: IK mode + # X: manual mode + # Y: linear actuator + + # Right stick: linear y and linear x + arm_twist.twist.linear.y = float(right_stick_x) + arm_twist.twist.linear.x = float(right_stick_y) + + # Left stick: angular z and linear z + arm_twist.twist.angular.z = float(-1 * left_stick_x) + arm_twist.twist.linear.z = float(-1 * left_stick_y) + # D-pad: angular y and _ + arm_twist.twist.angular.y = ( + float(0) + if dpad_input[0] == 0 + else float(-1 * copysign(0.75, dpad_input[0])) + ) + + # Triggers: EF Grippers + if left_trigger > 0 or right_trigger > 0: + arm_jointjog.joint_names.append( + "Gripper_Slider_Left" # TODO: Update joint name + ) + arm_jointjog.velocities.append(float(right_trigger - left_trigger)) + + # Bumpers: angular x + if left_bumper > 0 and right_bumper > 0: + arm_twist.twist.angular.x = float(0) + elif left_bumper > 0: + arm_twist.twist.angular.x = float(1) + elif right_bumper > 0: + arm_twist.twist.angular.x = float(-1) + + self.arm_ik_twist_publisher.publish(arm_twist) + # self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out + def deadzone(value: float, threshold=STICK_DEADZONE) -> float: """Apply a deadzone to a joystick input so the motors don't sound angry""" diff --git a/src/servo_arm_twist_pkg/src/joystick_twist.cpp b/src/servo_arm_twist_pkg/src/joystick_twist.cpp index 5a117d0..e098851 100644 --- a/src/servo_arm_twist_pkg/src/joystick_twist.cpp +++ b/src/servo_arm_twist_pkg/src/joystick_twist.cpp @@ -107,23 +107,23 @@ bool convertJoyToCmd(const std::vector& axes, const std::vector& but std::unique_ptr& twist, std::unique_ptr& joint) { - // // Give joint jogging priority because it is only buttons - // // If any joint jog command is requested, we are only publishing joint commands - // if (buttons[A] || buttons[B] || buttons[X] || buttons[Y] || axes[D_PAD_X] || axes[D_PAD_Y]) - // { - // // Map the D_PAD to the proximal joints - // joint->joint_names.push_back("panda_joint1"); - // joint->velocities.push_back(axes[D_PAD_X]); - // joint->joint_names.push_back("panda_joint2"); - // joint->velocities.push_back(axes[D_PAD_Y]); + // Give joint jogging priority because it is only buttons + // If any joint jog command is requested, we are only publishing joint commands + if (0) + { + // Map the D_PAD to the proximal joints + joint->joint_names.push_back("panda_joint1"); + joint->velocities.push_back(axes[D_PAD_X]); + joint->joint_names.push_back("panda_joint2"); + joint->velocities.push_back(axes[D_PAD_Y]); - // // Map the diamond to the distal joints - // joint->joint_names.push_back("panda_joint7"); - // joint->velocities.push_back(buttons[B] - buttons[X]); - // joint->joint_names.push_back("panda_joint6"); - // joint->velocities.push_back(buttons[Y] - buttons[A]); - // return false; - // } + // Map the diamond to the distal joints + joint->joint_names.push_back("panda_joint7"); + joint->velocities.push_back(buttons[B] - buttons[X]); + joint->joint_names.push_back("panda_joint6"); + joint->velocities.push_back(buttons[Y] - buttons[A]); + return false; + } // The bread and butter: map buttons to twist commands twist->twist.linear.z = axes[RIGHT_STICK_Y]; @@ -243,13 +243,13 @@ public: twist_msg->header.stamp = this->now(); twist_pub_->publish(std::move(twist_msg)); } - // else - // { - // // publish the JointJog - // joint_msg->header.stamp = this->now(); - // joint_msg->header.frame_id = "panda_link3"; - // joint_pub_->publish(std::move(joint_msg)); - // } + else + { + // publish the JointJog + joint_msg->header.stamp = this->now(); + joint_msg->header.frame_id = "panda_link3"; + joint_pub_->publish(std::move(joint_msg)); + } } private: