3 Commits

Author SHA1 Message Date
David
c42cd39fda feat(arm): implement ArmCtrlState topic 2026-03-26 12:40:27 -05:00
David
f1c84c3cc5 fix(arm): correctly control arm 2026-03-26 03:53:38 -05:00
David
cf699da0c6 fix(arm): populate missing feedback fields 2026-03-26 02:40:58 -05:00
3 changed files with 94 additions and 13 deletions

View File

@@ -12,7 +12,7 @@ from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState from astra_msgs.msg import ArmFeedback, ArmCtrlState, VicCAN, RevMotorState
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -78,7 +78,9 @@ class ArmNode(Node):
self.anchor_sub = self.create_subscription( self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10 String, "/anchor/arm/feedback", self.anchor_feedback, 10
) )
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.anchor_pub = self.create_publisher(
String, "/anchor/to_vic/relay_string", 10
)
# Create publishers # Create publishers
self.socket_pub = self.create_publisher( self.socket_pub = self.create_publisher(
@@ -112,10 +114,10 @@ class ArmNode(Node):
# Control # Control
# Manual: /arm/manual/joint_jog is published by Basestation or Headless # Manual: /arm/control/joint_jog is published by Basestation or Headless
self.man_jointjog_sub_ = self.create_subscription( self.man_jointjog_sub_ = self.create_subscription(
JointJog, JointJog,
"/arm/manual/joint_jog", "/arm/control/joint_jog",
self.jointjog_callback, self.jointjog_callback,
qos_profile=control_qos, qos_profile=control_qos,
) )
@@ -126,6 +128,13 @@ class ArmNode(Node):
self.joint_command_callback, self.joint_command_callback,
qos_profile=control_qos, qos_profile=control_qos,
) )
# State: /arm/control/state is published by Basestation or Headless
self.man_state_sub_ = self.create_subscription(
ArmCtrlState,
"/arm/control/state",
self.man_state_callback,
qos_profile=control_qos,
)
# Feedback # Feedback
@@ -145,9 +154,14 @@ class ArmNode(Node):
# Combined Socket and Digit feedback # Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback() self.arm_feedback_new = ArmFeedback()
self.arm_feedback_new.axis0_motor.id = 1
self.arm_feedback_new.axis1_motor.id = 2
self.arm_feedback_new.axis2_motor.id = 3
self.arm_feedback_new.axis3_motor.id = 4
# IK Arm pose # IK Arm pose
self.saved_joint_state = JointState() self.saved_joint_state = JointState()
self.saved_joint_state.header.frame_id = "base_link"
self.saved_joint_state.name = self.all_joint_names self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros # ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name) self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
@@ -170,10 +184,54 @@ class ArmNode(Node):
# Deadzone # Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities] velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
self.send_velocities(velocities, msg.header) self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=39,
data=velocities[0:4],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=39,
data=velocities[4:6],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=26,
data=[velocities[6]],
header=msg.header,
)
)
# TODO: use msg.duration # TODO: use msg.duration
def man_state_callback(self, msg: ArmCtrlState):
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=18,
data=[1.0 if msg.brake_mode else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=34,
data=[1.0 if msg.laser else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7: if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.") self.get_logger().debug("Ignoring malformed /joint_command message.")
@@ -199,12 +257,12 @@ class ArmNode(Node):
# Send Axis 0-3 # Send Axis 0-3
self.anchor_tovic_pub_.publish( self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header) VicCAN(mcu_name="arm", command_id=43, data=velocities[0:4], header=header)
) )
# Send Wrist yaw and roll # Send Wrist yaw and roll
# TODO: Verify embedded # TODO: Verify embedded
self.anchor_tovic_pub_.publish( self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header) VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6], header=header)
) )
# Send End Effector Gripper # Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded # TODO: Verify m/s received correctly by embedded
@@ -287,6 +345,8 @@ class ArmNode(Node):
) )
return return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id: match msg.command_id:
case 53: # REV SPARK MAX feedback case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0]) motorId = round(msg.data[0])
@@ -373,11 +433,14 @@ class ArmNode(Node):
) )
return return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id: match msg.command_id:
case 54: # Board voltages case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0 self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0 self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0 self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.digit_voltage.header.stamp = msg.header.stamp
case 55: # Arm joint positions case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians( self.saved_joint_state.position[4] = math.radians(
msg.data[0] msg.data[0]

View File

@@ -18,7 +18,7 @@ from std_msgs.msg import Header
from geometry_msgs.msg import Twist, TwistStamped from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState from astra_msgs.msg import CoreCtrlState, ArmCtrlState
import warnings import warnings
@@ -177,6 +177,8 @@ class Headless(Node):
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed) self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
self.arm_brake_mode = False
self.arm_laser = False
################################################## ##################################################
# Old Topics # Old Topics
@@ -203,7 +205,10 @@ class Headless(Node):
) )
self.arm_manual_pub_ = self.create_publisher( self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos JointJog, "/arm/control/joint_jog", qos_profile=control_qos
)
self.arm_state_pub_ = self.create_publisher(
ArmCtrlState, "/arm/control/state", qos_profile=control_qos
) )
self.arm_ik_twist_publisher = self.create_publisher( self.arm_ik_twist_publisher = self.create_publisher(
@@ -582,13 +587,26 @@ class Headless(Node):
) )
# A: brake # A: brake
# TODO: Brake mode new_brake_mode = button_a
# Y: linear actuator # X: laser
# TODO: linear actuator new_laser = button_x
self.arm_manual_pub_.publish(arm_input) self.arm_manual_pub_.publish(arm_input)
# Only publish state if needed
if new_brake_mode != self.arm_brake_mode or new_laser != self.arm_laser:
self.arm_brake_mode = new_brake_mode
self.arm_laser = new_laser
state_msg = ArmCtrlState()
state_msg.brake_mode = bool(self.arm_brake_mode)
state_msg.laser = bool(self.arm_laser)
self.arm_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Arm State] Brake: {self.arm_brake_mode}, Laser: {self.arm_laser}"
)
# IK (ONLY NEW) # IK (ONLY NEW)
# ============= # =============