feat(arm): implement ArmCtrlState topic

This commit is contained in:
David
2026-03-26 12:40:27 -05:00
parent f1c84c3cc5
commit c42cd39fda
3 changed files with 56 additions and 10 deletions

View File

@@ -12,7 +12,7 @@ from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
from astra_msgs.msg import ArmFeedback, ArmCtrlState, VicCAN, RevMotorState
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -78,7 +78,9 @@ class ArmNode(Node):
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.anchor_pub = self.create_publisher(
String, "/anchor/to_vic/relay_string", 10
)
# Create publishers
self.socket_pub = self.create_publisher(
@@ -112,10 +114,10 @@ class ArmNode(Node):
# Control
# Manual: /arm/manual/joint_jog is published by Basestation or Headless
# Manual: /arm/control/joint_jog is published by Basestation or Headless
self.man_jointjog_sub_ = self.create_subscription(
JointJog,
"/arm/manual/joint_jog",
"/arm/control/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
)
@@ -126,6 +128,13 @@ class ArmNode(Node):
self.joint_command_callback,
qos_profile=control_qos,
)
# State: /arm/control/state is published by Basestation or Headless
self.man_state_sub_ = self.create_subscription(
ArmCtrlState,
"/arm/control/state",
self.man_state_callback,
qos_profile=control_qos,
)
# Feedback
@@ -204,6 +213,25 @@ class ArmNode(Node):
# TODO: use msg.duration
def man_state_callback(self, msg: ArmCtrlState):
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=18,
data=[1.0 if msg.brake_mode else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=34,
data=[1.0 if msg.laser else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.")

View File

@@ -18,7 +18,7 @@ from std_msgs.msg import Header
from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState
from astra_msgs.msg import CoreCtrlState, ArmCtrlState
import warnings
@@ -177,6 +177,8 @@ class Headless(Node):
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
self.arm_brake_mode = False
self.arm_laser = False
##################################################
# Old Topics
@@ -203,7 +205,10 @@ class Headless(Node):
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos
JointJog, "/arm/control/joint_jog", qos_profile=control_qos
)
self.arm_state_pub_ = self.create_publisher(
ArmCtrlState, "/arm/control/state", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
@@ -582,13 +587,26 @@ class Headless(Node):
)
# A: brake
# TODO: Brake mode
new_brake_mode = button_a
# Y: linear actuator
# TODO: linear actuator
# X: laser
new_laser = button_x
self.arm_manual_pub_.publish(arm_input)
# Only publish state if needed
if new_brake_mode != self.arm_brake_mode or new_laser != self.arm_laser:
self.arm_brake_mode = new_brake_mode
self.arm_laser = new_laser
state_msg = ArmCtrlState()
state_msg.brake_mode = bool(self.arm_brake_mode)
state_msg.laser = bool(self.arm_laser)
self.arm_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Arm State] Brake: {self.arm_brake_mode}, Laser: {self.arm_laser}"
)
# IK (ONLY NEW)
# =============