refactor: (arm) reorganize __init__() and remove run()

This commit is contained in:
David Sharpe
2026-03-08 00:10:33 -06:00
parent 0929cc9503
commit 667247cac8

View File

@@ -5,6 +5,7 @@ import math
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos from rclpy import qos
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
@@ -64,55 +65,14 @@ class ArmNode(Node):
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
################################################## ##################################################
# Topics # Old topics
# Anchor topics # Anchor topics
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription( self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10 String, "/anchor/arm/feedback", self.anchor_feedback, 10
) )
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
# Manual: /arm/manual_new is published by Servo or Basestation
self.jointjog_pub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, 1
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
)
# Feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
self.arm_feedback_new = ArmFeedback()
# IK: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
self.saved_joint_state = JointState()
self.saved_joint_state.name = self.all_joint_names
self.saved_joint_state.position = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
self.saved_joint_state.velocity = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
# Old
# Create publishers # Create publishers
self.socket_pub = self.create_publisher( self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10 SocketFeedback, "/arm/feedback/socket", 10
@@ -127,15 +87,56 @@ class ArmNode(Node):
ArmManual, "/arm/control/manual", self.send_manual, 10 ArmManual, "/arm/control/manual", self.send_manual, 10
) )
def run(self): ###################################################
global thread # New topics
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
try: # Anchor topics
thread.join()
except KeyboardInterrupt: # from_vic
pass self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
# to_vic
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
# Control
# Manual: /arm/manual_new is published by Servo or Basestation
self.jointjog_pub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, 1
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
)
# Feedback
# Combined Socket and Digit feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
# IK arm pose: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Saved state
# Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback()
# IK Arm pose
self.saved_joint_state = JointState()
self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
def jointjog_callback(self, msg: JointJog): def jointjog_callback(self, msg: JointJog):
if ( if (
@@ -451,8 +452,15 @@ def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
arm_node = ArmNode() arm_node = ArmNode()
arm_node.run() thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
rclpy.try_shutdown() thread.start()
try:
thread.join()
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":