mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-04-20 11:51:16 -05:00
Compare commits
3 Commits
743744edaa
...
67b3c5bc8f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67b3c5bc8f | ||
|
|
c506a34b37 | ||
|
|
980c08ba4f |
@@ -1,279 +0,0 @@
|
|||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
|
|
||||||
import pygame
|
|
||||||
|
|
||||||
import time
|
|
||||||
|
|
||||||
import serial
|
|
||||||
import sys
|
|
||||||
import threading
|
|
||||||
import glob
|
|
||||||
import os
|
|
||||||
|
|
||||||
from std_msgs.msg import String
|
|
||||||
from astra_msgs.msg import ControllerState
|
|
||||||
from astra_msgs.msg import ArmManual
|
|
||||||
from astra_msgs.msg import ArmIK
|
|
||||||
|
|
||||||
|
|
||||||
os.environ["SDL_AUDIODRIVER"] = (
|
|
||||||
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
|
|
||||||
)
|
|
||||||
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
|
|
||||||
|
|
||||||
|
|
||||||
class Headless(Node):
|
|
||||||
def __init__(self):
|
|
||||||
# Initalize node with name
|
|
||||||
super().__init__("arm_headless")
|
|
||||||
|
|
||||||
# Depricated, kept temporarily for reference
|
|
||||||
# self.create_timer(0.20, self.send_controls)#read and send controls
|
|
||||||
|
|
||||||
self.create_timer(0.1, self.send_manual)
|
|
||||||
|
|
||||||
# Create a publisher to publish any output the pico sends
|
|
||||||
|
|
||||||
# Depricated, kept temporarily for reference
|
|
||||||
# self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
|
|
||||||
self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10)
|
|
||||||
|
|
||||||
# Create a subscriber to listen to any commands sent for the pico
|
|
||||||
|
|
||||||
# Depricated, kept temporarily for reference
|
|
||||||
# self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
|
|
||||||
|
|
||||||
# self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
|
|
||||||
|
|
||||||
self.laser_status = 0
|
|
||||||
|
|
||||||
# Initialize pygame
|
|
||||||
pygame.init()
|
|
||||||
|
|
||||||
# Initialize the gamepad module
|
|
||||||
pygame.joystick.init()
|
|
||||||
|
|
||||||
# Check if any gamepad is connected
|
|
||||||
if pygame.joystick.get_count() == 0:
|
|
||||||
print("No gamepad found.")
|
|
||||||
pygame.quit()
|
|
||||||
exit()
|
|
||||||
for event in pygame.event.get():
|
|
||||||
if event.type == pygame.QUIT:
|
|
||||||
pygame.quit()
|
|
||||||
exit()
|
|
||||||
|
|
||||||
# Initialize the first gamepad, print name to terminal
|
|
||||||
self.gamepad = pygame.joystick.Joystick(0)
|
|
||||||
self.gamepad.init()
|
|
||||||
print(f"Gamepad Found: {self.gamepad.get_name()}")
|
|
||||||
#
|
|
||||||
#
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
# This thread makes all the update processes run in the background
|
|
||||||
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
try:
|
|
||||||
while rclpy.ok():
|
|
||||||
# Check the pico for updates
|
|
||||||
|
|
||||||
# self.read_feedback()
|
|
||||||
if (
|
|
||||||
pygame.joystick.get_count() == 0
|
|
||||||
): # if controller disconnected, wait for it to be reconnected
|
|
||||||
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
|
|
||||||
|
|
||||||
while pygame.joystick.get_count() == 0:
|
|
||||||
# self.send_controls() #depricated, kept for reference temporarily
|
|
||||||
self.send_manual()
|
|
||||||
# self.read_feedback()
|
|
||||||
self.gamepad = pygame.joystick.Joystick(0)
|
|
||||||
self.gamepad.init() # re-initialized gamepad
|
|
||||||
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
def send_manual(self):
|
|
||||||
for event in pygame.event.get():
|
|
||||||
if event.type == pygame.QUIT:
|
|
||||||
pygame.quit()
|
|
||||||
exit()
|
|
||||||
input = ArmManual()
|
|
||||||
|
|
||||||
# Triggers for gripper control
|
|
||||||
if self.gamepad.get_axis(2) > 0: # left trigger
|
|
||||||
input.gripper = -1
|
|
||||||
elif self.gamepad.get_axis(5) > 0: # right trigger
|
|
||||||
input.gripper = 1
|
|
||||||
|
|
||||||
# Toggle Laser
|
|
||||||
if self.gamepad.get_button(7): # Start
|
|
||||||
self.laser_status = 1
|
|
||||||
elif self.gamepad.get_button(6): # Back
|
|
||||||
self.laser_status = 0
|
|
||||||
input.laser = self.laser_status
|
|
||||||
|
|
||||||
if self.gamepad.get_button(5): # right bumper, control effector
|
|
||||||
|
|
||||||
# Left stick X-axis for effector yaw
|
|
||||||
if self.gamepad.get_axis(0) > 0:
|
|
||||||
input.effector_yaw = 1
|
|
||||||
elif self.gamepad.get_axis(0) < 0:
|
|
||||||
input.effector_yaw = -1
|
|
||||||
|
|
||||||
# Right stick X-axis for effector roll
|
|
||||||
if self.gamepad.get_axis(3) > 0:
|
|
||||||
input.effector_roll = 1
|
|
||||||
elif self.gamepad.get_axis(3) < 0:
|
|
||||||
input.effector_roll = -1
|
|
||||||
|
|
||||||
else: # Control arm axis
|
|
||||||
dpad_input = self.gamepad.get_hat(0)
|
|
||||||
input.axis0 = 0
|
|
||||||
if dpad_input[0] == 1:
|
|
||||||
input.axis0 = 1
|
|
||||||
elif dpad_input[0] == -1:
|
|
||||||
input.axis0 = -1
|
|
||||||
|
|
||||||
if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15:
|
|
||||||
input.axis1 = round(self.gamepad.get_axis(0))
|
|
||||||
|
|
||||||
if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15:
|
|
||||||
input.axis2 = -1 * round(self.gamepad.get_axis(1))
|
|
||||||
|
|
||||||
if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15:
|
|
||||||
input.axis3 = -1 * round(self.gamepad.get_axis(4))
|
|
||||||
|
|
||||||
# input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
|
|
||||||
# input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
|
|
||||||
# input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
|
|
||||||
|
|
||||||
# Button Mappings
|
|
||||||
# axis2 -> LT
|
|
||||||
# axis5 -> RT
|
|
||||||
# Buttons0 -> A
|
|
||||||
# Buttons1 -> B
|
|
||||||
# Buttons2 -> X
|
|
||||||
# Buttons3 -> Y
|
|
||||||
# Buttons4 -> LB
|
|
||||||
# Buttons5 -> RB
|
|
||||||
# Buttons6 -> Back
|
|
||||||
# Buttons7 -> Start
|
|
||||||
|
|
||||||
input.linear_actuator = 0
|
|
||||||
|
|
||||||
if pygame.joystick.get_count() != 0:
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n"
|
|
||||||
)
|
|
||||||
self.manual_pub.publish(input)
|
|
||||||
else:
|
|
||||||
pass
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Depricated, kept temporarily for reference
|
|
||||||
# def send_controls(self):
|
|
||||||
|
|
||||||
# for event in pygame.event.get():
|
|
||||||
# if event.type == pygame.QUIT:
|
|
||||||
# pygame.quit()
|
|
||||||
# exit()
|
|
||||||
# input = ControllerState()
|
|
||||||
|
|
||||||
# input.lt = self.gamepad.get_axis(2)#left trigger
|
|
||||||
# input.rt = self.gamepad.get_axis(5)#right trigger
|
|
||||||
|
|
||||||
# #input.lb = self.gamepad.get_button(9)#Value must be converted to bool
|
|
||||||
# if(self.gamepad.get_button(4)):#left bumper
|
|
||||||
# input.lb = True
|
|
||||||
# else:
|
|
||||||
# input.lb = False
|
|
||||||
|
|
||||||
# #input.rb = self.gamepad.get_button(10)#Value must be converted to bool
|
|
||||||
# if(self.gamepad.get_button(5)):#right bumper
|
|
||||||
# input.rb = True
|
|
||||||
# else:
|
|
||||||
# input.rb = False
|
|
||||||
|
|
||||||
# #input.plus = self.gamepad.get_button(6)#plus button
|
|
||||||
# if(self.gamepad.get_button(7)):#plus button
|
|
||||||
# input.plus = True
|
|
||||||
# else:
|
|
||||||
# input.plus = False
|
|
||||||
|
|
||||||
# #input.minus = self.gamepad.get_button(4)#minus button
|
|
||||||
# if(self.gamepad.get_button(6)):#minus button
|
|
||||||
# input.minus = True
|
|
||||||
# else:
|
|
||||||
# input.minus = False
|
|
||||||
|
|
||||||
# input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis
|
|
||||||
# input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis
|
|
||||||
# input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis
|
|
||||||
# input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis
|
|
||||||
|
|
||||||
# #input.a = self.gamepad.get_button(1)#A button
|
|
||||||
# if(self.gamepad.get_button(0)):#A button
|
|
||||||
# input.a = True
|
|
||||||
# else:
|
|
||||||
# input.a = False
|
|
||||||
# #input.b = self.gamepad.get_button(0)#B button
|
|
||||||
# if(self.gamepad.get_button(1)):#B button
|
|
||||||
# input.b = True
|
|
||||||
# else:
|
|
||||||
# input.b = False
|
|
||||||
# #input.x = self.gamepad.get_button(3)#X button
|
|
||||||
# if(self.gamepad.get_button(2)):#X button
|
|
||||||
# input.x = True
|
|
||||||
# else:
|
|
||||||
# input.x = False
|
|
||||||
# #input.y = self.gamepad.get_button(2)#Y button
|
|
||||||
# if(self.gamepad.get_button(3)):#Y button
|
|
||||||
# input.y = True
|
|
||||||
# else:
|
|
||||||
# input.y = False
|
|
||||||
|
|
||||||
# dpad_input = self.gamepad.get_hat(0)#D-pad input
|
|
||||||
|
|
||||||
# #not using up/down on DPad
|
|
||||||
# input.d_up = False
|
|
||||||
# input.d_down = False
|
|
||||||
|
|
||||||
# if(dpad_input[0] == 1):#D-pad right
|
|
||||||
# input.d_right = True
|
|
||||||
# else:
|
|
||||||
# input.d_right = False
|
|
||||||
# if(dpad_input[0] == -1):#D-pad left
|
|
||||||
# input.d_left = True
|
|
||||||
# else:
|
|
||||||
# input.d_left = False
|
|
||||||
|
|
||||||
# if pygame.joystick.get_count() != 0:
|
|
||||||
|
|
||||||
# self.get_logger().info(f"[Ctrl] Updated Controller State\n")
|
|
||||||
|
|
||||||
# self.publisher.publish(input)
|
|
||||||
# else:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
|
||||||
rclpy.init(args=args)
|
|
||||||
|
|
||||||
node = Headless()
|
|
||||||
|
|
||||||
rclpy.spin(node)
|
|
||||||
rclpy.shutdown()
|
|
||||||
|
|
||||||
# tb_bs = BaseStation()
|
|
||||||
# node.run()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@@ -2,6 +2,7 @@ import sys
|
|||||||
import threading
|
import threading
|
||||||
import signal
|
import signal
|
||||||
import math
|
import math
|
||||||
|
from warnings import deprecated
|
||||||
|
|
||||||
import rclpy
|
import rclpy
|
||||||
from rclpy.node import Node
|
from rclpy.node import Node
|
||||||
@@ -11,7 +12,7 @@ from rclpy import qos
|
|||||||
from std_msgs.msg import String, Header
|
from std_msgs.msg import String, Header
|
||||||
from sensor_msgs.msg import JointState
|
from sensor_msgs.msg import JointState
|
||||||
from control_msgs.msg import JointJog
|
from control_msgs.msg import JointJog
|
||||||
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
|
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
|
||||||
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
|
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
|
||||||
|
|
||||||
control_qos = qos.QoSProfile(
|
control_qos = qos.QoSProfile(
|
||||||
@@ -68,7 +69,9 @@ class ArmNode(Node):
|
|||||||
# Parameters
|
# Parameters
|
||||||
|
|
||||||
self.declare_parameter("use_old_topics", True)
|
self.declare_parameter("use_old_topics", True)
|
||||||
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value
|
self.use_old_topics = (
|
||||||
|
self.get_parameter("use_old_topics").get_parameter_value().bool_value
|
||||||
|
)
|
||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Old topics
|
# Old topics
|
||||||
@@ -85,7 +88,9 @@ class ArmNode(Node):
|
|||||||
SocketFeedback, "/arm/feedback/socket", 10
|
SocketFeedback, "/arm/feedback/socket", 10
|
||||||
)
|
)
|
||||||
self.arm_feedback = SocketFeedback()
|
self.arm_feedback = SocketFeedback()
|
||||||
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
|
self.digit_pub = self.create_publisher(
|
||||||
|
DigitFeedback, "/arm/feedback/digit", 10
|
||||||
|
)
|
||||||
self.digit_feedback = DigitFeedback()
|
self.digit_feedback = DigitFeedback()
|
||||||
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
||||||
|
|
||||||
@@ -111,12 +116,18 @@ class ArmNode(Node):
|
|||||||
# Control
|
# Control
|
||||||
|
|
||||||
# Manual: /arm/manual_new is published by Servo or Basestation
|
# Manual: /arm/manual_new is published by Servo or Basestation
|
||||||
self.jointjog_pub_ = self.create_subscription(
|
self.man_jointjog_pub_ = self.create_subscription(
|
||||||
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos
|
JointJog,
|
||||||
|
"/arm/manual/joint_jog",
|
||||||
|
self.jointjog_callback,
|
||||||
|
qos_profile=control_qos,
|
||||||
)
|
)
|
||||||
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
|
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
|
||||||
self.joint_command_sub_ = self.create_subscription(
|
self.joint_command_sub_ = self.create_subscription(
|
||||||
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos
|
JointState,
|
||||||
|
"/joint_commands",
|
||||||
|
self.joint_command_callback,
|
||||||
|
qos_profile=control_qos,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Feedback
|
# Feedback
|
||||||
@@ -124,12 +135,12 @@ class ArmNode(Node):
|
|||||||
# Combined Socket and Digit feedback
|
# Combined Socket and Digit feedback
|
||||||
self.arm_feedback_pub_ = self.create_publisher(
|
self.arm_feedback_pub_ = self.create_publisher(
|
||||||
ArmFeedback,
|
ArmFeedback,
|
||||||
"/arm/feedback/new_feedback",
|
"/arm/feedback",
|
||||||
qos_profile=qos.qos_profile_sensor_data,
|
qos_profile=qos.qos_profile_sensor_data,
|
||||||
)
|
)
|
||||||
# IK arm pose: /joint_states is published from here to topic_based_control
|
# IK arm pose: /joint_states is published from here to topic_based_control
|
||||||
self.joint_state_pub_ = self.create_publisher(
|
self.joint_state_pub_ = self.create_publisher(
|
||||||
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
|
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
|
||||||
)
|
)
|
||||||
|
|
||||||
###################################################
|
###################################################
|
||||||
@@ -146,12 +157,9 @@ class ArmNode(Node):
|
|||||||
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
|
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
|
||||||
|
|
||||||
def jointjog_callback(self, msg: JointJog):
|
def jointjog_callback(self, msg: JointJog):
|
||||||
if (
|
if len(msg.joint_names) != len(msg.velocities):
|
||||||
len(msg.joint_names) == 0
|
self.get_logger().debug("Ignoring malformed /arm/manual/joint_jog message.")
|
||||||
or len(msg.velocities) == 0
|
return
|
||||||
or len(msg.joint_names) != len(msg.velocities)
|
|
||||||
):
|
|
||||||
return # Malformed message
|
|
||||||
|
|
||||||
# Grab velocities from message
|
# Grab velocities from message
|
||||||
velocities = [
|
velocities = [
|
||||||
@@ -164,62 +172,51 @@ class ArmNode(Node):
|
|||||||
]
|
]
|
||||||
# Deadzone
|
# Deadzone
|
||||||
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
|
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
|
||||||
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
|
|
||||||
velocities = [
|
|
||||||
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
|
|
||||||
]
|
|
||||||
# Axis 2 & 3 URDF direction is inverted
|
|
||||||
velocities[2] = -velocities[2]
|
|
||||||
velocities[3] = -velocities[3]
|
|
||||||
|
|
||||||
# Send Axis 0-3
|
self.send_velocities(velocities, msg.header)
|
||||||
self.anchor_tovic_pub_.publish(
|
|
||||||
VicCAN(
|
|
||||||
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# Send Wrist yaw and roll
|
|
||||||
# TODO: Verify embedded
|
|
||||||
self.anchor_tovic_pub_.publish(
|
|
||||||
VicCAN(
|
|
||||||
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# Send End Effector Gripper
|
|
||||||
# TODO: Verify m/s received correctly by embedded
|
|
||||||
self.anchor_tovic_pub_.publish(
|
|
||||||
VicCAN(
|
|
||||||
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# TODO: use msg.duration
|
# TODO: use msg.duration
|
||||||
|
|
||||||
def joint_command_callback(self, msg: JointState):
|
def joint_command_callback(self, msg: JointState):
|
||||||
if len(msg.position) < 7 and len(msg.velocity) < 7:
|
if len(msg.position) < 7 and len(msg.velocity) < 7:
|
||||||
|
self.get_logger().debug("Ignoring malformed /joint_command message.")
|
||||||
return # command needs either position or velocity for all 7 joints
|
return # command needs either position or velocity for all 7 joints
|
||||||
|
|
||||||
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper
|
# Grab velocities from message
|
||||||
# TODO: refactor to depend on joint names
|
|
||||||
# Embedded takes deg*10, ROS2 uses Radians
|
|
||||||
velocities = [
|
velocities = [
|
||||||
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity
|
(
|
||||||
|
msg.velocity[msg.name.index(joint_name)] # type: ignore
|
||||||
|
if joint_name in msg.name
|
||||||
|
else 0.0
|
||||||
|
)
|
||||||
|
for joint_name in self.all_joint_names
|
||||||
]
|
]
|
||||||
# Axis 2 & 3 URDF direction is inverted
|
|
||||||
velocities[2] = -velocities[2]
|
|
||||||
velocities[3] = -velocities[3]
|
|
||||||
|
|
||||||
# Axis 0-3
|
self.send_velocities(velocities, msg.header)
|
||||||
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3])
|
|
||||||
arm_cmd.header = Header(stamp=self.get_clock().now().to_msg())
|
|
||||||
# Wrist yaw and roll, gripper included for future use when have adequate hardware
|
|
||||||
digit_cmd = VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6])
|
|
||||||
digit_cmd.header = Header(stamp=self.get_clock().now().to_msg())
|
|
||||||
|
|
||||||
self.anchor_tovic_pub_.publish(arm_cmd)
|
def send_velocities(self, velocities: list[float], header: Header):
|
||||||
self.anchor_tovic_pub_.publish(digit_cmd)
|
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
|
||||||
|
velocities = [
|
||||||
|
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
|
||||||
|
]
|
||||||
|
|
||||||
|
# Send Axis 0-3
|
||||||
|
self.anchor_tovic_pub_.publish(
|
||||||
|
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header)
|
||||||
|
)
|
||||||
|
# Send Wrist yaw and roll
|
||||||
|
# TODO: Verify embedded
|
||||||
|
self.anchor_tovic_pub_.publish(
|
||||||
|
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header)
|
||||||
|
)
|
||||||
|
# Send End Effector Gripper
|
||||||
|
# TODO: Verify m/s received correctly by embedded
|
||||||
|
self.anchor_tovic_pub_.publish(
|
||||||
|
VicCAN(mcu_name="digit", command_id=26, data=[velocities[6]], header=header)
|
||||||
|
)
|
||||||
|
|
||||||
|
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||||
def send_manual(self, msg: ArmManual):
|
def send_manual(self, msg: ArmManual):
|
||||||
"""TODO: Old"""
|
|
||||||
axis0 = msg.axis0
|
axis0 = msg.axis0
|
||||||
axis1 = -1 * msg.axis1
|
axis1 = -1 * msg.axis1
|
||||||
axis2 = msg.axis2
|
axis2 = msg.axis2
|
||||||
@@ -243,13 +240,13 @@ class ArmNode(Node):
|
|||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||||
def send_cmd(self, msg: str):
|
def send_cmd(self, msg: str):
|
||||||
"""TODO: Old"""
|
|
||||||
output = String(data=msg)
|
output = String(data=msg)
|
||||||
self.anchor_pub.publish(output)
|
self.anchor_pub.publish(output)
|
||||||
|
|
||||||
|
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||||
def anchor_feedback(self, msg: String):
|
def anchor_feedback(self, msg: String):
|
||||||
"""TODO: Old"""
|
|
||||||
output = msg.data
|
output = msg.data
|
||||||
if output.startswith("can_relay_fromvic,arm,55"):
|
if output.startswith("can_relay_fromvic,arm,55"):
|
||||||
self.updateAngleFeedback(output)
|
self.updateAngleFeedback(output)
|
||||||
@@ -282,8 +279,7 @@ class ArmNode(Node):
|
|||||||
self.process_fromvic_digit(msg)
|
self.process_fromvic_digit(msg)
|
||||||
|
|
||||||
def process_fromvic_arm(self, msg: VicCAN):
|
def process_fromvic_arm(self, msg: VicCAN):
|
||||||
if msg.mcu_name != "arm":
|
assert msg.mcu_name == "arm"
|
||||||
return
|
|
||||||
|
|
||||||
# Check message len to prevent crashing on bad data
|
# Check message len to prevent crashing on bad data
|
||||||
if msg.command_id in self.viccan_socket_msg_len_dict:
|
if msg.command_id in self.viccan_socket_msg_len_dict:
|
||||||
@@ -326,12 +322,8 @@ class ArmNode(Node):
|
|||||||
# Joint state publisher for URDF visualization
|
# Joint state publisher for URDF visualization
|
||||||
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
|
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
|
||||||
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
|
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
|
||||||
self.saved_joint_state.position[2] = math.radians(
|
self.saved_joint_state.position[2] = math.radians(angles[2]) # Axis 2
|
||||||
-angles[2]
|
self.saved_joint_state.position[3] = math.radians(angles[3]) # Axis 3
|
||||||
) # Axis 2 (inverted)
|
|
||||||
self.saved_joint_state.position[3] = math.radians(
|
|
||||||
-angles[3]
|
|
||||||
) # Axis 3 (inverted)
|
|
||||||
# Wrist is handled by digit feedback
|
# Wrist is handled by digit feedback
|
||||||
self.saved_joint_state.header.stamp = msg.header.stamp
|
self.saved_joint_state.header.stamp = msg.header.stamp
|
||||||
self.joint_state_pub_.publish(self.saved_joint_state)
|
self.joint_state_pub_.publish(self.saved_joint_state)
|
||||||
@@ -363,18 +355,17 @@ class ArmNode(Node):
|
|||||||
velocities[1]
|
velocities[1]
|
||||||
) # Axis 1
|
) # Axis 1
|
||||||
self.saved_joint_state.velocity[2] = math.radians(
|
self.saved_joint_state.velocity[2] = math.radians(
|
||||||
-velocities[2]
|
velocities[2]
|
||||||
) # Axis 2 (-)
|
) # Axis 2
|
||||||
self.saved_joint_state.velocity[3] = math.radians(
|
self.saved_joint_state.velocity[3] = math.radians(
|
||||||
-velocities[3]
|
velocities[3]
|
||||||
) # Axis 3 (-)
|
) # Axis 3
|
||||||
# Wrist is handled by digit feedback
|
# Wrist is handled by digit feedback
|
||||||
self.saved_joint_state.header.stamp = msg.header.stamp
|
self.saved_joint_state.header.stamp = msg.header.stamp
|
||||||
self.joint_state_pub_.publish(self.saved_joint_state)
|
self.joint_state_pub_.publish(self.saved_joint_state)
|
||||||
|
|
||||||
def process_fromvic_digit(self, msg: VicCAN):
|
def process_fromvic_digit(self, msg: VicCAN):
|
||||||
if msg.mcu_name != "digit":
|
assert msg.mcu_name == "digit"
|
||||||
return
|
|
||||||
|
|
||||||
# Check message len to prevent crashing on bad data
|
# Check message len to prevent crashing on bad data
|
||||||
if msg.command_id in self.viccan_digit_msg_len_dict:
|
if msg.command_id in self.viccan_digit_msg_len_dict:
|
||||||
@@ -398,13 +389,13 @@ class ArmNode(Node):
|
|||||||
msg.data[1]
|
msg.data[1]
|
||||||
) # Wrist yaw
|
) # Wrist yaw
|
||||||
|
|
||||||
|
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||||
def publish_feedback(self):
|
def publish_feedback(self):
|
||||||
"""TODO: Old"""
|
|
||||||
self.socket_pub.publish(self.arm_feedback)
|
self.socket_pub.publish(self.arm_feedback)
|
||||||
self.digit_pub.publish(self.digit_feedback)
|
self.digit_pub.publish(self.digit_feedback)
|
||||||
|
|
||||||
|
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||||
def updateAngleFeedback(self, msg: str):
|
def updateAngleFeedback(self, msg: str):
|
||||||
"""TODO: Old"""
|
|
||||||
# Angle feedbacks,
|
# Angle feedbacks,
|
||||||
# split the msg.data by commas
|
# split the msg.data by commas
|
||||||
parts = msg.split(",")
|
parts = msg.split(",")
|
||||||
@@ -422,8 +413,8 @@ class ArmNode(Node):
|
|||||||
else:
|
else:
|
||||||
self.get_logger().info("Invalid angle feedback input format")
|
self.get_logger().info("Invalid angle feedback input format")
|
||||||
|
|
||||||
|
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||||
def updateBusVoltage(self, msg: str):
|
def updateBusVoltage(self, msg: str):
|
||||||
"""TODO: Old"""
|
|
||||||
# Bus Voltage feedbacks
|
# Bus Voltage feedbacks
|
||||||
parts = msg.split(",")
|
parts = msg.split(",")
|
||||||
if len(parts) >= 7:
|
if len(parts) >= 7:
|
||||||
@@ -437,8 +428,8 @@ class ArmNode(Node):
|
|||||||
else:
|
else:
|
||||||
self.get_logger().info("Invalid voltage feedback input format")
|
self.get_logger().info("Invalid voltage feedback input format")
|
||||||
|
|
||||||
|
@deprecated("Uses an old message type. Will be removed at some point.")
|
||||||
def updateMotorFeedback(self, msg: str):
|
def updateMotorFeedback(self, msg: str):
|
||||||
"""TODO: Old"""
|
|
||||||
parts = str(msg.strip()).split(",")
|
parts = str(msg.strip()).split(",")
|
||||||
motorId = round(float(parts[3]))
|
motorId = round(float(parts[3]))
|
||||||
temp = float(parts[4]) / 10.0
|
temp = float(parts[4]) / 10.0
|
||||||
@@ -462,15 +453,22 @@ class ArmNode(Node):
|
|||||||
self.arm_feedback.axis0_current = current
|
self.arm_feedback.axis0_current = current
|
||||||
|
|
||||||
|
|
||||||
|
def exit_handler(signum, frame):
|
||||||
|
print("Caught SIGTERM. Exiting...")
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
def main(args=None):
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
|
|
||||||
|
# Catch termination signals and exit cleanly
|
||||||
|
signal.signal(signal.SIGTERM, exit_handler)
|
||||||
|
|
||||||
arm_node = ArmNode()
|
arm_node = ArmNode()
|
||||||
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
thread.join()
|
rclpy.spin(arm_node)
|
||||||
except (KeyboardInterrupt, ExternalShutdownException):
|
except (KeyboardInterrupt, ExternalShutdownException):
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
@@ -478,8 +476,4 @@ def main(args=None):
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
|
||||||
signal.signal(
|
|
||||||
signal.SIGTERM, lambda signum, frame: sys.exit(0)
|
|
||||||
) # Catch termination signals and exit cleanly
|
|
||||||
main()
|
main()
|
||||||
|
|||||||
@@ -1,27 +1,22 @@
|
|||||||
from setuptools import find_packages, setup
|
from setuptools import setup
|
||||||
import os
|
|
||||||
from glob import glob
|
|
||||||
|
|
||||||
package_name = "arm_pkg"
|
package_name = "arm_pkg"
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name=package_name,
|
name=package_name,
|
||||||
version="1.0.0",
|
version="1.0.0",
|
||||||
packages=find_packages(exclude=["test"]),
|
packages=[package_name],
|
||||||
data_files=[
|
data_files=[
|
||||||
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
|
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
|
||||||
("share/" + package_name, ["package.xml"]),
|
("share/" + package_name, ["package.xml"]),
|
||||||
],
|
],
|
||||||
install_requires=["setuptools"],
|
install_requires=["setuptools"],
|
||||||
zip_safe=True,
|
zip_safe=True,
|
||||||
maintainer="tristan",
|
maintainer="David",
|
||||||
maintainer_email="tristanmcginnis26@gmail.com",
|
maintainer_email="ds0196@uah.edu",
|
||||||
description="TODO: Package description",
|
description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.",
|
||||||
license="All Rights Reserved",
|
license="AGPL-3.0-only",
|
||||||
entry_points={
|
entry_points={
|
||||||
"console_scripts": [
|
"console_scripts": ["arm = arm_pkg.arm_node:main"],
|
||||||
"arm = arm_pkg.arm_node:main",
|
|
||||||
"headless = arm_pkg.arm_headless:main",
|
|
||||||
],
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
Submodule src/astra_descriptions updated: 72d3c223af...c36bd8233d
@@ -39,9 +39,9 @@ os.environ["SDL_AUDIODRIVER"] = (
|
|||||||
CORE_STOP_MSG = CoreControl() # All zeros by default
|
CORE_STOP_MSG = CoreControl() # All zeros by default
|
||||||
CORE_STOP_TWIST_MSG = Twist() # "
|
CORE_STOP_TWIST_MSG = Twist() # "
|
||||||
ARM_STOP_MSG = ArmManual() # "
|
ARM_STOP_MSG = ArmManual() # "
|
||||||
ARM_IK_STOP_MSG = TwistStamped() # "
|
|
||||||
BIO_STOP_MSG = BioControl() # "
|
BIO_STOP_MSG = BioControl() # "
|
||||||
|
|
||||||
|
|
||||||
control_qos = qos.QoSProfile(
|
control_qos = qos.QoSProfile(
|
||||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||||
depth=2,
|
depth=2,
|
||||||
@@ -75,15 +75,7 @@ class Headless(Node):
|
|||||||
# Initialize pygame first
|
# Initialize pygame first
|
||||||
pygame.init()
|
pygame.init()
|
||||||
pygame.joystick.init()
|
pygame.joystick.init()
|
||||||
super().__init__("headless")
|
super().__init__("headless_node")
|
||||||
|
|
||||||
# TODO: move the STOP_MSGs somewhere better
|
|
||||||
global ARM_STOP_JOG_MSG
|
|
||||||
ARM_STOP_JOG_MSG = JointJog(
|
|
||||||
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
|
|
||||||
joint_names=self.all_joint_names,
|
|
||||||
velocities=[0.0] * len(self.all_joint_names),
|
|
||||||
)
|
|
||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Preamble
|
# Preamble
|
||||||
@@ -247,9 +239,9 @@ class Headless(Node):
|
|||||||
else:
|
else:
|
||||||
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
|
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
|
||||||
if self.use_arm_ik:
|
if self.use_arm_ik:
|
||||||
self.arm_ik_twist_publisher.publish(ARM_IK_STOP_MSG)
|
self.arm_ik_twist_publisher.publish(self.arm_ik_twist_stop_msg())
|
||||||
else:
|
else:
|
||||||
self.arm_manual_pub_.publish(ARM_STOP_JOG_MSG)
|
self.arm_manual_pub_.publish(self.arm_manual_stop_msg())
|
||||||
# TODO: add bio here after implementing new topics
|
# TODO: add bio here after implementing new topics
|
||||||
|
|
||||||
def send_controls(self):
|
def send_controls(self):
|
||||||
@@ -519,14 +511,12 @@ class Headless(Node):
|
|||||||
# X: _
|
# X: _
|
||||||
# Y: linear actuator out
|
# Y: linear actuator out
|
||||||
|
|
||||||
ARM_THRESHOLD = 0.2
|
|
||||||
|
|
||||||
# Right stick: EF yaw and axis 3
|
# Right stick: EF yaw and axis 3
|
||||||
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
|
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
|
||||||
stick_to_arm_direction(right_stick_x)
|
stick_to_arm_direction(right_stick_x)
|
||||||
)
|
)
|
||||||
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
|
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
|
||||||
-1 * stick_to_arm_direction(right_stick_y)
|
stick_to_arm_direction(right_stick_y)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Left stick: axis 1 and 2
|
# Left stick: axis 1 and 2
|
||||||
@@ -534,7 +524,7 @@ class Headless(Node):
|
|||||||
stick_to_arm_direction(left_stick_x)
|
stick_to_arm_direction(left_stick_x)
|
||||||
)
|
)
|
||||||
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
|
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
|
||||||
-1 * stick_to_arm_direction(left_stick_y)
|
stick_to_arm_direction(left_stick_y)
|
||||||
)
|
)
|
||||||
|
|
||||||
# D-pad: axis 0 and _
|
# D-pad: axis 0 and _
|
||||||
@@ -653,6 +643,18 @@ class Headless(Node):
|
|||||||
else:
|
else:
|
||||||
pass # TODO: implement new bio control topics
|
pass # TODO: implement new bio control topics
|
||||||
|
|
||||||
|
def arm_manual_stop_msg(self):
|
||||||
|
return JointJog(
|
||||||
|
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
|
||||||
|
joint_names=self.all_joint_names,
|
||||||
|
velocities=[0.0] * len(self.all_joint_names),
|
||||||
|
)
|
||||||
|
|
||||||
|
def arm_ik_twist_stop_msg(self):
|
||||||
|
return TwistStamped(
|
||||||
|
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
||||||
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
|
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
|
||||||
|
|||||||
Reference in New Issue
Block a user