feat: (headless) add arm IK control support

Only Twist-based so far, no JointJog. need to figure out which frame to send commands in. Currently `base_link`.
This commit is contained in:
David
2026-01-19 11:54:00 -06:00
parent 213105a46b
commit caa5a637bb
3 changed files with 152 additions and 56 deletions

View File

@@ -16,8 +16,10 @@ import pwd
import grp import grp
from math import copysign from math import copysign
from std_srvs.srv import Trigger
from std_msgs.msg import String from std_msgs.msg import String
from geometry_msgs.msg import Twist from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState from astra_msgs.msg import CoreCtrlState
@@ -45,6 +47,13 @@ control_qos = qos.QoSProfile(
liveliness_lease_duration=Duration(seconds=5), liveliness_lease_duration=Duration(seconds=5),
) )
arm_ik_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
)
CORE_MODE = "twist" # "twist" or "duty" CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05")) STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
@@ -107,10 +116,16 @@ class Headless(Node):
break break
id += 1 id += 1
self.create_timer(0.15, self.send_controls) self.create_timer(0.1, self.send_controls)
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2) self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2) self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher( self.core_twist_pub_ = self.create_publisher(
@@ -120,12 +135,40 @@ class Headless(Node):
CoreCtrlState, "/core/control/state", qos_profile=control_qos CoreCtrlState, "/core/control/state", qos_profile=control_qos
) )
self.declare_parameter('arm_manual_scheme', "old") self.declare_parameter("arm_mode", "manual")
self.arm_manual_scheme = self.get_parameter('arm_manual_scheme').value self.arm_mode = self.get_parameter("arm_mode").value
self.declare_parameter("arm_manual_scheme", "old")
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value
# Check parameter validity # Check parameter validity
if self.arm_mode not in ["manual", "ik"]:
self.get_logger().warn(
f"Invalid value '{self.arm_mode}' for arm_mode parameter. Defaulting to 'manual' (per-axis control)."
)
if self.arm_manual_scheme not in ["old", "new"]: if self.arm_manual_scheme not in ["old", "new"]:
self.get_logger().warn(f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls).") self.get_logger().warn(
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
)
# If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None
if self.arm_mode == "ik":
self.get_logger().info("Starting servo node for IK control...")
self.servo_start_client = self.create_client(
Trigger, "/servo_node/start_servo"
)
timeout_counter = 0
while not self.servo_start_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Waiting for servo_node/start_servo service...")
timeout_counter += 1
if timeout_counter >= 10:
self.get_logger().error(
"Servo's start service not available. IK control will not work."
)
break
if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request())
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
@@ -134,18 +177,6 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self): def send_controls(self):
"""Read the gamepad state and publish control messages""" """Read the gamepad state and publish control messages"""
for event in pygame.event.get(): for event in pygame.event.get():
@@ -255,7 +286,7 @@ class Headless(Node):
) )
# ARM and BIO # ARM and BIO
if self.ctrl_mode == "arm": if self.ctrl_mode == "arm" and self.arm_mode == "manual":
arm_input = ArmManual() arm_input = ArmManual()
# Collect controller state # Collect controller state
@@ -328,24 +359,24 @@ class Headless(Node):
# Y: linear actuator # Y: linear actuator
# Right stick: EF yaw and axis 3 # Right stick: EF yaw and axis 3
arm_input.effector_yaw = 0 if right_stick_x == 0 else int( arm_input.effector_yaw = (
copysign(1, right_stick_x) 0 if right_stick_x == 0 else int(copysign(1, right_stick_x))
) )
arm_input.axis3 = 0 if right_stick_y == 0 else int( arm_input.axis3 = (
copysign(-1, right_stick_y) 0 if right_stick_y == 0 else int(copysign(-1, right_stick_y))
) )
# Left stick: axis 1 and 2 # Left stick: axis 1 and 2
arm_input.axis1 = 0 if left_stick_x == 0 else int( arm_input.axis1 = (
copysign(1, left_stick_x) 0 if left_stick_x == 0 else int(copysign(1, left_stick_x))
) )
arm_input.axis2 = 0 if left_stick_y == 0 else int( arm_input.axis2 = (
copysign(-1, left_stick_y) 0 if left_stick_y == 0 else int(copysign(-1, left_stick_y))
) )
# D-pad: axis 0 and _ # D-pad: axis 0 and _
arm_input.axis0 = 0 if dpad_input[0] == 0 else int( arm_input.axis0 = (
copysign(1, dpad_input[0]) 0 if dpad_input[0] == 0 else int(copysign(1, dpad_input[0]))
) )
# Triggers: EF Grippers # Triggers: EF Grippers
@@ -386,6 +417,71 @@ class Headless(Node):
self.arm_publisher.publish(arm_input) self.arm_publisher.publish(arm_input)
if self.ctrl_mode == "arm" and self.arm_mode == "ik":
arm_twist = TwistStamped()
arm_twist.header.frame_id = "base_link"
arm_twist.header.stamp = self.get_clock().now().to_msg()
arm_jointjog = JointJog()
arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# Right stick: linear y and linear x
# Left stick: angular z and linear z
# D-pad: angular y and _
# Triggers: EF grippers
# Bumpers: angular x
# A: brake
# B: IK mode
# X: manual mode
# Y: linear actuator
# Right stick: linear y and linear x
arm_twist.twist.linear.y = float(right_stick_x)
arm_twist.twist.linear.x = float(right_stick_y)
# Left stick: angular z and linear z
arm_twist.twist.angular.z = float(-1 * left_stick_x)
arm_twist.twist.linear.z = float(-1 * left_stick_y)
# D-pad: angular y and _
arm_twist.twist.angular.y = (
float(0)
if dpad_input[0] == 0
else float(-1 * copysign(0.75, dpad_input[0]))
)
# Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append(
"Gripper_Slider_Left" # TODO: Update joint name
)
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x
if left_bumper > 0 and right_bumper > 0:
arm_twist.twist.angular.x = float(0)
elif left_bumper > 0:
arm_twist.twist.angular.x = float(1)
elif right_bumper > 0:
arm_twist.twist.angular.x = float(-1)
self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def deadzone(value: float, threshold=STICK_DEADZONE) -> float: def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""

View File

@@ -107,23 +107,23 @@ bool convertJoyToCmd(const std::vector<float>& axes, const std::vector<int>& but
std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist, std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist,
std::unique_ptr<control_msgs::msg::JointJog>& joint) std::unique_ptr<control_msgs::msg::JointJog>& joint)
{ {
// // Give joint jogging priority because it is only buttons // Give joint jogging priority because it is only buttons
// // If any joint jog command is requested, we are only publishing joint commands // If any joint jog command is requested, we are only publishing joint commands
// if (buttons[A] || buttons[B] || buttons[X] || buttons[Y] || axes[D_PAD_X] || axes[D_PAD_Y]) if (0)
// { {
// // Map the D_PAD to the proximal joints // Map the D_PAD to the proximal joints
// joint->joint_names.push_back("panda_joint1"); joint->joint_names.push_back("panda_joint1");
// joint->velocities.push_back(axes[D_PAD_X]); joint->velocities.push_back(axes[D_PAD_X]);
// joint->joint_names.push_back("panda_joint2"); joint->joint_names.push_back("panda_joint2");
// joint->velocities.push_back(axes[D_PAD_Y]); joint->velocities.push_back(axes[D_PAD_Y]);
// // Map the diamond to the distal joints // Map the diamond to the distal joints
// joint->joint_names.push_back("panda_joint7"); joint->joint_names.push_back("panda_joint7");
// joint->velocities.push_back(buttons[B] - buttons[X]); joint->velocities.push_back(buttons[B] - buttons[X]);
// joint->joint_names.push_back("panda_joint6"); joint->joint_names.push_back("panda_joint6");
// joint->velocities.push_back(buttons[Y] - buttons[A]); joint->velocities.push_back(buttons[Y] - buttons[A]);
// return false; return false;
// } }
// The bread and butter: map buttons to twist commands // The bread and butter: map buttons to twist commands
twist->twist.linear.z = axes[RIGHT_STICK_Y]; twist->twist.linear.z = axes[RIGHT_STICK_Y];
@@ -243,13 +243,13 @@ public:
twist_msg->header.stamp = this->now(); twist_msg->header.stamp = this->now();
twist_pub_->publish(std::move(twist_msg)); twist_pub_->publish(std::move(twist_msg));
} }
// else else
// { {
// // publish the JointJog // publish the JointJog
// joint_msg->header.stamp = this->now(); joint_msg->header.stamp = this->now();
// joint_msg->header.frame_id = "panda_link3"; joint_msg->header.frame_id = "panda_link3";
// joint_pub_->publish(std::move(joint_msg)); joint_pub_->publish(std::move(joint_msg));
// } }
} }
private: private: