diff --git a/src/arm_pkg/arm_pkg/arm_headless.py b/src/arm_pkg/arm_pkg/arm_headless.py index bdce40b..4078e16 100755 --- a/src/arm_pkg/arm_pkg/arm_headless.py +++ b/src/arm_pkg/arm_pkg/arm_headless.py @@ -17,9 +17,12 @@ from ros2_interfaces_pkg.msg import ArmManual from ros2_interfaces_pkg.msg import ArmIK -os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() +os.environ["SDL_AUDIODRIVER"] = ( + "dummy" # Force pygame to use a dummy audio driver before pygame.init() +) os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display + class Headless(Node): def __init__(self): # Initalize node with name @@ -30,20 +33,18 @@ class Headless(Node): self.create_timer(0.1, self.send_manual) - # Create a publisher to publish any output the pico sends # Depricated, kept temporarily for reference - #self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10) - self.manual_pub = self.create_publisher(ArmManual, '/arm/control/manual', 10) + # self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10) + self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10) # Create a subscriber to listen to any commands sent for the pico - + # Depricated, kept temporarily for reference - #self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10) - - #self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) + # self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10) + # self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) self.laser_status = 0 @@ -66,38 +67,36 @@ class Headless(Node): # Initialize the first gamepad, print name to terminal self.gamepad = pygame.joystick.Joystick(0) self.gamepad.init() - print(f'Gamepad Found: {self.gamepad.get_name()}') + print(f"Gamepad Found: {self.gamepad.get_name()}") # # - def run(self): # This thread makes all the update processes run in the background thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread.start() - try: while rclpy.ok(): - #Check the pico for updates + # Check the pico for updates - #self.read_feedback() - if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected + # self.read_feedback() + if ( + pygame.joystick.get_count() == 0 + ): # if controller disconnected, wait for it to be reconnected print(f"Gamepad disconnected: {self.gamepad.get_name()}") - + while pygame.joystick.get_count() == 0: - #self.send_controls() #depricated, kept for reference temporarily + # self.send_controls() #depricated, kept for reference temporarily self.send_manual() - #self.read_feedback() + # self.read_feedback() self.gamepad = pygame.joystick.Joystick(0) - self.gamepad.init() #re-initialized gamepad + self.gamepad.init() # re-initialized gamepad print(f"Gamepad reconnected: {self.gamepad.get_name()}") - except KeyboardInterrupt: sys.exit(0) - def send_manual(self): for event in pygame.event.get(): if event.type == pygame.QUIT: @@ -105,21 +104,20 @@ class Headless(Node): exit() input = ArmManual() - # Triggers for gripper control - if self.gamepad.get_axis(2) > 0:#left trigger + if self.gamepad.get_axis(2) > 0: # left trigger input.gripper = -1 - elif self.gamepad.get_axis(5) > 0:#right trigger + elif self.gamepad.get_axis(5) > 0: # right trigger input.gripper = 1 # Toggle Laser - if self.gamepad.get_button(7):#Start + if self.gamepad.get_button(7): # Start self.laser_status = 1 - elif self.gamepad.get_button(6):#Back + elif self.gamepad.get_button(6): # Back self.laser_status = 0 input.laser = self.laser_status - if self.gamepad.get_button(5):#right bumper, control effector + if self.gamepad.get_button(5): # right bumper, control effector # Left stick X-axis for effector yaw if self.gamepad.get_axis(0) > 0: @@ -133,7 +131,7 @@ class Headless(Node): elif self.gamepad.get_axis(3) < 0: input.effector_roll = -1 - else: # Control arm axis + else: # Control arm axis dpad_input = self.gamepad.get_hat(0) input.axis0 = 0 if dpad_input[0] == 1: @@ -141,44 +139,44 @@ class Headless(Node): elif dpad_input[0] == -1: input.axis0 = -1 - if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15: + if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15: input.axis1 = round(self.gamepad.get_axis(0)) - if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15: + if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15: input.axis2 = -1 * round(self.gamepad.get_axis(1)) - if self.gamepad.get_axis(4) > .15 or self.gamepad.get_axis(4) < -.15: + if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15: input.axis3 = -1 * round(self.gamepad.get_axis(4)) # input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis # input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis # input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis - - #Button Mappings - #axis2 -> LT - #axis5 -> RT - #Buttons0 -> A - #Buttons1 -> B - #Buttons2 -> X - #Buttons3 -> Y - #Buttons4 -> LB - #Buttons5 -> RB - #Buttons6 -> Back - #Buttons7 -> Start + # Button Mappings + # axis2 -> LT + # axis5 -> RT + # Buttons0 -> A + # Buttons1 -> B + # Buttons2 -> X + # Buttons3 -> Y + # Buttons4 -> LB + # Buttons5 -> RB + # Buttons6 -> Back + # Buttons7 -> Start input.linear_actuator = 0 - if pygame.joystick.get_count() != 0: - - self.get_logger().info(f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n") + + self.get_logger().info( + f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n" + ) self.manual_pub.publish(input) else: pass pass - + # Depricated, kept temporarily for reference # def send_controls(self): @@ -190,7 +188,7 @@ class Headless(Node): # input.lt = self.gamepad.get_axis(2)#left trigger # input.rt = self.gamepad.get_axis(5)#right trigger - + # #input.lb = self.gamepad.get_button(9)#Value must be converted to bool # if(self.gamepad.get_button(4)):#left bumper # input.lb = True @@ -202,7 +200,7 @@ class Headless(Node): # input.rb = True # else: # input.rb = False - + # #input.plus = self.gamepad.get_button(6)#plus button # if(self.gamepad.get_button(7)):#plus button # input.plus = True @@ -218,7 +216,7 @@ class Headless(Node): # input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis # input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis # input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis - # input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis + # input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis # #input.a = self.gamepad.get_button(1)#A button # if(self.gamepad.get_button(0)):#A button @@ -241,14 +239,12 @@ class Headless(Node): # else: # input.y = False - # dpad_input = self.gamepad.get_hat(0)#D-pad input # #not using up/down on DPad # input.d_up = False # input.d_down = False - # if(dpad_input[0] == 1):#D-pad right # input.d_right = True # else: @@ -257,10 +253,9 @@ class Headless(Node): # input.d_left = True # else: # input.d_left = False - # if pygame.joystick.get_count() != 0: - + # self.get_logger().info(f"[Ctrl] Updated Controller State\n") # self.publisher.publish(input) @@ -268,20 +263,17 @@ class Headless(Node): # pass - - - def main(args=None): rclpy.init(args=args) node = Headless() - + rclpy.spin(node) rclpy.shutdown() - #tb_bs = BaseStation() - #node.run() + # tb_bs = BaseStation() + # node.run() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index f3941eb..ea63d4d 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -1,6 +1,5 @@ import rclpy from rclpy.node import Node -from rclpy import qos import serial import sys import threading @@ -10,22 +9,12 @@ import atexit import signal from std_msgs.msg import String from ros2_interfaces_pkg.msg import ArmManual -from ros2_interfaces_pkg.msg import ArmIK from ros2_interfaces_pkg.msg import SocketFeedback from ros2_interfaces_pkg.msg import DigitFeedback from sensor_msgs.msg import JointState from tf2_ros import TransformBroadcaster, TransformStamped import math - -# IK-Related imports -import numpy as np -import time, math, os -from math import sin, cos, pi -from ament_index_python.packages import get_package_share_directory - -from . import astra_arm - # control_qos = qos.QoSProfile( # history=qos.QoSHistoryPolicy.KEEP_LAST, # depth=1, @@ -47,57 +36,71 @@ class SerialRelay(Node): super().__init__("arm_node") # Get launch mode parameter - self.declare_parameter('launch_mode', 'arm') - self.launch_mode = self.get_parameter('launch_mode').value + self.declare_parameter("launch_mode", "arm") + self.launch_mode = self.get_parameter("launch_mode").value self.get_logger().info(f"arm launch_mode is: {self.launch_mode}") # Create publishers - self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10) - self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10) - self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10) + self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10) + self.socket_pub = self.create_publisher( + SocketFeedback, "/arm/feedback/socket", 10 + ) + self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10) self.feedback_timer = self.create_timer(0.25, self.publish_feedback) # Create subscribers - self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10) - self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10) - - self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10) + self.man_sub = self.create_subscription( + ArmManual, "/arm/control/manual", self.send_manual, 10 + ) # New messages - self.joint_state_pub = self.create_publisher(JointState, 'joint_states', 10) + self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10) self.tf_broadcaster = TransformBroadcaster(self) self.joint_state = JointState() - self.joint_state.name = ['Axis_0_Joint', 'Axis_1_Joint', 'Axis_2_Joint', 'Axis_3_Joint', 'Wrist_Differential_Joint', 'Wrist-EF_Roll_Joint', "Gripper_Slider_Left"] - self.joint_state.position = [0.0] * len(self.joint_state.name) # Initialize with zeros + self.joint_state.name = [ + "Axis_0_Joint", + "Axis_1_Joint", + "Axis_2_Joint", + "Axis_3_Joint", + "Wrist_Differential_Joint", + "Wrist-EF_Roll_Joint", + "Gripper_Slider_Left", + ] + self.joint_state.position = [0.0] * len( + self.joint_state.name + ) # Initialize with zeros self.odom_trans = TransformStamped() - self.odom_trans.header.frame_id = 'odom' - self.odom_trans.child_frame_id = 'base_link' + self.odom_trans.header.frame_id = "odom" + self.odom_trans.child_frame_id = "base_link" self.odom_trans.transform.translation.x = 0.0 self.odom_trans.transform.translation.y = 0.0 self.odom_trans.transform.translation.z = 0.0 - self.joint_command_sub = self.create_subscription(JointState, '/joint_commands', self.joint_command_callback, 10) + self.joint_command_sub = self.create_subscription( + JointState, "/joint_commands", self.joint_command_callback, 10 + ) # Topics used in anchor mode - if self.launch_mode == 'anchor': - self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10) - self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) + if self.launch_mode == "anchor": + self.anchor_sub = self.create_subscription( + String, "/anchor/arm/feedback", self.anchor_feedback, 10 + ) + self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) - self.arm = astra_arm.Arm('arm12.urdf') self.arm_feedback = SocketFeedback() self.digit_feedback = DigitFeedback() # Search for ports IF in 'arm' (standalone) and not 'anchor' mode - if self.launch_mode == 'arm': + if self.launch_mode == "arm": # Loop through all serial devices on the computer to check for the MCU self.port = None ports = SerialRelay.list_serial_ports() - for i in range(4): + for _ in range(4): for port in ports: try: # connect and send a ping command ser = serial.Serial(port, 115200, timeout=1) - #print(f"Checking port {port}...") + # print(f"Checking port {port}...") ser.write(b"ping\n") response = ser.read_until("\n") # type: ignore @@ -110,12 +113,14 @@ class SerialRelay(Node): pass if self.port is not None: break - + if self.port is None: - self.get_logger().info("Unable to find MCU... please make sure it is connected.") + self.get_logger().info( + "Unable to find MCU... please make sure it is connected." + ) time.sleep(1) sys.exit(1) - + self.ser = serial.Serial(self.port, 115200) atexit.register(self.cleanup) @@ -123,12 +128,12 @@ class SerialRelay(Node): global thread thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread.start() - - #if in arm mode, will need to read from the MCU + + # if in arm mode, will need to read from the MCU try: while rclpy.ok(): - if self.launch_mode == 'arm': + if self.launch_mode == "arm": if self.ser.in_waiting: self.read_mcu() else: @@ -138,13 +143,12 @@ class SerialRelay(Node): finally: self.cleanup() - - #Currently will just spit out all values over the /arm/feedback/debug topic as strings + # Currently will just spit out all values over the /arm/feedback/debug topic as strings def read_mcu(self): try: output = str(self.ser.readline(), "utf8") if output: - #self.get_logger().info(f"[MCU] {output}") + # self.get_logger().info(f"[MCU] {output}") msg = String() msg.data = output self.debug_pub.publish(msg) @@ -166,83 +170,9 @@ class SerialRelay(Node): self.ser.close() pass - def send_ik(self, msg): - # Convert Vector3 to a NumPy array - input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array - # decrease input vector by 90% - input_raw = input_raw * 0.2 - - if input_raw[0] == 0.0 and input_raw[1] == 0.0 and input_raw[2] == 0.0: - self.get_logger().info("No input, stopping arm.") - command = "can_relay_tovic,arm,39,0,0,0,0\n" - self.send_cmd(command) - return - - # Debug output - tempMsg = String() - tempMsg.data = "From IK Control Got Vector: " + str(input_raw) - #self.debug_pub.publish(tempMsg) - - # Target position is current position + input vector - current_position = self.arm.get_position_vector() - target_position = current_position + input_raw - - - #Print for IK DEBUG - tempMsg = String() - # tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles) - tempMsg.data = "Current Angles: " + str(math.degrees(self.arm.current_angles[2])) + ", " + str(math.degrees(self.arm.current_angles[4])) + ", " + str(math.degrees(self.arm.current_angles[6])) - self.ik_debug.publish(tempMsg) - self.get_logger().info(f"[IK] {tempMsg.data}") - - # Debug output for current position - #tempMsg.data = "Current Position: " + str(current_position) - #self.debug_pub.publish(tempMsg) - - # Debug output for target position - #tempMsg.data = "Target Position: " + str(target_position) - #self.debug_pub.publish(tempMsg) - - # Perform IK with the target position - if self.arm.perform_ik(target_position, self.get_logger()): - # Send command to control - - #command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n" - #self.send_cmd(command) - self.get_logger().info(f"IK Success: {target_position}") - self.get_logger().info(f"IK Angles: [{str(round(math.degrees(self.arm.ik_angles[2]), 2))}, {str(round(math.degrees(self.arm.ik_angles[4]), 2))}, {str(round(math.degrees(self.arm.ik_angles[6]), 2))}]") - - # tempMsg = String() - # tempMsg.data = "IK Success: " + str(target_position) - # #self.debug_pub.publish(tempMsg) - # tempMsg.data = "Sending: " + str(command) - #self.debug_pub.publish(tempMsg) - - # Send the IK angles to the MCU - command = "can_relay_tovic,arm,32," + f"{math.degrees(self.arm.ik_angles[0])*10},{math.degrees(self.arm.ik_angles[2])*10},{math.degrees(self.arm.ik_angles[4])*10},{math.degrees(self.arm.ik_angles[6])*10}" + "\n" - # self.send_cmd(command) - - # Manual control for Wrist/Effector - command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n" - - command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n" - - command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n" - - command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n" - - self.send_cmd(command) - else: - self.get_logger().info("IK Fail") - self.get_logger().info(f"IK Angles: [{str(math.degrees(self.arm.ik_angles[2]))}, {str(math.degrees(self.arm.ik_angles[4]))}, {str(math.degrees(self.arm.ik_angles[6]))}]") - # tempMsg = String() - # tempMsg.data = "IK Fail" - #self.debug_pub.publish(tempMsg) - - def joint_command_callback(self, msg: JointState): # Embedded takes deg*10, ROS2 uses Radians - positions = [math.degrees(pos)*10 for pos in msg.position] + positions = [math.degrees(pos) * 10 for pos in msg.position] # Axis 2 & 3 URDF direction is inverted positions[2] = -positions[2] positions[3] = -positions[3] @@ -256,22 +186,37 @@ class SerialRelay(Node): # Gripper IK does not have adequate hardware yet self.send_cmd(command) - def send_manual(self, msg: ArmManual): axis0 = msg.axis0 axis1 = -1 * msg.axis1 axis2 = msg.axis2 axis3 = msg.axis3 - #Send controls for arm + # Send controls for arm command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n" - command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n" - - #Send controls for end effector + command += ( + "can_relay_tovic,arm,39," + + str(axis0) + + "," + + str(axis1) + + "," + + str(axis2) + + "," + + str(axis3) + + "\n" + ) + + # Send controls for end effector # command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n" # command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n" - command += "can_relay_tovic,digit,39," + str(msg.effector_yaw) + "," + str(msg.effector_roll) + "\n" + command += ( + "can_relay_tovic,digit,39," + + str(msg.effector_yaw) + + "," + + str(msg.effector_roll) + + "\n" + ) command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n" @@ -282,23 +227,25 @@ class SerialRelay(Node): self.send_cmd(command) return - + def send_cmd(self, msg: str): - if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay + if ( + self.launch_mode == "anchor" + ): # if in anchor mode, send to anchor node to relay output = String() output.data = msg self.anchor_pub.publish(output) - elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly + elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly self.get_logger().info(f"[Arm to MCU] {msg}") self.ser.write(bytes(msg, "utf8")) def anchor_feedback(self, msg: String): output = msg.data if output.startswith("can_relay_fromvic,arm,55"): - #pass + # pass self.updateAngleFeedback(output) elif output.startswith("can_relay_fromvic,arm,54"): - #pass + # pass self.updateBusVoltage(output) elif output.startswith("can_relay_fromvic,arm,53"): self.updateMotorFeedback(output) @@ -327,7 +274,7 @@ class SerialRelay(Node): # Angle feedbacks, # split the msg.data by commas parts = msg.split(",") - + if len(parts) >= 7: # Extract the angles from the string angles_in = parts[3:7] @@ -335,12 +282,12 @@ class SerialRelay(Node): angles = [float(angle) / 10.0 for angle in angles_in] # # - #THIS NEEDS TO BE REMOVED LATER - #PLACEHOLDER FOR WRIST VALUE + # THIS NEEDS TO BE REMOVED LATER + # PLACEHOLDER FOR WRIST VALUE # # - angles.append(0.0)#placeholder for wrist_continuous - angles.append(0.0)#placeholder for wrist + angles.append(0.0) # placeholder for wrist_continuous + angles.append(0.0) # placeholder for wrist # # # # Update the arm's current angles @@ -378,7 +325,7 @@ class SerialRelay(Node): self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0 else: self.get_logger().info("Invalid voltage feedback input format") - + def updateMotorFeedback(self, msg: str): parts = str(msg.strip()).split(",") motorId = round(float(parts[3])) @@ -402,11 +349,10 @@ class SerialRelay(Node): self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_current = current - @staticmethod def list_serial_ports(): return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") - #return glob.glob("/dev/tty[A-Za-z]*") + # return glob.glob("/dev/tty[A-Za-z]*") def cleanup(self): print("Cleaning up...") @@ -416,11 +362,13 @@ class SerialRelay(Node): except Exception as e: exit(0) + def myexcepthook(type, value, tb): print("Uncaught exception:", type, value) if serial_pub: serial_pub.cleanup() + def main(args=None): rclpy.init(args=args) sys.excepthook = myexcepthook @@ -429,7 +377,10 @@ def main(args=None): serial_pub = SerialRelay() serial_pub.run() -if __name__ == '__main__': - #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly - signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly + +if __name__ == "__main__": + # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly + signal.signal( + signal.SIGTERM, lambda signum, frame: sys.exit(0) + ) # Catch termination signals and exit cleanly main() diff --git a/src/arm_pkg/arm_pkg/astra_arm.py b/src/arm_pkg/arm_pkg/astra_arm.py deleted file mode 100644 index 44a1373..0000000 --- a/src/arm_pkg/arm_pkg/astra_arm.py +++ /dev/null @@ -1,145 +0,0 @@ -import rclpy -from rclpy.node import Node -import numpy as np -import time, math, os -from math import sin, cos, pi -from ament_index_python.packages import get_package_share_directory -from ikpy.chain import Chain -from ikpy.link import OriginLink, URDFLink -#import pygame as pyg -from scipy.spatial.transform import Rotation as R -from geometry_msgs.msg import Vector3 - - -# Misc -degree = pi / 180.0 - - -def convert_angles(angles): - # Converts angles to the format used for the urdf (contains some dummy joints) - return [0.0, math.radians(angles[0]), math.radians(angles[1]), 0.0, math.radians(angles[2]), 0.0, math.radians(angles[3]), 0.0, math.radians(angles[4]), math.radians(angles[5]), 0.0] - - -class Arm: - def __init__(self, urdf_name): - self.ik_tolerance = 1e-1 #Tolerance (in meters) to determine if solution is valid - # URDF file path - self.urdf = os.path.join(get_package_share_directory('arm_pkg'), 'urdf', urdf_name) - # IKpy Chain - self.chain = Chain.from_urdf_file(self.urdf) - - - # Arrays for joint states - # Some links in the URDF are static (non-joints), these will remain zero for IK - # Indexes: Fixed_base, Ax_0, Ax_1, seg1, Ax_2, seg2, ax_3, seg3, continuous, wrist, Effector - self.zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] - self.current_angles = self.zero_angles - self.last_angles = self.zero_angles - self.ik_angles = self.zero_angles - - self.current_position: list[float] = [] - self.target_position = [0.0, 0.0, 0.0] - self.target_orientation: list = [] # Effector orientation desired at target position. - # Generally orientation for the effector is modified manually by the operator. - - # Might not need, copied over from state_publisher.py in ik_test - #self.step = 0.03 # Max movement increment - - - def perform_ik(self, target_position, logger): - self.target_position = target_position - # Update the target orientation to the current orientation - self.update_orientation() - # print(f"[IK FOR] Target Position: {self.target_position}") - try: - # print(f"[TRY] Current Angles: {self.current_angles}") - # print(f"[TRY] Target Position: {self.target_position}") - # print(f"[TRY] Target Orientation: {self.target_orientation}") - self.ik_angles = self.chain.inverse_kinematics( - target_position=self.target_position, - target_orientation=self.target_orientation, - initial_position=self.current_angles, - orientation_mode="all" - ) - # Check if the solution is within the tolerance - fk_matrix = self.chain.forward_kinematics(self.ik_angles) - - fk_position = fk_matrix[:3, 3] # type: ignore - - # print(f"[TRY] FK Position for Solution: {fk_position}") - - error = np.linalg.norm(target_position - fk_position) - if error > self.ik_tolerance: - logger.info(f"No VALID IK Solution within tolerance. Error: {error}") - return False - else: - logger.info(f"IK Solution Found. Error: {error}") - return True - except Exception as e: - logger.info(f"IK failed for exception: {e}") - return False - - # # Given the FK_Matix for the arm's current pose, update the orientation array - # def update_orientation(self, fk_matrix): - # self.target_orientation = fk_matrix[:3, :3] - # return - - # def update_joints(self, ax_0, ax_1, ax_2, ax_3, wrist): - # self.current_angles = [0.0, 0.0, ax_0, ax_1, ax_2, ax_3, wrist, 0.0] - # return - - # Get current orientation of the end effector and update target_orientation - def update_orientation(self): - - # FK matrix for arm's current pose - fk_matrix = self.chain.forward_kinematics(self.current_angles) - - # Update target_orientation to the effector's current orientation - self.target_orientation = fk_matrix[:3, :3] # type: ignore - - # Update current angles to those provided - # Resetting last_angles to the new angles - # - # Use: First call, or when angles are changed manually. - def reset_angles(self, angles): - # Update angles to the new angles - self.current_angles = convert_angles(angles) - self.last_angles = self.current_angles - - # Update current angles to those provided - # Maintain previous angles in last_angles - # - # Use: Repeated calls during IK operation - def update_angles(self, angles): - # Update angles to the new angles - self.last_angles = self.current_angles - self.current_angles = convert_angles(angles) - - # Get current X,Y,Z position of end effector - def get_position(self): - # FK matrix for arm's current pose - fk_matrix = self.chain.forward_kinematics(self.current_angles) - - # Get the position of the end effector from the FK matrix - position = fk_matrix[:3, 3] # type: ignore - - return position - - # Get current X,Y,Z position of end effector - def get_position_vector(self): - # FK matrix for arm's current pose - fk_matrix = self.chain.forward_kinematics(self.current_angles) - - # Get the position of the end effector from the FK matrix - position = fk_matrix[:3, 3] # type: ignore - - # Return position as a NumPy array - return np.array(position) - - - def update_position(self): - # FK matrix for arm's current pose - fk_matrix = self.chain.forward_kinematics(self.current_angles) - - # Get the position of the end effector from the FK matrix and update current pos - self.current_position = fk_matrix[:3, 3] # type: ignore