refactor: (headless) reorganize __init__(), add use_old_topics parameter

This commit is contained in:
David Sharpe
2026-03-08 00:42:41 -06:00
parent f23d8c62ff
commit 6fa47021fc

View File

@@ -54,7 +54,6 @@ arm_ik_qos = qos.QoSProfile(
durability=qos.QoSDurabilityPolicy.VOLATILE, durability=qos.QoSDurabilityPolicy.VOLATILE,
) )
CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05")) STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
@@ -66,6 +65,9 @@ class Headless(Node):
pygame.joystick.init() pygame.joystick.init()
super().__init__("headless") super().__init__("headless")
##################################################
# Preamble
# Wait for anchor to start # Wait for anchor to start
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug") pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
while len(pub_info) == 0: while len(pub_info) == 0:
@@ -116,30 +118,23 @@ class Headless(Node):
break break
id += 1 id += 1
self.create_timer(0.1, self.send_controls) ##################################################
# Parameters
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) self.declare_parameter("use_old_topics", True)
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2) self.use_old_topics = (
self.arm_ik_twist_publisher = self.create_publisher( self.get_parameter("use_old_topics").get_parameter_value().bool_value
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
) )
self.declare_parameter("arm_mode", "manual") self.declare_parameter("arm_mode", "manual")
self.arm_mode = self.get_parameter("arm_mode").value self.arm_mode = (
self.get_parameter("arm_mode").get_parameter_value().string_value
)
self.declare_parameter("arm_manual_scheme", "old") self.declare_parameter("arm_manual_scheme", "old")
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value self.arm_manual_scheme = (
self.get_parameter("arm_manual_scheme").get_parameter_value().string_value
)
# Check parameter validity # Check parameter validity
if self.arm_mode not in ["manual", "ik"]: if self.arm_mode not in ["manual", "ik"]:
@@ -151,6 +146,45 @@ class Headless(Node):
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)." f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
) )
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
##################################################
# Old Topics
if self.use_old_topics:
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(
ArmManual, "/arm/control/manual", 2
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
##################################################
# New Topics
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
##################################################
# Timers
self.create_timer(0.1, self.send_controls)
##################################################
# Services
# If using IK control, we have to "start" the servo node to enable it to accept commands # If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None self.servo_start_client = None
if self.arm_mode == "ik": if self.arm_mode == "ik":
@@ -170,10 +204,6 @@ class Headless(Node):
if self.servo_start_client.service_is_ready(): if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request()) self.servo_start_client.call_async(Trigger.Request())
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
@@ -214,7 +244,7 @@ class Headless(Node):
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
# CORE # CORE
if self.ctrl_mode == "core" and CORE_MODE == "duty": if self.ctrl_mode == "core" and self.use_old_topics:
input = CoreControl() input = CoreControl()
input.max_speed = 90 input.max_speed = 90
@@ -238,7 +268,7 @@ class Headless(Node):
self.core_publisher.publish(input) self.core_publisher.publish(input)
elif self.ctrl_mode == "core" and CORE_MODE == "twist": elif self.ctrl_mode == "core" and not self.use_old_topics:
input = Twist() input = Twist()
# Collect controller state # Collect controller state
@@ -409,7 +439,6 @@ class Headless(Node):
else: else:
arm_input.linear_actuator = 0 arm_input.linear_actuator = 0
# BIO # BIO
bio_input = BioControl( bio_input = BioControl(
bio_arm=int(left_stick_y * -100), bio_arm=int(left_stick_y * -100),
@@ -473,9 +502,7 @@ class Headless(Node):
# Triggers: EF Grippers # Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0: if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append( arm_jointjog.joint_names.append("ef_gripper_left_joint")
"ef_gripper_left_joint"
)
arm_jointjog.velocities.append(float(right_trigger - left_trigger)) arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x # Bumpers: angular x