3 Commits

Author SHA1 Message Date
David Sharpe
67b3c5bc8f refactor: (arm) consolidate velocity control VicCAN into single function 2026-03-17 01:44:32 -05:00
David Sharpe
c506a34b37 style: (arm) format 2026-03-17 01:43:22 -05:00
David Sharpe
980c08ba4f refactor: implement Riley's comments
- Add @warnings.deprecated decorators to callbacks that use old topics
- Rename "*new*" topics
- Print debug on malformed message receival
- Un-invert Axes 2 & 3 in the URDF
- Don't silently check mcu_name twice
- Remove threading.Thread (not one of Riley's comments)
- Add a real signal handler (ditto)
- Move stamped stop messages to helper functions
2026-03-17 01:14:18 -05:00
5 changed files with 104 additions and 392 deletions

View File

@@ -1,279 +0,0 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
from std_msgs.msg import String
from astra_msgs.msg import ControllerState
from astra_msgs.msg import ArmManual
from astra_msgs.msg import ArmIK
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("arm_headless")
# Depricated, kept temporarily for reference
# self.create_timer(0.20, self.send_controls)#read and send controls
self.create_timer(0.1, self.send_manual)
# Create a publisher to publish any output the pico sends
# Depricated, kept temporarily for reference
# self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10)
# Create a subscriber to listen to any commands sent for the pico
# Depricated, kept temporarily for reference
# self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
# self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.laser_status = 0
# Initialize pygame
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
#
#
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
# Check the pico for updates
# self.read_feedback()
if (
pygame.joystick.get_count() == 0
): # if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
# self.send_controls() #depricated, kept for reference temporarily
self.send_manual()
# self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() # re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt:
sys.exit(0)
def send_manual(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = ArmManual()
# Triggers for gripper control
if self.gamepad.get_axis(2) > 0: # left trigger
input.gripper = -1
elif self.gamepad.get_axis(5) > 0: # right trigger
input.gripper = 1
# Toggle Laser
if self.gamepad.get_button(7): # Start
self.laser_status = 1
elif self.gamepad.get_button(6): # Back
self.laser_status = 0
input.laser = self.laser_status
if self.gamepad.get_button(5): # right bumper, control effector
# Left stick X-axis for effector yaw
if self.gamepad.get_axis(0) > 0:
input.effector_yaw = 1
elif self.gamepad.get_axis(0) < 0:
input.effector_yaw = -1
# Right stick X-axis for effector roll
if self.gamepad.get_axis(3) > 0:
input.effector_roll = 1
elif self.gamepad.get_axis(3) < 0:
input.effector_roll = -1
else: # Control arm axis
dpad_input = self.gamepad.get_hat(0)
input.axis0 = 0
if dpad_input[0] == 1:
input.axis0 = 1
elif dpad_input[0] == -1:
input.axis0 = -1
if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15:
input.axis1 = round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1))
if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15:
input.axis3 = -1 * round(self.gamepad.get_axis(4))
# input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
# input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
# input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
# Button Mappings
# axis2 -> LT
# axis5 -> RT
# Buttons0 -> A
# Buttons1 -> B
# Buttons2 -> X
# Buttons3 -> Y
# Buttons4 -> LB
# Buttons5 -> RB
# Buttons6 -> Back
# Buttons7 -> Start
input.linear_actuator = 0
if pygame.joystick.get_count() != 0:
self.get_logger().info(
f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n"
)
self.manual_pub.publish(input)
else:
pass
pass
# Depricated, kept temporarily for reference
# def send_controls(self):
# for event in pygame.event.get():
# if event.type == pygame.QUIT:
# pygame.quit()
# exit()
# input = ControllerState()
# input.lt = self.gamepad.get_axis(2)#left trigger
# input.rt = self.gamepad.get_axis(5)#right trigger
# #input.lb = self.gamepad.get_button(9)#Value must be converted to bool
# if(self.gamepad.get_button(4)):#left bumper
# input.lb = True
# else:
# input.lb = False
# #input.rb = self.gamepad.get_button(10)#Value must be converted to bool
# if(self.gamepad.get_button(5)):#right bumper
# input.rb = True
# else:
# input.rb = False
# #input.plus = self.gamepad.get_button(6)#plus button
# if(self.gamepad.get_button(7)):#plus button
# input.plus = True
# else:
# input.plus = False
# #input.minus = self.gamepad.get_button(4)#minus button
# if(self.gamepad.get_button(6)):#minus button
# input.minus = True
# else:
# input.minus = False
# input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis
# input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis
# input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis
# input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis
# #input.a = self.gamepad.get_button(1)#A button
# if(self.gamepad.get_button(0)):#A button
# input.a = True
# else:
# input.a = False
# #input.b = self.gamepad.get_button(0)#B button
# if(self.gamepad.get_button(1)):#B button
# input.b = True
# else:
# input.b = False
# #input.x = self.gamepad.get_button(3)#X button
# if(self.gamepad.get_button(2)):#X button
# input.x = True
# else:
# input.x = False
# #input.y = self.gamepad.get_button(2)#Y button
# if(self.gamepad.get_button(3)):#Y button
# input.y = True
# else:
# input.y = False
# dpad_input = self.gamepad.get_hat(0)#D-pad input
# #not using up/down on DPad
# input.d_up = False
# input.d_down = False
# if(dpad_input[0] == 1):#D-pad right
# input.d_right = True
# else:
# input.d_right = False
# if(dpad_input[0] == -1):#D-pad left
# input.d_left = True
# else:
# input.d_left = False
# if pygame.joystick.get_count() != 0:
# self.get_logger().info(f"[Ctrl] Updated Controller State\n")
# self.publisher.publish(input)
# else:
# pass
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
# tb_bs = BaseStation()
# node.run()
if __name__ == "__main__":
main()

View File

@@ -2,6 +2,7 @@ import sys
import threading import threading
import signal import signal
import math import math
from warnings import deprecated
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
@@ -11,7 +12,7 @@ from rclpy import qos
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
@@ -68,7 +69,9 @@ class ArmNode(Node):
# Parameters # Parameters
self.declare_parameter("use_old_topics", True) self.declare_parameter("use_old_topics", True)
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value self.use_old_topics = (
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
################################################## ##################################################
# Old topics # Old topics
@@ -85,7 +88,9 @@ class ArmNode(Node):
SocketFeedback, "/arm/feedback/socket", 10 SocketFeedback, "/arm/feedback/socket", 10
) )
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10) self.digit_pub = self.create_publisher(
DigitFeedback, "/arm/feedback/digit", 10
)
self.digit_feedback = DigitFeedback() self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback) self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
@@ -111,12 +116,18 @@ class ArmNode(Node):
# Control # Control
# Manual: /arm/manual_new is published by Servo or Basestation # Manual: /arm/manual_new is published by Servo or Basestation
self.jointjog_pub_ = self.create_subscription( self.man_jointjog_pub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos JointJog,
"/arm/manual/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
) )
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control # IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription( self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos JointState,
"/joint_commands",
self.joint_command_callback,
qos_profile=control_qos,
) )
# Feedback # Feedback
@@ -124,12 +135,12 @@ class ArmNode(Node):
# Combined Socket and Digit feedback # Combined Socket and Digit feedback
self.arm_feedback_pub_ = self.create_publisher( self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback, ArmFeedback,
"/arm/feedback/new_feedback", "/arm/feedback",
qos_profile=qos.qos_profile_sensor_data, qos_profile=qos.qos_profile_sensor_data,
) )
# IK arm pose: /joint_states is published from here to topic_based_control # IK arm pose: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher( self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
) )
################################################### ###################################################
@@ -146,12 +157,9 @@ class ArmNode(Node):
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name) self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
def jointjog_callback(self, msg: JointJog): def jointjog_callback(self, msg: JointJog):
if ( if len(msg.joint_names) != len(msg.velocities):
len(msg.joint_names) == 0 self.get_logger().debug("Ignoring malformed /arm/manual/joint_jog message.")
or len(msg.velocities) == 0 return
or len(msg.joint_names) != len(msg.velocities)
):
return # Malformed message
# Grab velocities from message # Grab velocities from message
velocities = [ velocities = [
@@ -164,62 +172,51 @@ class ArmNode(Node):
] ]
# Deadzone # Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities] velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Send Axis 0-3 self.send_velocities(velocities, msg.header)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
)
)
# TODO: use msg.duration # TODO: use msg.duration
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7: if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.")
return # command needs either position or velocity for all 7 joints return # command needs either position or velocity for all 7 joints
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper # Grab velocities from message
# TODO: refactor to depend on joint names
# Embedded takes deg*10, ROS2 uses Radians
velocities = [ velocities = [
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity (
msg.velocity[msg.name.index(joint_name)] # type: ignore
if joint_name in msg.name
else 0.0
)
for joint_name in self.all_joint_names
] ]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Axis 0-3 self.send_velocities(velocities, msg.header)
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3])
arm_cmd.header = Header(stamp=self.get_clock().now().to_msg())
# Wrist yaw and roll, gripper included for future use when have adequate hardware
digit_cmd = VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6])
digit_cmd.header = Header(stamp=self.get_clock().now().to_msg())
self.anchor_tovic_pub_.publish(arm_cmd) def send_velocities(self, velocities: list[float], header: Header):
self.anchor_tovic_pub_.publish(digit_cmd) # ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=26, data=[velocities[6]], header=header)
)
@deprecated("Uses an old message type. Will be removed at some point.")
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
"""TODO: Old"""
axis0 = msg.axis0 axis0 = msg.axis0
axis1 = -1 * msg.axis1 axis1 = -1 * msg.axis1
axis2 = msg.axis2 axis2 = msg.axis2
@@ -243,13 +240,13 @@ class ArmNode(Node):
return return
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
"""TODO: Old"""
output = String(data=msg) output = String(data=msg)
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
"""TODO: Old"""
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
@@ -282,8 +279,7 @@ class ArmNode(Node):
self.process_fromvic_digit(msg) self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN): def process_fromvic_arm(self, msg: VicCAN):
if msg.mcu_name != "arm": assert msg.mcu_name == "arm"
return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_socket_msg_len_dict: if msg.command_id in self.viccan_socket_msg_len_dict:
@@ -326,12 +322,8 @@ class ArmNode(Node):
# Joint state publisher for URDF visualization # Joint state publisher for URDF visualization
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0 self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1 self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.saved_joint_state.position[2] = math.radians( self.saved_joint_state.position[2] = math.radians(angles[2]) # Axis 2
-angles[2] self.saved_joint_state.position[3] = math.radians(angles[3]) # Axis 3
) # Axis 2 (inverted)
self.saved_joint_state.position[3] = math.radians(
-angles[3]
) # Axis 3 (inverted)
# Wrist is handled by digit feedback # Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state) self.joint_state_pub_.publish(self.saved_joint_state)
@@ -363,18 +355,17 @@ class ArmNode(Node):
velocities[1] velocities[1]
) # Axis 1 ) # Axis 1
self.saved_joint_state.velocity[2] = math.radians( self.saved_joint_state.velocity[2] = math.radians(
-velocities[2] velocities[2]
) # Axis 2 (-) ) # Axis 2
self.saved_joint_state.velocity[3] = math.radians( self.saved_joint_state.velocity[3] = math.radians(
-velocities[3] velocities[3]
) # Axis 3 (-) ) # Axis 3
# Wrist is handled by digit feedback # Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state) self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN): def process_fromvic_digit(self, msg: VicCAN):
if msg.mcu_name != "digit": assert msg.mcu_name == "digit"
return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_digit_msg_len_dict: if msg.command_id in self.viccan_digit_msg_len_dict:
@@ -398,13 +389,13 @@ class ArmNode(Node):
msg.data[1] msg.data[1]
) # Wrist yaw ) # Wrist yaw
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self): def publish_feedback(self):
"""TODO: Old"""
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
@deprecated("Uses an old message type. Will be removed at some point.")
def updateAngleFeedback(self, msg: str): def updateAngleFeedback(self, msg: str):
"""TODO: Old"""
# Angle feedbacks, # Angle feedbacks,
# split the msg.data by commas # split the msg.data by commas
parts = msg.split(",") parts = msg.split(",")
@@ -422,8 +413,8 @@ class ArmNode(Node):
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateBusVoltage(self, msg: str): def updateBusVoltage(self, msg: str):
"""TODO: Old"""
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.split(",")
if len(parts) >= 7: if len(parts) >= 7:
@@ -437,8 +428,8 @@ class ArmNode(Node):
else: else:
self.get_logger().info("Invalid voltage feedback input format") self.get_logger().info("Invalid voltage feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateMotorFeedback(self, msg: str): def updateMotorFeedback(self, msg: str):
"""TODO: Old"""
parts = str(msg.strip()).split(",") parts = str(msg.strip()).split(",")
motorId = round(float(parts[3])) motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0 temp = float(parts[4]) / 10.0
@@ -462,15 +453,22 @@ class ArmNode(Node):
self.arm_feedback.axis0_current = current self.arm_feedback.axis0_current = current
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
arm_node = ArmNode() arm_node = ArmNode()
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
thread.start()
try: try:
thread.join() rclpy.spin(arm_node)
except (KeyboardInterrupt, ExternalShutdownException): except (KeyboardInterrupt, ExternalShutdownException):
pass pass
finally: finally:
@@ -478,8 +476,4 @@ def main(args=None):
if __name__ == "__main__": if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -1,27 +1,22 @@
from setuptools import find_packages, setup from setuptools import setup
import os
from glob import glob
package_name = "arm_pkg" package_name = "arm_pkg"
setup( setup(
name=package_name, name=package_name,
version="1.0.0", version="1.0.0",
packages=find_packages(exclude=["test"]), packages=[package_name],
data_files=[ data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]), ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]), ("share/" + package_name, ["package.xml"]),
], ],
install_requires=["setuptools"], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer="tristan", maintainer="David",
maintainer_email="tristanmcginnis26@gmail.com", maintainer_email="ds0196@uah.edu",
description="TODO: Package description", description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.",
license="All Rights Reserved", license="AGPL-3.0-only",
entry_points={ entry_points={
"console_scripts": [ "console_scripts": ["arm = arm_pkg.arm_node:main"],
"arm = arm_pkg.arm_node:main",
"headless = arm_pkg.arm_headless:main",
],
}, },
) )

View File

@@ -39,9 +39,9 @@ os.environ["SDL_AUDIODRIVER"] = (
CORE_STOP_MSG = CoreControl() # All zeros by default CORE_STOP_MSG = CoreControl() # All zeros by default
CORE_STOP_TWIST_MSG = Twist() # " CORE_STOP_TWIST_MSG = Twist() # "
ARM_STOP_MSG = ArmManual() # " ARM_STOP_MSG = ArmManual() # "
ARM_IK_STOP_MSG = TwistStamped() # "
BIO_STOP_MSG = BioControl() # " BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
@@ -75,15 +75,7 @@ class Headless(Node):
# Initialize pygame first # Initialize pygame first
pygame.init() pygame.init()
pygame.joystick.init() pygame.joystick.init()
super().__init__("headless") super().__init__("headless_node")
# TODO: move the STOP_MSGs somewhere better
global ARM_STOP_JOG_MSG
ARM_STOP_JOG_MSG = JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
joint_names=self.all_joint_names,
velocities=[0.0] * len(self.all_joint_names),
)
################################################## ##################################################
# Preamble # Preamble
@@ -247,9 +239,9 @@ class Headless(Node):
else: else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG) self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
if self.use_arm_ik: if self.use_arm_ik:
self.arm_ik_twist_publisher.publish(ARM_IK_STOP_MSG) self.arm_ik_twist_publisher.publish(self.arm_ik_twist_stop_msg())
else: else:
self.arm_manual_pub_.publish(ARM_STOP_JOG_MSG) self.arm_manual_pub_.publish(self.arm_manual_stop_msg())
# TODO: add bio here after implementing new topics # TODO: add bio here after implementing new topics
def send_controls(self): def send_controls(self):
@@ -519,14 +511,12 @@ class Headless(Node):
# X: _ # X: _
# Y: linear actuator out # Y: linear actuator out
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3 # Right stick: EF yaw and axis 3
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float( arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
stick_to_arm_direction(right_stick_x) stick_to_arm_direction(right_stick_x)
) )
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float( arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
-1 * stick_to_arm_direction(right_stick_y) stick_to_arm_direction(right_stick_y)
) )
# Left stick: axis 1 and 2 # Left stick: axis 1 and 2
@@ -534,7 +524,7 @@ class Headless(Node):
stick_to_arm_direction(left_stick_x) stick_to_arm_direction(left_stick_x)
) )
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float( arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
-1 * stick_to_arm_direction(left_stick_y) stick_to_arm_direction(left_stick_y)
) )
# D-pad: axis 0 and _ # D-pad: axis 0 and _
@@ -653,6 +643,18 @@ class Headless(Node):
else: else:
pass # TODO: implement new bio control topics pass # TODO: implement new bio control topics
def arm_manual_stop_msg(self):
return JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
joint_names=self.all_joint_names,
velocities=[0.0] * len(self.all_joint_names),
)
def arm_ik_twist_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float: def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""