mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-04-20 03:41:17 -05:00
refactor: implement Riley's comments
- Add @warnings.deprecated decorators to callbacks that use old topics - Rename "*new*" topics - Print debug on malformed message receival - Un-invert Axes 2 & 3 in the URDF - Don't silently check mcu_name twice - Remove threading.Thread (not one of Riley's comments) - Add a real signal handler (ditto) - Move stamped stop messages to helper functions
This commit is contained in:
@@ -2,6 +2,7 @@ import sys
|
||||
import threading
|
||||
import signal
|
||||
import math
|
||||
from warnings import deprecated
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
@@ -11,7 +12,7 @@ from rclpy import qos
|
||||
from std_msgs.msg import String, Header
|
||||
from sensor_msgs.msg import JointState
|
||||
from control_msgs.msg import JointJog
|
||||
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
|
||||
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
|
||||
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
|
||||
|
||||
control_qos = qos.QoSProfile(
|
||||
@@ -111,8 +112,8 @@ class ArmNode(Node):
|
||||
# Control
|
||||
|
||||
# Manual: /arm/manual_new is published by Servo or Basestation
|
||||
self.jointjog_pub_ = self.create_subscription(
|
||||
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos
|
||||
self.man_jointjog_pub_ = self.create_subscription(
|
||||
JointJog, "/arm/manual/joint_jog", self.jointjog_callback, qos_profile=control_qos
|
||||
)
|
||||
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
|
||||
self.joint_command_sub_ = self.create_subscription(
|
||||
@@ -124,12 +125,12 @@ class ArmNode(Node):
|
||||
# Combined Socket and Digit feedback
|
||||
self.arm_feedback_pub_ = self.create_publisher(
|
||||
ArmFeedback,
|
||||
"/arm/feedback/new_feedback",
|
||||
"/arm/feedback",
|
||||
qos_profile=qos.qos_profile_sensor_data,
|
||||
)
|
||||
# IK arm pose: /joint_states is published from here to topic_based_control
|
||||
self.joint_state_pub_ = self.create_publisher(
|
||||
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
|
||||
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
|
||||
)
|
||||
|
||||
###################################################
|
||||
@@ -146,12 +147,9 @@ class ArmNode(Node):
|
||||
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
|
||||
|
||||
def jointjog_callback(self, msg: JointJog):
|
||||
if (
|
||||
len(msg.joint_names) == 0
|
||||
or len(msg.velocities) == 0
|
||||
or len(msg.joint_names) != len(msg.velocities)
|
||||
):
|
||||
return # Malformed message
|
||||
if len(msg.joint_names) != len(msg.velocities):
|
||||
self.get_logger().debug("Ignoring malformed /arm/manual/joint_jog message.")
|
||||
return
|
||||
|
||||
# Grab velocities from message
|
||||
velocities = [
|
||||
@@ -168,9 +166,6 @@ class ArmNode(Node):
|
||||
velocities = [
|
||||
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
|
||||
]
|
||||
# Axis 2 & 3 URDF direction is inverted
|
||||
velocities[2] = -velocities[2]
|
||||
velocities[3] = -velocities[3]
|
||||
|
||||
# Send Axis 0-3
|
||||
self.anchor_tovic_pub_.publish(
|
||||
@@ -196,6 +191,7 @@ class ArmNode(Node):
|
||||
|
||||
def joint_command_callback(self, msg: JointState):
|
||||
if len(msg.position) < 7 and len(msg.velocity) < 7:
|
||||
self.get_logger().debug("Ignoring malformed /joint_command message.")
|
||||
return # command needs either position or velocity for all 7 joints
|
||||
|
||||
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper
|
||||
@@ -204,9 +200,6 @@ class ArmNode(Node):
|
||||
velocities = [
|
||||
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity
|
||||
]
|
||||
# Axis 2 & 3 URDF direction is inverted
|
||||
velocities[2] = -velocities[2]
|
||||
velocities[3] = -velocities[3]
|
||||
|
||||
# Axis 0-3
|
||||
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3])
|
||||
@@ -218,8 +211,8 @@ class ArmNode(Node):
|
||||
self.anchor_tovic_pub_.publish(arm_cmd)
|
||||
self.anchor_tovic_pub_.publish(digit_cmd)
|
||||
|
||||
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||
def send_manual(self, msg: ArmManual):
|
||||
"""TODO: Old"""
|
||||
axis0 = msg.axis0
|
||||
axis1 = -1 * msg.axis1
|
||||
axis2 = msg.axis2
|
||||
@@ -243,13 +236,13 @@ class ArmNode(Node):
|
||||
|
||||
return
|
||||
|
||||
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||
def send_cmd(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
output = String(data=msg)
|
||||
self.anchor_pub.publish(output)
|
||||
|
||||
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||
def anchor_feedback(self, msg: String):
|
||||
"""TODO: Old"""
|
||||
output = msg.data
|
||||
if output.startswith("can_relay_fromvic,arm,55"):
|
||||
self.updateAngleFeedback(output)
|
||||
@@ -282,8 +275,7 @@ class ArmNode(Node):
|
||||
self.process_fromvic_digit(msg)
|
||||
|
||||
def process_fromvic_arm(self, msg: VicCAN):
|
||||
if msg.mcu_name != "arm":
|
||||
return
|
||||
assert msg.mcu_name == "arm"
|
||||
|
||||
# Check message len to prevent crashing on bad data
|
||||
if msg.command_id in self.viccan_socket_msg_len_dict:
|
||||
@@ -326,12 +318,8 @@ class ArmNode(Node):
|
||||
# Joint state publisher for URDF visualization
|
||||
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
|
||||
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
|
||||
self.saved_joint_state.position[2] = math.radians(
|
||||
-angles[2]
|
||||
) # Axis 2 (inverted)
|
||||
self.saved_joint_state.position[3] = math.radians(
|
||||
-angles[3]
|
||||
) # Axis 3 (inverted)
|
||||
self.saved_joint_state.position[2] = math.radians(angles[2]) # Axis 2
|
||||
self.saved_joint_state.position[3] = math.radians(angles[3]) # Axis 3
|
||||
# Wrist is handled by digit feedback
|
||||
self.saved_joint_state.header.stamp = msg.header.stamp
|
||||
self.joint_state_pub_.publish(self.saved_joint_state)
|
||||
@@ -363,18 +351,17 @@ class ArmNode(Node):
|
||||
velocities[1]
|
||||
) # Axis 1
|
||||
self.saved_joint_state.velocity[2] = math.radians(
|
||||
-velocities[2]
|
||||
) # Axis 2 (-)
|
||||
velocities[2]
|
||||
) # Axis 2
|
||||
self.saved_joint_state.velocity[3] = math.radians(
|
||||
-velocities[3]
|
||||
) # Axis 3 (-)
|
||||
velocities[3]
|
||||
) # Axis 3
|
||||
# Wrist is handled by digit feedback
|
||||
self.saved_joint_state.header.stamp = msg.header.stamp
|
||||
self.joint_state_pub_.publish(self.saved_joint_state)
|
||||
|
||||
def process_fromvic_digit(self, msg: VicCAN):
|
||||
if msg.mcu_name != "digit":
|
||||
return
|
||||
assert msg.mcu_name == "digit"
|
||||
|
||||
# Check message len to prevent crashing on bad data
|
||||
if msg.command_id in self.viccan_digit_msg_len_dict:
|
||||
@@ -398,13 +385,13 @@ class ArmNode(Node):
|
||||
msg.data[1]
|
||||
) # Wrist yaw
|
||||
|
||||
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||
def publish_feedback(self):
|
||||
"""TODO: Old"""
|
||||
self.socket_pub.publish(self.arm_feedback)
|
||||
self.digit_pub.publish(self.digit_feedback)
|
||||
|
||||
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||
def updateAngleFeedback(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
# Angle feedbacks,
|
||||
# split the msg.data by commas
|
||||
parts = msg.split(",")
|
||||
@@ -422,8 +409,8 @@ class ArmNode(Node):
|
||||
else:
|
||||
self.get_logger().info("Invalid angle feedback input format")
|
||||
|
||||
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||
def updateBusVoltage(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
# Bus Voltage feedbacks
|
||||
parts = msg.split(",")
|
||||
if len(parts) >= 7:
|
||||
@@ -437,8 +424,8 @@ class ArmNode(Node):
|
||||
else:
|
||||
self.get_logger().info("Invalid voltage feedback input format")
|
||||
|
||||
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||
def updateMotorFeedback(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
parts = str(msg.strip()).split(",")
|
||||
motorId = round(float(parts[3]))
|
||||
temp = float(parts[4]) / 10.0
|
||||
@@ -462,15 +449,22 @@ class ArmNode(Node):
|
||||
self.arm_feedback.axis0_current = current
|
||||
|
||||
|
||||
def exit_handler(signum, frame):
|
||||
print("Caught SIGTERM. Exiting...")
|
||||
rclpy.try_shutdown()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
|
||||
# Catch termination signals and exit cleanly
|
||||
signal.signal(signal.SIGTERM, exit_handler)
|
||||
|
||||
arm_node = ArmNode()
|
||||
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
try:
|
||||
thread.join()
|
||||
rclpy.spin(arm_node)
|
||||
except (KeyboardInterrupt, ExternalShutdownException):
|
||||
pass
|
||||
finally:
|
||||
@@ -478,8 +472,4 @@ def main(args=None):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
||||
signal.signal(
|
||||
signal.SIGTERM, lambda signum, frame: sys.exit(0)
|
||||
) # Catch termination signals and exit cleanly
|
||||
main()
|
||||
|
||||
Submodule src/astra_descriptions updated: 72d3c223af...c36bd8233d
@@ -39,9 +39,9 @@ os.environ["SDL_AUDIODRIVER"] = (
|
||||
CORE_STOP_MSG = CoreControl() # All zeros by default
|
||||
CORE_STOP_TWIST_MSG = Twist() # "
|
||||
ARM_STOP_MSG = ArmManual() # "
|
||||
ARM_IK_STOP_MSG = TwistStamped() # "
|
||||
BIO_STOP_MSG = BioControl() # "
|
||||
|
||||
|
||||
control_qos = qos.QoSProfile(
|
||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=2,
|
||||
@@ -75,15 +75,7 @@ class Headless(Node):
|
||||
# Initialize pygame first
|
||||
pygame.init()
|
||||
pygame.joystick.init()
|
||||
super().__init__("headless")
|
||||
|
||||
# TODO: move the STOP_MSGs somewhere better
|
||||
global ARM_STOP_JOG_MSG
|
||||
ARM_STOP_JOG_MSG = JointJog(
|
||||
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
|
||||
joint_names=self.all_joint_names,
|
||||
velocities=[0.0] * len(self.all_joint_names),
|
||||
)
|
||||
super().__init__("headless_node")
|
||||
|
||||
##################################################
|
||||
# Preamble
|
||||
@@ -247,9 +239,9 @@ class Headless(Node):
|
||||
else:
|
||||
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
|
||||
if self.use_arm_ik:
|
||||
self.arm_ik_twist_publisher.publish(ARM_IK_STOP_MSG)
|
||||
self.arm_ik_twist_publisher.publish(self.arm_ik_twist_stop_msg())
|
||||
else:
|
||||
self.arm_manual_pub_.publish(ARM_STOP_JOG_MSG)
|
||||
self.arm_manual_pub_.publish(self.arm_manual_stop_msg())
|
||||
# TODO: add bio here after implementing new topics
|
||||
|
||||
def send_controls(self):
|
||||
@@ -519,14 +511,12 @@ class Headless(Node):
|
||||
# X: _
|
||||
# Y: linear actuator out
|
||||
|
||||
ARM_THRESHOLD = 0.2
|
||||
|
||||
# Right stick: EF yaw and axis 3
|
||||
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
|
||||
stick_to_arm_direction(right_stick_x)
|
||||
)
|
||||
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
|
||||
-1 * stick_to_arm_direction(right_stick_y)
|
||||
stick_to_arm_direction(right_stick_y)
|
||||
)
|
||||
|
||||
# Left stick: axis 1 and 2
|
||||
@@ -534,7 +524,7 @@ class Headless(Node):
|
||||
stick_to_arm_direction(left_stick_x)
|
||||
)
|
||||
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
|
||||
-1 * stick_to_arm_direction(left_stick_y)
|
||||
stick_to_arm_direction(left_stick_y)
|
||||
)
|
||||
|
||||
# D-pad: axis 0 and _
|
||||
@@ -653,6 +643,18 @@ class Headless(Node):
|
||||
else:
|
||||
pass # TODO: implement new bio control topics
|
||||
|
||||
def arm_manual_stop_msg(self):
|
||||
return JointJog(
|
||||
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
|
||||
joint_names=self.all_joint_names,
|
||||
velocities=[0.0] * len(self.all_joint_names),
|
||||
)
|
||||
|
||||
def arm_ik_twist_stop_msg(self):
|
||||
return TwistStamped(
|
||||
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
|
||||
)
|
||||
|
||||
|
||||
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
||||
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
|
||||
|
||||
Reference in New Issue
Block a user