diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index e008020..affeb04 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -19,15 +19,50 @@ import numpy as np import time, math, os from math import sin, cos, pi from ament_index_python.packages import get_package_share_directory -from ikpy.chain import Chain -from ikpy.link import OriginLink, URDFLink -#import pygame as pyg -from scipy.spatial.transform import Rotation as R +# from ikpy.chain import Chain +# from ikpy.link import OriginLink, URDFLink +# #import pygame as pyg +# from scipy.spatial.transform import Rotation as R +import astra_arm serial_pub = None thread = None + +# struct for socket feedback +class SocketFeedback: + def __init__(self): + self.bat_voltage = 0.0 + self.voltage_12 = 0.0 + self.voltage_5 = 0.0 + self.voltage_3 = 0.0 + self.joint_angles = [0.0, 0.0, 0.0, 0.0] + self.joint_temps = [0.0, 0.0, 0.0, 0.0] + self.joint_voltages = [0.0, 0.0, 0.0, 0.0] + self.joint_currents = [0.0, 0.0, 0.0, 0.0] + + def updateBusVoltages(self, voltages): + self.bat_voltage = voltages[0] + self.voltage_12 = voltages[1] + self.voltage_5 = voltages[2] + self.voltage_3 = voltages[3] + + def updateJointVoltages(self, axis, voltage): + self.joint_voltages[axis] = voltage + + def updateJointCurrents(self, axis, current): + self.joint_currents[axis] = current + + def updateJointTemperatures(self, axis, temperature): + self.joint_temps[axis] = temperature + + def updateJointAngles(self, angles): + self.joint_angles = angles + + +arm_feedback = SocketFeedback() + class SerialRelay(Node): def __init__(self): # Initialize node @@ -55,33 +90,7 @@ class SerialRelay(Node): self.arm_feedback = SocketFeedback() self.digit_feedback = DigitFeedback() - ######## - # Feedback and IK vars - ######## - # Misc - degree = pi / 180.0 - - urdf_file_name = 'arm11.urdf' - ik_tolerance = 1e-3 #Tolerance (in meters) to determine if solution is valid - - # URDF file path - urdf = os.path.join(get_package_share_directory('ik_pkg'), urdf_file_name) - # IKpy Chain - arm_chain = Chain.from_urdf_file(urdf) - - # Arrays for joint states - # Some links in the URDF are static (non-joints), these will remain zero for IK - # Indexes: Ignore, Ignore, Ax_0, Ax_1, Ax2, Ax_3, Wrist, Ignore - zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] - current_angles = zero_angles - last_angles = zero_angles - ik_angles = zero_angles - - target_position = [0.0, 0.0, 0.0] - target_orientation = [] # Effector orientation desired at target position. - # Generally orientation for the effector is modified manually by the operator. - - + self.arm = astra_arm.Arm('arm11.urdf') ######## # Interface with MCU @@ -144,7 +153,49 @@ class SerialRelay(Node): try: output = str(self.ser.readline(), "utf8") if output: - #self.get_logger().info(f"[MCU] {output}") + if output.startswith("can_relay_tovic,arm,55"): + # Angle feedbacks + parts = output.split(",") + if len(parts) >= 7: + # Extract the angles from the string + angles_in = parts[3:7] + # Convert the angles to floats divide by 10.0 + angles = [float(angle) / 10.0 for angle in angles_in] + angles[0] = 0.0 + # Update the arm's current angles + self.arm.update_angles(angles) + else: + self.get_logger().info("Invalid angle feedback input format") + elif output.startswith("can_relay_tovic,arm,54"): + # Bus Voltage feedbacks + parts = output.split(",") + if len(parts) >= 7: + # Extract the voltage from the string + voltages_in = parts[3:7] + # Convert the voltages to floats + v_bat = float(voltages_in[0]) / 100.0 + v_12 = float(voltages_in[1]) / 100.0 + v_5 = float(voltages_in[2]) / 100.0 + v_3 = float(voltages_in[3]) / 100.0 + # Update the arm's current voltages + arm_feedback.updateBusVoltages([v_bat, v_12, v_5, v_3]) + else: + self.get_logger().info("Invalid voltage feedback input format") + elif output.startswith("can_relay_tovic,arm,53"): + # Motor voltage/current/temperature feedback + parts = output.split(",") + if len(parts) >= 7: + # Extract the voltage/current/temperature from the string + values_in = parts[3:7] + # Convert the voltages to floats + for i in range(4): + arm_feedback.updateJointVoltages(i, float(values_in[i]) / 10.0) + arm_feedback.updateJointCurrents(i, float(values_in[i]) / 10.0) + arm_feedback.updateJointTemperatures(i, float(values_in[i]) / 10.0) + else: + self.get_logger().info("Invalid motor feedback input format") + + self.get_logger().info(f"[MCU] {output}") msg = String() msg.data = output self.debug_pub.publish(msg) @@ -318,6 +369,19 @@ class SerialRelay(Node): else: self.get_logger().info("Invalid voltage feedback input format") + def socket_pub_callback(self): + # Create a SocketFeedback message and publish it + msg = SocketFeedback() + msg.bat_voltage = arm_feedback.bat_voltage + msg.voltage_12 = arm_feedback.voltage_12 + msg.voltage_5 = arm_feedback.voltage_5 + msg.voltage_3 = arm_feedback.voltage_3 + msg.joint_angles = arm_feedback.joint_angles + msg.joint_temps = arm_feedback.joint_temps + msg.joint_voltages = arm_feedback.joint_voltages + msg.joint_currents = arm_feedback.joint_currents + + self.socket_pub.publish(msg) def send_ik(self, msg): pass diff --git a/src/arm_pkg/arm_pkg/astra_arm.py b/src/arm_pkg/arm_pkg/astra_arm.py index 9bc84fd..2f8002b 100644 --- a/src/arm_pkg/arm_pkg/astra_arm.py +++ b/src/arm_pkg/arm_pkg/astra_arm.py @@ -17,7 +17,7 @@ def convert_angles(angles): return [0.0, 0.0, angles[0], angles[1], angles[2], angles[3], angles[4], 0.0] -class arm: +class Arm: def __init__(self, urdf_name): self.ik_tolerance = 1e-3 #Tolerance (in meters) to determine if solution is valid # URDF file path @@ -43,6 +43,22 @@ class arm: #self.step = 0.03 # Max movement increment + def perform_ik(self, target_position): + self.target_position = target_position + # Update the target orientation to the current orientation + self.update_orientation() + try: + self.ik_angles = self.chain.inverse_kinematics( + target_position=self.target_position, + target_orientation=self.target_orientation, + initial_position=self.current_angles, + max_iterations=1000, + tolerance=self.ik_tolerance + ) + except Exception as e: + print(f"IK failed: {e}") + return False + # Get current orientation of the end effector and update target_orientation def update_orientation(self):