feat: (arm) add JointJog for manual arm input

This commit is contained in:
David Sharpe
2026-03-07 23:55:33 -06:00
parent 169ab85607
commit 0929cc9503

View File

@@ -1,18 +1,17 @@
import sys
import threading
import signal
import math
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos from rclpy import qos
import serial
import sys
import threading
import glob
import time
import atexit
import signal
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
import math
# control_qos = qos.QoSProfile( # control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -27,23 +26,38 @@ import math
thread = None thread = None
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list class ArmNode(Node):
viccan_socket_msg_len_dict = { """Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4, 53: 4,
54: 4, 54: 4,
55: 4, 55: 4,
58: 4, 58: 4,
59: 4, 59: 4,
} }
viccan_digit_msg_len_dict = {
viccan_digit_msg_len_dict = {
54: 4, 54: 4,
55: 2, 55: 2,
59: 2, 59: 2,
} }
class ArmNode(Node):
def __init__(self): def __init__(self):
super().__init__("arm_node") super().__init__("arm_node")
@@ -67,7 +81,10 @@ class ArmNode(Node):
# Control # Control
# Manual: who tf knows. Maybe JointJog? # Manual: /arm/manual_new is published by Servo or Basestation
self.jointjog_pub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, 1
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control # IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription( self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1 JointState, "/joint_commands", self.joint_command_callback, 1
@@ -86,15 +103,7 @@ class ArmNode(Node):
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
) )
self.saved_joint_state = JointState() self.saved_joint_state = JointState()
self.saved_joint_state.name = [ self.saved_joint_state.name = self.all_joint_names
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
self.saved_joint_state.position = [0.0] * len( self.saved_joint_state.position = [0.0] * len(
self.saved_joint_state.name self.saved_joint_state.name
) # Initialize with zeros ) # Initialize with zeros
@@ -128,6 +137,55 @@ class ArmNode(Node):
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
def jointjog_callback(self, msg: JointJog):
if (
len(msg.joint_names) == 0
or len(msg.velocities) == 0
or len(msg.joint_names) != len(msg.velocities)
):
return # Malformed message
# Grab velocities from message
velocities = [
(
msg.velocities[msg.joint_names.index(joint_name)]
if joint_name in msg.joint_names
else 0.0
)
for joint_name in self.all_joint_names
]
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
)
)
# TODO: use msg.duration
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7: if len(msg.position) < 7 and len(msg.velocity) < 7:
return # command needs either position or velocity for all 7 joints return # command needs either position or velocity for all 7 joints
@@ -217,8 +275,8 @@ class ArmNode(Node):
return return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in viccan_socket_msg_len_dict: if msg.command_id in self.viccan_socket_msg_len_dict:
expected_len = viccan_socket_msg_len_dict[msg.command_id] expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len: if len(msg.data) != expected_len:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -308,8 +366,8 @@ class ArmNode(Node):
return return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in viccan_digit_msg_len_dict: if msg.command_id in self.viccan_digit_msg_len_dict:
expected_len = viccan_digit_msg_len_dict[msg.command_id] expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len: if len(msg.data) != expected_len:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"