style: (arm) cleanup old topics

This commit is contained in:
David
2025-11-21 17:38:52 -06:00
parent 65aab7f179
commit ad0266654b

View File

@@ -1,5 +1,6 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos
import serial import serial
import sys import sys
import threading import threading
@@ -8,10 +9,8 @@ import time
import atexit import atexit
import signal import signal
from std_msgs.msg import String from std_msgs.msg import String
from astra_msgs.msg import ArmManual
from astra_msgs.msg import SocketFeedback
from astra_msgs.msg import DigitFeedback
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
import math import math
# control_qos = qos.QoSProfile( # control_qos = qos.QoSProfile(
@@ -31,31 +30,37 @@ thread = None
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node
super().__init__("arm_node") super().__init__("arm_node")
# Get launch mode parameter # Get launch mode -- anchor vs arm
self.declare_parameter("launch_mode", "arm") self.declare_parameter("launch_mode", "arm")
self.launch_mode = self.get_parameter("launch_mode").value self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}") self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
# Create publishers ##################################################
self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10) # Topics
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
)
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers # Anchor topics
self.man_sub = self.create_subscription( if self.launch_mode == "anchor":
ArmManual, "/arm/control/manual", self.send_manual, 10 self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
# Manual: who tf knows. Maybe JointJog?
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
) )
# New messages # Feedback
self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10)
self.joint_state = JointState() # IK: /joint_states is published from here to topic_based_control
self.joint_state.name = [ self.joint_state_pub_ = self.create_publisher(JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data)
self.saved_joint_state = JointState()
self.saved_joint_state.name = [
"Axis_0_Joint", "Axis_0_Joint",
"Axis_1_Joint", "Axis_1_Joint",
"Axis_2_Joint", "Axis_2_Joint",
@@ -64,23 +69,29 @@ class SerialRelay(Node):
"Wrist-EF_Roll_Joint", "Wrist-EF_Roll_Joint",
"Gripper_Slider_Left", "Gripper_Slider_Left",
] ]
self.joint_state.position = [0.0] * len( self.saved_joint_state.position = [0.0] * len(
self.joint_state.name self.saved_joint_state.name
) # Initialize with zeros ) # Initialize with zeros
self.joint_command_sub = self.create_subscription( # Old
JointState, "/joint_commands", self.joint_command_callback, 10
)
# Topics used in anchor mode # Create publishers
if self.launch_mode == "anchor": self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10)
self.anchor_sub = self.create_subscription( self.socket_pub = self.create_publisher(
String, "/anchor/arm/feedback", self.anchor_feedback, 10 SocketFeedback, "/arm/feedback/socket", 10
) )
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.digit_feedback = DigitFeedback() self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers
self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 10
)
##################################################
# Find microcontroller -- non-anchor only
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if self.launch_mode == "arm": if self.launch_mode == "arm":
@@ -235,10 +246,10 @@ class SerialRelay(Node):
if len(parts) >= 4: if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3]) self.digit_feedback.wrist_angle = float(parts[3])
# self.digit_feedback.wrist_roll = float(parts[4]) # self.digit_feedback.wrist_roll = float(parts[4])
self.joint_state.position[4] = math.radians( self.saved_joint_state.position[4] = math.radians(
float(parts[4]) float(parts[4])
) # Wrist roll ) # Wrist roll
self.joint_state.position[5] = math.radians( self.saved_joint_state.position[5] = math.radians(
float(parts[3]) float(parts[3])
) # Wrist yaw ) # Wrist yaw
else: else:
@@ -265,13 +276,13 @@ class SerialRelay(Node):
self.arm_feedback.axis3_angle = angles[3] self.arm_feedback.axis3_angle = angles[3]
# Joint state publisher for URDF visualization # Joint state publisher for URDF visualization
self.joint_state.position[0] = math.radians(angles[0]) # Axis 0 self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.joint_state.position[1] = math.radians(angles[1]) # Axis 1 self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted) self.saved_joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted)
self.joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted) self.saved_joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted)
# Wrist is handled by digit feedback # Wrist is handled by digit feedback
self.joint_state.header.stamp = self.get_clock().now().to_msg() self.saved_joint_state.header.stamp = self.get_clock().now().to_msg()
self.joint_state_pub.publish(self.joint_state) self.joint_state_pub_.publish(self.saved_joint_state)
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")