mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
remove ikpy (and reformat the code files)
This commit is contained in:
@@ -17,9 +17,12 @@ from ros2_interfaces_pkg.msg import ArmManual
|
||||
from ros2_interfaces_pkg.msg import ArmIK
|
||||
|
||||
|
||||
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init()
|
||||
os.environ["SDL_AUDIODRIVER"] = (
|
||||
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
|
||||
)
|
||||
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
|
||||
|
||||
|
||||
class Headless(Node):
|
||||
def __init__(self):
|
||||
# Initalize node with name
|
||||
@@ -30,20 +33,18 @@ class Headless(Node):
|
||||
|
||||
self.create_timer(0.1, self.send_manual)
|
||||
|
||||
|
||||
# Create a publisher to publish any output the pico sends
|
||||
|
||||
# Depricated, kept temporarily for reference
|
||||
#self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
|
||||
self.manual_pub = self.create_publisher(ArmManual, '/arm/control/manual', 10)
|
||||
# self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
|
||||
self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10)
|
||||
|
||||
# Create a subscriber to listen to any commands sent for the pico
|
||||
|
||||
# Depricated, kept temporarily for reference
|
||||
#self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
|
||||
|
||||
#self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
|
||||
# self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
|
||||
|
||||
# self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
|
||||
|
||||
self.laser_status = 0
|
||||
|
||||
@@ -66,38 +67,36 @@ class Headless(Node):
|
||||
# Initialize the first gamepad, print name to terminal
|
||||
self.gamepad = pygame.joystick.Joystick(0)
|
||||
self.gamepad.init()
|
||||
print(f'Gamepad Found: {self.gamepad.get_name()}')
|
||||
print(f"Gamepad Found: {self.gamepad.get_name()}")
|
||||
#
|
||||
#
|
||||
|
||||
|
||||
def run(self):
|
||||
# This thread makes all the update processes run in the background
|
||||
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
|
||||
thread.start()
|
||||
|
||||
|
||||
try:
|
||||
while rclpy.ok():
|
||||
#Check the pico for updates
|
||||
# Check the pico for updates
|
||||
|
||||
#self.read_feedback()
|
||||
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
|
||||
# self.read_feedback()
|
||||
if (
|
||||
pygame.joystick.get_count() == 0
|
||||
): # if controller disconnected, wait for it to be reconnected
|
||||
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
|
||||
|
||||
while pygame.joystick.get_count() == 0:
|
||||
#self.send_controls() #depricated, kept for reference temporarily
|
||||
# self.send_controls() #depricated, kept for reference temporarily
|
||||
self.send_manual()
|
||||
#self.read_feedback()
|
||||
# self.read_feedback()
|
||||
self.gamepad = pygame.joystick.Joystick(0)
|
||||
self.gamepad.init() #re-initialized gamepad
|
||||
self.gamepad.init() # re-initialized gamepad
|
||||
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
|
||||
|
||||
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def send_manual(self):
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.QUIT:
|
||||
@@ -105,21 +104,20 @@ class Headless(Node):
|
||||
exit()
|
||||
input = ArmManual()
|
||||
|
||||
|
||||
# Triggers for gripper control
|
||||
if self.gamepad.get_axis(2) > 0:#left trigger
|
||||
if self.gamepad.get_axis(2) > 0: # left trigger
|
||||
input.gripper = -1
|
||||
elif self.gamepad.get_axis(5) > 0:#right trigger
|
||||
elif self.gamepad.get_axis(5) > 0: # right trigger
|
||||
input.gripper = 1
|
||||
|
||||
# Toggle Laser
|
||||
if self.gamepad.get_button(7):#Start
|
||||
if self.gamepad.get_button(7): # Start
|
||||
self.laser_status = 1
|
||||
elif self.gamepad.get_button(6):#Back
|
||||
elif self.gamepad.get_button(6): # Back
|
||||
self.laser_status = 0
|
||||
input.laser = self.laser_status
|
||||
|
||||
if self.gamepad.get_button(5):#right bumper, control effector
|
||||
if self.gamepad.get_button(5): # right bumper, control effector
|
||||
|
||||
# Left stick X-axis for effector yaw
|
||||
if self.gamepad.get_axis(0) > 0:
|
||||
@@ -141,38 +139,38 @@ class Headless(Node):
|
||||
elif dpad_input[0] == -1:
|
||||
input.axis0 = -1
|
||||
|
||||
if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15:
|
||||
if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15:
|
||||
input.axis1 = round(self.gamepad.get_axis(0))
|
||||
|
||||
if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15:
|
||||
if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15:
|
||||
input.axis2 = -1 * round(self.gamepad.get_axis(1))
|
||||
|
||||
if self.gamepad.get_axis(4) > .15 or self.gamepad.get_axis(4) < -.15:
|
||||
if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15:
|
||||
input.axis3 = -1 * round(self.gamepad.get_axis(4))
|
||||
|
||||
# input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
|
||||
# input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
|
||||
# input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
|
||||
|
||||
|
||||
#Button Mappings
|
||||
#axis2 -> LT
|
||||
#axis5 -> RT
|
||||
#Buttons0 -> A
|
||||
#Buttons1 -> B
|
||||
#Buttons2 -> X
|
||||
#Buttons3 -> Y
|
||||
#Buttons4 -> LB
|
||||
#Buttons5 -> RB
|
||||
#Buttons6 -> Back
|
||||
#Buttons7 -> Start
|
||||
# Button Mappings
|
||||
# axis2 -> LT
|
||||
# axis5 -> RT
|
||||
# Buttons0 -> A
|
||||
# Buttons1 -> B
|
||||
# Buttons2 -> X
|
||||
# Buttons3 -> Y
|
||||
# Buttons4 -> LB
|
||||
# Buttons5 -> RB
|
||||
# Buttons6 -> Back
|
||||
# Buttons7 -> Start
|
||||
|
||||
input.linear_actuator = 0
|
||||
|
||||
|
||||
if pygame.joystick.get_count() != 0:
|
||||
|
||||
self.get_logger().info(f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n")
|
||||
self.get_logger().info(
|
||||
f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n"
|
||||
)
|
||||
self.manual_pub.publish(input)
|
||||
else:
|
||||
pass
|
||||
@@ -241,14 +239,12 @@ class Headless(Node):
|
||||
# else:
|
||||
# input.y = False
|
||||
|
||||
|
||||
# dpad_input = self.gamepad.get_hat(0)#D-pad input
|
||||
|
||||
# #not using up/down on DPad
|
||||
# input.d_up = False
|
||||
# input.d_down = False
|
||||
|
||||
|
||||
# if(dpad_input[0] == 1):#D-pad right
|
||||
# input.d_right = True
|
||||
# else:
|
||||
@@ -258,7 +254,6 @@ class Headless(Node):
|
||||
# else:
|
||||
# input.d_left = False
|
||||
|
||||
|
||||
# if pygame.joystick.get_count() != 0:
|
||||
|
||||
# self.get_logger().info(f"[Ctrl] Updated Controller State\n")
|
||||
@@ -268,9 +263,6 @@ class Headless(Node):
|
||||
# pass
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
|
||||
@@ -279,9 +271,9 @@ def main(args=None):
|
||||
rclpy.spin(node)
|
||||
rclpy.shutdown()
|
||||
|
||||
#tb_bs = BaseStation()
|
||||
#node.run()
|
||||
# tb_bs = BaseStation()
|
||||
# node.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy import qos
|
||||
import serial
|
||||
import sys
|
||||
import threading
|
||||
@@ -10,22 +9,12 @@ import atexit
|
||||
import signal
|
||||
from std_msgs.msg import String
|
||||
from ros2_interfaces_pkg.msg import ArmManual
|
||||
from ros2_interfaces_pkg.msg import ArmIK
|
||||
from ros2_interfaces_pkg.msg import SocketFeedback
|
||||
from ros2_interfaces_pkg.msg import DigitFeedback
|
||||
from sensor_msgs.msg import JointState
|
||||
from tf2_ros import TransformBroadcaster, TransformStamped
|
||||
import math
|
||||
|
||||
|
||||
# IK-Related imports
|
||||
import numpy as np
|
||||
import time, math, os
|
||||
from math import sin, cos, pi
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
from . import astra_arm
|
||||
|
||||
# control_qos = qos.QoSProfile(
|
||||
# history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
# depth=1,
|
||||
@@ -47,57 +36,71 @@ class SerialRelay(Node):
|
||||
super().__init__("arm_node")
|
||||
|
||||
# Get launch mode parameter
|
||||
self.declare_parameter('launch_mode', 'arm')
|
||||
self.launch_mode = self.get_parameter('launch_mode').value
|
||||
self.declare_parameter("launch_mode", "arm")
|
||||
self.launch_mode = self.get_parameter("launch_mode").value
|
||||
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
|
||||
|
||||
# Create publishers
|
||||
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
|
||||
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
|
||||
self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10)
|
||||
self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10)
|
||||
self.socket_pub = self.create_publisher(
|
||||
SocketFeedback, "/arm/feedback/socket", 10
|
||||
)
|
||||
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
|
||||
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
||||
|
||||
# Create subscribers
|
||||
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
|
||||
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
|
||||
|
||||
self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
|
||||
self.man_sub = self.create_subscription(
|
||||
ArmManual, "/arm/control/manual", self.send_manual, 10
|
||||
)
|
||||
|
||||
# New messages
|
||||
self.joint_state_pub = self.create_publisher(JointState, 'joint_states', 10)
|
||||
self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10)
|
||||
self.tf_broadcaster = TransformBroadcaster(self)
|
||||
self.joint_state = JointState()
|
||||
self.joint_state.name = ['Axis_0_Joint', 'Axis_1_Joint', 'Axis_2_Joint', 'Axis_3_Joint', 'Wrist_Differential_Joint', 'Wrist-EF_Roll_Joint', "Gripper_Slider_Left"]
|
||||
self.joint_state.position = [0.0] * len(self.joint_state.name) # Initialize with zeros
|
||||
self.joint_state.name = [
|
||||
"Axis_0_Joint",
|
||||
"Axis_1_Joint",
|
||||
"Axis_2_Joint",
|
||||
"Axis_3_Joint",
|
||||
"Wrist_Differential_Joint",
|
||||
"Wrist-EF_Roll_Joint",
|
||||
"Gripper_Slider_Left",
|
||||
]
|
||||
self.joint_state.position = [0.0] * len(
|
||||
self.joint_state.name
|
||||
) # Initialize with zeros
|
||||
self.odom_trans = TransformStamped()
|
||||
self.odom_trans.header.frame_id = 'odom'
|
||||
self.odom_trans.child_frame_id = 'base_link'
|
||||
self.odom_trans.header.frame_id = "odom"
|
||||
self.odom_trans.child_frame_id = "base_link"
|
||||
self.odom_trans.transform.translation.x = 0.0
|
||||
self.odom_trans.transform.translation.y = 0.0
|
||||
self.odom_trans.transform.translation.z = 0.0
|
||||
|
||||
self.joint_command_sub = self.create_subscription(JointState, '/joint_commands', self.joint_command_callback, 10)
|
||||
self.joint_command_sub = self.create_subscription(
|
||||
JointState, "/joint_commands", self.joint_command_callback, 10
|
||||
)
|
||||
|
||||
# Topics used in anchor mode
|
||||
if self.launch_mode == 'anchor':
|
||||
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
|
||||
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
|
||||
if self.launch_mode == "anchor":
|
||||
self.anchor_sub = self.create_subscription(
|
||||
String, "/anchor/arm/feedback", self.anchor_feedback, 10
|
||||
)
|
||||
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
|
||||
|
||||
self.arm = astra_arm.Arm('arm12.urdf')
|
||||
self.arm_feedback = SocketFeedback()
|
||||
self.digit_feedback = DigitFeedback()
|
||||
|
||||
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode
|
||||
if self.launch_mode == 'arm':
|
||||
if self.launch_mode == "arm":
|
||||
# Loop through all serial devices on the computer to check for the MCU
|
||||
self.port = None
|
||||
ports = SerialRelay.list_serial_ports()
|
||||
for i in range(4):
|
||||
for _ in range(4):
|
||||
for port in ports:
|
||||
try:
|
||||
# connect and send a ping command
|
||||
ser = serial.Serial(port, 115200, timeout=1)
|
||||
#print(f"Checking port {port}...")
|
||||
# print(f"Checking port {port}...")
|
||||
ser.write(b"ping\n")
|
||||
response = ser.read_until("\n") # type: ignore
|
||||
|
||||
@@ -112,7 +115,9 @@ class SerialRelay(Node):
|
||||
break
|
||||
|
||||
if self.port is None:
|
||||
self.get_logger().info("Unable to find MCU... please make sure it is connected.")
|
||||
self.get_logger().info(
|
||||
"Unable to find MCU... please make sure it is connected."
|
||||
)
|
||||
time.sleep(1)
|
||||
sys.exit(1)
|
||||
|
||||
@@ -124,11 +129,11 @@ class SerialRelay(Node):
|
||||
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
#if in arm mode, will need to read from the MCU
|
||||
# if in arm mode, will need to read from the MCU
|
||||
|
||||
try:
|
||||
while rclpy.ok():
|
||||
if self.launch_mode == 'arm':
|
||||
if self.launch_mode == "arm":
|
||||
if self.ser.in_waiting:
|
||||
self.read_mcu()
|
||||
else:
|
||||
@@ -138,13 +143,12 @@ class SerialRelay(Node):
|
||||
finally:
|
||||
self.cleanup()
|
||||
|
||||
|
||||
#Currently will just spit out all values over the /arm/feedback/debug topic as strings
|
||||
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
|
||||
def read_mcu(self):
|
||||
try:
|
||||
output = str(self.ser.readline(), "utf8")
|
||||
if output:
|
||||
#self.get_logger().info(f"[MCU] {output}")
|
||||
# self.get_logger().info(f"[MCU] {output}")
|
||||
msg = String()
|
||||
msg.data = output
|
||||
self.debug_pub.publish(msg)
|
||||
@@ -166,83 +170,9 @@ class SerialRelay(Node):
|
||||
self.ser.close()
|
||||
pass
|
||||
|
||||
def send_ik(self, msg):
|
||||
# Convert Vector3 to a NumPy array
|
||||
input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
|
||||
# decrease input vector by 90%
|
||||
input_raw = input_raw * 0.2
|
||||
|
||||
if input_raw[0] == 0.0 and input_raw[1] == 0.0 and input_raw[2] == 0.0:
|
||||
self.get_logger().info("No input, stopping arm.")
|
||||
command = "can_relay_tovic,arm,39,0,0,0,0\n"
|
||||
self.send_cmd(command)
|
||||
return
|
||||
|
||||
# Debug output
|
||||
tempMsg = String()
|
||||
tempMsg.data = "From IK Control Got Vector: " + str(input_raw)
|
||||
#self.debug_pub.publish(tempMsg)
|
||||
|
||||
# Target position is current position + input vector
|
||||
current_position = self.arm.get_position_vector()
|
||||
target_position = current_position + input_raw
|
||||
|
||||
|
||||
#Print for IK DEBUG
|
||||
tempMsg = String()
|
||||
# tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles)
|
||||
tempMsg.data = "Current Angles: " + str(math.degrees(self.arm.current_angles[2])) + ", " + str(math.degrees(self.arm.current_angles[4])) + ", " + str(math.degrees(self.arm.current_angles[6]))
|
||||
self.ik_debug.publish(tempMsg)
|
||||
self.get_logger().info(f"[IK] {tempMsg.data}")
|
||||
|
||||
# Debug output for current position
|
||||
#tempMsg.data = "Current Position: " + str(current_position)
|
||||
#self.debug_pub.publish(tempMsg)
|
||||
|
||||
# Debug output for target position
|
||||
#tempMsg.data = "Target Position: " + str(target_position)
|
||||
#self.debug_pub.publish(tempMsg)
|
||||
|
||||
# Perform IK with the target position
|
||||
if self.arm.perform_ik(target_position, self.get_logger()):
|
||||
# Send command to control
|
||||
|
||||
#command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n"
|
||||
#self.send_cmd(command)
|
||||
self.get_logger().info(f"IK Success: {target_position}")
|
||||
self.get_logger().info(f"IK Angles: [{str(round(math.degrees(self.arm.ik_angles[2]), 2))}, {str(round(math.degrees(self.arm.ik_angles[4]), 2))}, {str(round(math.degrees(self.arm.ik_angles[6]), 2))}]")
|
||||
|
||||
# tempMsg = String()
|
||||
# tempMsg.data = "IK Success: " + str(target_position)
|
||||
# #self.debug_pub.publish(tempMsg)
|
||||
# tempMsg.data = "Sending: " + str(command)
|
||||
#self.debug_pub.publish(tempMsg)
|
||||
|
||||
# Send the IK angles to the MCU
|
||||
command = "can_relay_tovic,arm,32," + f"{math.degrees(self.arm.ik_angles[0])*10},{math.degrees(self.arm.ik_angles[2])*10},{math.degrees(self.arm.ik_angles[4])*10},{math.degrees(self.arm.ik_angles[6])*10}" + "\n"
|
||||
# self.send_cmd(command)
|
||||
|
||||
# Manual control for Wrist/Effector
|
||||
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
|
||||
|
||||
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
|
||||
|
||||
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
|
||||
|
||||
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
|
||||
|
||||
self.send_cmd(command)
|
||||
else:
|
||||
self.get_logger().info("IK Fail")
|
||||
self.get_logger().info(f"IK Angles: [{str(math.degrees(self.arm.ik_angles[2]))}, {str(math.degrees(self.arm.ik_angles[4]))}, {str(math.degrees(self.arm.ik_angles[6]))}]")
|
||||
# tempMsg = String()
|
||||
# tempMsg.data = "IK Fail"
|
||||
#self.debug_pub.publish(tempMsg)
|
||||
|
||||
|
||||
def joint_command_callback(self, msg: JointState):
|
||||
# Embedded takes deg*10, ROS2 uses Radians
|
||||
positions = [math.degrees(pos)*10 for pos in msg.position]
|
||||
positions = [math.degrees(pos) * 10 for pos in msg.position]
|
||||
# Axis 2 & 3 URDF direction is inverted
|
||||
positions[2] = -positions[2]
|
||||
positions[3] = -positions[3]
|
||||
@@ -256,22 +186,37 @@ class SerialRelay(Node):
|
||||
# Gripper IK does not have adequate hardware yet
|
||||
self.send_cmd(command)
|
||||
|
||||
|
||||
def send_manual(self, msg: ArmManual):
|
||||
axis0 = msg.axis0
|
||||
axis1 = -1 * msg.axis1
|
||||
axis2 = msg.axis2
|
||||
axis3 = msg.axis3
|
||||
|
||||
#Send controls for arm
|
||||
# Send controls for arm
|
||||
command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n"
|
||||
command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
|
||||
command += (
|
||||
"can_relay_tovic,arm,39,"
|
||||
+ str(axis0)
|
||||
+ ","
|
||||
+ str(axis1)
|
||||
+ ","
|
||||
+ str(axis2)
|
||||
+ ","
|
||||
+ str(axis3)
|
||||
+ "\n"
|
||||
)
|
||||
|
||||
#Send controls for end effector
|
||||
# Send controls for end effector
|
||||
|
||||
# command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
|
||||
# command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
|
||||
command += "can_relay_tovic,digit,39," + str(msg.effector_yaw) + "," + str(msg.effector_roll) + "\n"
|
||||
command += (
|
||||
"can_relay_tovic,digit,39,"
|
||||
+ str(msg.effector_yaw)
|
||||
+ ","
|
||||
+ str(msg.effector_roll)
|
||||
+ "\n"
|
||||
)
|
||||
|
||||
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
|
||||
|
||||
@@ -284,21 +229,23 @@ class SerialRelay(Node):
|
||||
return
|
||||
|
||||
def send_cmd(self, msg: str):
|
||||
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
|
||||
if (
|
||||
self.launch_mode == "anchor"
|
||||
): # if in anchor mode, send to anchor node to relay
|
||||
output = String()
|
||||
output.data = msg
|
||||
self.anchor_pub.publish(output)
|
||||
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
|
||||
elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly
|
||||
self.get_logger().info(f"[Arm to MCU] {msg}")
|
||||
self.ser.write(bytes(msg, "utf8"))
|
||||
|
||||
def anchor_feedback(self, msg: String):
|
||||
output = msg.data
|
||||
if output.startswith("can_relay_fromvic,arm,55"):
|
||||
#pass
|
||||
# pass
|
||||
self.updateAngleFeedback(output)
|
||||
elif output.startswith("can_relay_fromvic,arm,54"):
|
||||
#pass
|
||||
# pass
|
||||
self.updateBusVoltage(output)
|
||||
elif output.startswith("can_relay_fromvic,arm,53"):
|
||||
self.updateMotorFeedback(output)
|
||||
@@ -335,12 +282,12 @@ class SerialRelay(Node):
|
||||
angles = [float(angle) / 10.0 for angle in angles_in]
|
||||
#
|
||||
#
|
||||
#THIS NEEDS TO BE REMOVED LATER
|
||||
#PLACEHOLDER FOR WRIST VALUE
|
||||
# THIS NEEDS TO BE REMOVED LATER
|
||||
# PLACEHOLDER FOR WRIST VALUE
|
||||
#
|
||||
#
|
||||
angles.append(0.0)#placeholder for wrist_continuous
|
||||
angles.append(0.0)#placeholder for wrist
|
||||
angles.append(0.0) # placeholder for wrist_continuous
|
||||
angles.append(0.0) # placeholder for wrist
|
||||
#
|
||||
#
|
||||
# # Update the arm's current angles
|
||||
@@ -402,11 +349,10 @@ class SerialRelay(Node):
|
||||
self.arm_feedback.axis0_voltage = voltage
|
||||
self.arm_feedback.axis0_current = current
|
||||
|
||||
|
||||
@staticmethod
|
||||
def list_serial_ports():
|
||||
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
|
||||
#return glob.glob("/dev/tty[A-Za-z]*")
|
||||
# return glob.glob("/dev/tty[A-Za-z]*")
|
||||
|
||||
def cleanup(self):
|
||||
print("Cleaning up...")
|
||||
@@ -416,11 +362,13 @@ class SerialRelay(Node):
|
||||
except Exception as e:
|
||||
exit(0)
|
||||
|
||||
|
||||
def myexcepthook(type, value, tb):
|
||||
print("Uncaught exception:", type, value)
|
||||
if serial_pub:
|
||||
serial_pub.cleanup()
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
sys.excepthook = myexcepthook
|
||||
@@ -429,7 +377,10 @@ def main(args=None):
|
||||
serial_pub = SerialRelay()
|
||||
serial_pub.run()
|
||||
|
||||
if __name__ == '__main__':
|
||||
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
||||
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
|
||||
|
||||
if __name__ == "__main__":
|
||||
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
||||
signal.signal(
|
||||
signal.SIGTERM, lambda signum, frame: sys.exit(0)
|
||||
) # Catch termination signals and exit cleanly
|
||||
main()
|
||||
|
||||
@@ -1,145 +0,0 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
import numpy as np
|
||||
import time, math, os
|
||||
from math import sin, cos, pi
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from ikpy.chain import Chain
|
||||
from ikpy.link import OriginLink, URDFLink
|
||||
#import pygame as pyg
|
||||
from scipy.spatial.transform import Rotation as R
|
||||
from geometry_msgs.msg import Vector3
|
||||
|
||||
|
||||
# Misc
|
||||
degree = pi / 180.0
|
||||
|
||||
|
||||
def convert_angles(angles):
|
||||
# Converts angles to the format used for the urdf (contains some dummy joints)
|
||||
return [0.0, math.radians(angles[0]), math.radians(angles[1]), 0.0, math.radians(angles[2]), 0.0, math.radians(angles[3]), 0.0, math.radians(angles[4]), math.radians(angles[5]), 0.0]
|
||||
|
||||
|
||||
class Arm:
|
||||
def __init__(self, urdf_name):
|
||||
self.ik_tolerance = 1e-1 #Tolerance (in meters) to determine if solution is valid
|
||||
# URDF file path
|
||||
self.urdf = os.path.join(get_package_share_directory('arm_pkg'), 'urdf', urdf_name)
|
||||
# IKpy Chain
|
||||
self.chain = Chain.from_urdf_file(self.urdf)
|
||||
|
||||
|
||||
# Arrays for joint states
|
||||
# Some links in the URDF are static (non-joints), these will remain zero for IK
|
||||
# Indexes: Fixed_base, Ax_0, Ax_1, seg1, Ax_2, seg2, ax_3, seg3, continuous, wrist, Effector
|
||||
self.zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
self.current_angles = self.zero_angles
|
||||
self.last_angles = self.zero_angles
|
||||
self.ik_angles = self.zero_angles
|
||||
|
||||
self.current_position: list[float] = []
|
||||
self.target_position = [0.0, 0.0, 0.0]
|
||||
self.target_orientation: list = [] # Effector orientation desired at target position.
|
||||
# Generally orientation for the effector is modified manually by the operator.
|
||||
|
||||
# Might not need, copied over from state_publisher.py in ik_test
|
||||
#self.step = 0.03 # Max movement increment
|
||||
|
||||
|
||||
def perform_ik(self, target_position, logger):
|
||||
self.target_position = target_position
|
||||
# Update the target orientation to the current orientation
|
||||
self.update_orientation()
|
||||
# print(f"[IK FOR] Target Position: {self.target_position}")
|
||||
try:
|
||||
# print(f"[TRY] Current Angles: {self.current_angles}")
|
||||
# print(f"[TRY] Target Position: {self.target_position}")
|
||||
# print(f"[TRY] Target Orientation: {self.target_orientation}")
|
||||
self.ik_angles = self.chain.inverse_kinematics(
|
||||
target_position=self.target_position,
|
||||
target_orientation=self.target_orientation,
|
||||
initial_position=self.current_angles,
|
||||
orientation_mode="all"
|
||||
)
|
||||
# Check if the solution is within the tolerance
|
||||
fk_matrix = self.chain.forward_kinematics(self.ik_angles)
|
||||
|
||||
fk_position = fk_matrix[:3, 3] # type: ignore
|
||||
|
||||
# print(f"[TRY] FK Position for Solution: {fk_position}")
|
||||
|
||||
error = np.linalg.norm(target_position - fk_position)
|
||||
if error > self.ik_tolerance:
|
||||
logger.info(f"No VALID IK Solution within tolerance. Error: {error}")
|
||||
return False
|
||||
else:
|
||||
logger.info(f"IK Solution Found. Error: {error}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.info(f"IK failed for exception: {e}")
|
||||
return False
|
||||
|
||||
# # Given the FK_Matix for the arm's current pose, update the orientation array
|
||||
# def update_orientation(self, fk_matrix):
|
||||
# self.target_orientation = fk_matrix[:3, :3]
|
||||
# return
|
||||
|
||||
# def update_joints(self, ax_0, ax_1, ax_2, ax_3, wrist):
|
||||
# self.current_angles = [0.0, 0.0, ax_0, ax_1, ax_2, ax_3, wrist, 0.0]
|
||||
# return
|
||||
|
||||
# Get current orientation of the end effector and update target_orientation
|
||||
def update_orientation(self):
|
||||
|
||||
# FK matrix for arm's current pose
|
||||
fk_matrix = self.chain.forward_kinematics(self.current_angles)
|
||||
|
||||
# Update target_orientation to the effector's current orientation
|
||||
self.target_orientation = fk_matrix[:3, :3] # type: ignore
|
||||
|
||||
# Update current angles to those provided
|
||||
# Resetting last_angles to the new angles
|
||||
#
|
||||
# Use: First call, or when angles are changed manually.
|
||||
def reset_angles(self, angles):
|
||||
# Update angles to the new angles
|
||||
self.current_angles = convert_angles(angles)
|
||||
self.last_angles = self.current_angles
|
||||
|
||||
# Update current angles to those provided
|
||||
# Maintain previous angles in last_angles
|
||||
#
|
||||
# Use: Repeated calls during IK operation
|
||||
def update_angles(self, angles):
|
||||
# Update angles to the new angles
|
||||
self.last_angles = self.current_angles
|
||||
self.current_angles = convert_angles(angles)
|
||||
|
||||
# Get current X,Y,Z position of end effector
|
||||
def get_position(self):
|
||||
# FK matrix for arm's current pose
|
||||
fk_matrix = self.chain.forward_kinematics(self.current_angles)
|
||||
|
||||
# Get the position of the end effector from the FK matrix
|
||||
position = fk_matrix[:3, 3] # type: ignore
|
||||
|
||||
return position
|
||||
|
||||
# Get current X,Y,Z position of end effector
|
||||
def get_position_vector(self):
|
||||
# FK matrix for arm's current pose
|
||||
fk_matrix = self.chain.forward_kinematics(self.current_angles)
|
||||
|
||||
# Get the position of the end effector from the FK matrix
|
||||
position = fk_matrix[:3, 3] # type: ignore
|
||||
|
||||
# Return position as a NumPy array
|
||||
return np.array(position)
|
||||
|
||||
|
||||
def update_position(self):
|
||||
# FK matrix for arm's current pose
|
||||
fk_matrix = self.chain.forward_kinematics(self.current_angles)
|
||||
|
||||
# Get the position of the end effector from the FK matrix and update current pos
|
||||
self.current_position = fk_matrix[:3, 3] # type: ignore
|
||||
Reference in New Issue
Block a user