From ad0266654bdd790a64c02789afd56b5289f6d269 Mon Sep 17 00:00:00 2001 From: David Date: Fri, 21 Nov 2025 17:38:52 -0600 Subject: [PATCH] style: (arm) cleanup old topics --- src/arm_pkg/arm_pkg/arm_node.py | 91 ++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index ab621a1..01d283a 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -1,5 +1,6 @@ import rclpy from rclpy.node import Node +from rclpy import qos import serial import sys import threading @@ -8,10 +9,8 @@ import time import atexit import signal from std_msgs.msg import String -from astra_msgs.msg import ArmManual -from astra_msgs.msg import SocketFeedback -from astra_msgs.msg import DigitFeedback from sensor_msgs.msg import JointState +from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual import math # control_qos = qos.QoSProfile( @@ -31,31 +30,37 @@ thread = None class SerialRelay(Node): def __init__(self): - # Initialize node super().__init__("arm_node") - # Get launch mode parameter + # Get launch mode -- anchor vs arm self.declare_parameter("launch_mode", "arm") self.launch_mode = self.get_parameter("launch_mode").value self.get_logger().info(f"arm launch_mode is: {self.launch_mode}") - # Create publishers - self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10) - self.socket_pub = self.create_publisher( - SocketFeedback, "/arm/feedback/socket", 10 - ) - self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10) - self.feedback_timer = self.create_timer(0.25, self.publish_feedback) + ################################################## + # Topics - # Create subscribers - self.man_sub = self.create_subscription( - ArmManual, "/arm/control/manual", self.send_manual, 10 + # Anchor topics + if self.launch_mode == "anchor": + self.anchor_sub = self.create_subscription( + String, "/anchor/arm/feedback", self.anchor_feedback, 10 + ) + self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) + + # Control + + # Manual: who tf knows. Maybe JointJog? + # IK: /joint_commands is published by JointTrajectoryController via topic_based_control + self.joint_command_sub_ = self.create_subscription( + JointState, "/joint_commands", self.joint_command_callback, 1 ) - # New messages - self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10) - self.joint_state = JointState() - self.joint_state.name = [ + # Feedback + + # IK: /joint_states is published from here to topic_based_control + self.joint_state_pub_ = self.create_publisher(JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data) + self.saved_joint_state = JointState() + self.saved_joint_state.name = [ "Axis_0_Joint", "Axis_1_Joint", "Axis_2_Joint", @@ -64,23 +69,29 @@ class SerialRelay(Node): "Wrist-EF_Roll_Joint", "Gripper_Slider_Left", ] - self.joint_state.position = [0.0] * len( - self.joint_state.name + self.saved_joint_state.position = [0.0] * len( + self.saved_joint_state.name ) # Initialize with zeros - self.joint_command_sub = self.create_subscription( - JointState, "/joint_commands", self.joint_command_callback, 10 + # Old + + # Create publishers + self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10) + self.socket_pub = self.create_publisher( + SocketFeedback, "/arm/feedback/socket", 10 + ) + self.arm_feedback = SocketFeedback() + self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10) + self.digit_feedback = DigitFeedback() + self.feedback_timer = self.create_timer(0.25, self.publish_feedback) + + # Create subscribers + self.man_sub = self.create_subscription( + ArmManual, "/arm/control/manual", self.send_manual, 10 ) - # Topics used in anchor mode - if self.launch_mode == "anchor": - self.anchor_sub = self.create_subscription( - String, "/anchor/arm/feedback", self.anchor_feedback, 10 - ) - self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) - - self.arm_feedback = SocketFeedback() - self.digit_feedback = DigitFeedback() + ################################################## + # Find microcontroller -- non-anchor only # Search for ports IF in 'arm' (standalone) and not 'anchor' mode if self.launch_mode == "arm": @@ -235,10 +246,10 @@ class SerialRelay(Node): if len(parts) >= 4: self.digit_feedback.wrist_angle = float(parts[3]) # self.digit_feedback.wrist_roll = float(parts[4]) - self.joint_state.position[4] = math.radians( + self.saved_joint_state.position[4] = math.radians( float(parts[4]) ) # Wrist roll - self.joint_state.position[5] = math.radians( + self.saved_joint_state.position[5] = math.radians( float(parts[3]) ) # Wrist yaw else: @@ -265,13 +276,13 @@ class SerialRelay(Node): self.arm_feedback.axis3_angle = angles[3] # Joint state publisher for URDF visualization - self.joint_state.position[0] = math.radians(angles[0]) # Axis 0 - self.joint_state.position[1] = math.radians(angles[1]) # Axis 1 - self.joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted) - self.joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted) + self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0 + self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1 + self.saved_joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted) + self.saved_joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted) # Wrist is handled by digit feedback - self.joint_state.header.stamp = self.get_clock().now().to_msg() - self.joint_state_pub.publish(self.joint_state) + self.saved_joint_state.header.stamp = self.get_clock().now().to_msg() + self.joint_state_pub_.publish(self.saved_joint_state) else: self.get_logger().info("Invalid angle feedback input format")