refactor: post-comp IK testing

This commit is contained in:
David
2025-07-24 00:07:06 -05:00
parent bd5c3c3c5a
commit cdc2c7e703
7 changed files with 138 additions and 230 deletions

View File

@@ -62,9 +62,8 @@ class SerialRelay(Node):
if self.port is None: if self.port is None:
self.get_logger().info("Unable to find MCU...") self.get_logger().info("Unable to find MCU...")
#kill the node/process entirely time.sleep(1)
os.kill(os.getpid(), signal.SIGKILL) sys.exit(1)
sys.exit(0)
self.ser = serial.Serial(self.port, 115200) self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup) atexit.register(self.cleanup)
@@ -89,23 +88,9 @@ class SerialRelay(Node):
if output: if output:
# All output over debug temporarily # All output over debug temporarily
#self.get_logger().info(f"[MCU] {output}") #self.get_logger().info(f"[MCU] {output}")
if output.startswith("can_relay_fromvic,arm"):
# Publish the message to the arm topic
msg = String()
msg.data = output
self.arm_pub.publish(msg)
elif output.startswith("can_relay_fromvic,core"):
# Publish the message to the core topic
msg = String()
msg.data = output
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,bio"):
# Publish the message to the bio topic
msg = String()
msg.data = output
self.bio_pub.publish(msg)
msg = String() msg = String()
msg.data = output msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"): if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg) self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
@@ -121,19 +106,19 @@ class SerialRelay(Node):
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
self.exit(1) exit(1)
except TypeError as e: except TypeError as e:
print(f"TypeError: {e}") print(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
self.exit(1) exit(1)
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
print("Closing serial port.") # print("Closing serial port.")
if self.ser.is_open: # if self.ser.is_open:
self.ser.close() # self.ser.close()
self.exit(1) # exit(1)
def send_cmd(self, msg): def send_cmd(self, msg):
message = msg.data message = msg.data

View File

@@ -1,5 +1,6 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos
import serial import serial
import sys import sys
import threading import threading
@@ -26,12 +27,21 @@ from ament_index_python.packages import get_package_share_directory
from . import astra_arm from . import astra_arm
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=1000,
lifespan=500,
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=5000
)
serial_pub = None serial_pub = None
thread = None thread = None
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node # Initialize node
@@ -49,7 +59,7 @@ class SerialRelay(Node):
self.feedback_timer = self.create_timer(0.25, self.publish_feedback) self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers # Create subscribers
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10) self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, qos_profile=control_qos)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10) self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10) self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
@@ -59,20 +69,11 @@ class SerialRelay(Node):
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
# output to console
self.get_logger().info("Creating arm object...")
self.arm_feedback = SocketFeedback()
self.digit_feedback = DigitFeedback()
self.arm = astra_arm.Arm('arm12.urdf') self.arm = astra_arm.Arm('arm12.urdf')
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_feedback = DigitFeedback()
########
# Interface with MCU
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
########
if self.launch_mode == 'arm': if self.launch_mode == 'arm':
# Loop through all serial devices on the computer to check for the MCU # Loop through all serial devices on the computer to check for the MCU
self.port = None self.port = None
@@ -84,7 +85,7 @@ class SerialRelay(Node):
ser = serial.Serial(port, 115200, timeout=1) ser = serial.Serial(port, 115200, timeout=1)
#print(f"Checking port {port}...") #print(f"Checking port {port}...")
ser.write(b"ping\n") ser.write(b"ping\n")
response = ser.read_until("\n") response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU # if pong is in response, then we are talking with the MCU
if b"pong" in response: if b"pong" in response:
@@ -97,10 +98,9 @@ class SerialRelay(Node):
break break
if self.port is None: if self.port is None:
self.get_logger().info("Unable to find MCU...") self.get_logger().info("Unable to find MCU... please make sure it is connected.")
#kill the node/process entirely time.sleep(1)
os.kill(os.getpid(), signal.SIGKILL) sys.exit(1)
sys.exit(0)
self.ser = serial.Serial(self.port, 115200) self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup) atexit.register(self.cleanup)
@@ -130,18 +130,9 @@ class SerialRelay(Node):
try: try:
output = str(self.ser.readline(), "utf8") output = str(self.ser.readline(), "utf8")
if output: if output:
if output.startswith("can_relay_fromvic,arm,55"): #self.get_logger().info(f"[MCU] {output}")
pass
#self.updateAngleFeedback(output)
elif output.startswith("can_relay_fromvic,arm,54"):
pass
#self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"):
pass
#self.updateMotorFeedback(output)
self.get_logger().info(f"[MCU] {output}")
msg = String() msg = String()
msg.data = "From MCU Got: " + output msg.data = output
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
except serial.SerialException: except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.") self.get_logger().info("SerialException caught... closing serial port.")
@@ -162,9 +153,9 @@ class SerialRelay(Node):
pass pass
def updateAngleFeedback(self, msg): def updateAngleFeedback(self, msg: str):
# Angle feedbacks, # Angle feedbacks,
#split the msg.data by commas # split the msg.data by commas
parts = msg.split(",") parts = msg.split(",")
if len(parts) >= 7: if len(parts) >= 7:
@@ -172,7 +163,7 @@ class SerialRelay(Node):
angles_in = parts[3:7] angles_in = parts[3:7]
# Convert the angles to floats divide by 10.0 # Convert the angles to floats divide by 10.0
angles = [float(angle) / 10.0 for angle in angles_in] angles = [float(angle) / 10.0 for angle in angles_in]
angles[0] = 0.0 #override axis0 to zero # angles[0] = 0.0 #override axis0 to zero
# #
# #
#THIS NEEDS TO BE REMOVED LATER #THIS NEEDS TO BE REMOVED LATER
@@ -197,7 +188,8 @@ class SerialRelay(Node):
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg):
def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.split(",")
if len(parts) >= 7: if len(parts) >= 7:
@@ -211,22 +203,28 @@ class SerialRelay(Node):
else: else:
self.get_logger().info("Invalid voltage feedback input format") self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg): def updateMotorFeedback(self, msg: str):
# Motor voltage/current/temperature feedback parts = str(msg.strip()).split(",")
parts = msg.split(",") motorId = round(float(parts[3]))
if len(parts) >= 7: temp = float(parts[4]) / 10.0
# Extract the voltage/current/temperature from the string voltage = float(parts[5]) / 10.0
values_in = parts[3:7] current = float(parts[6]) / 10.0
# Convert the voltages to floats if motorId == 1:
for i in range(4): self.arm_feedback.axis1_temp = temp
#update arm_feedback's axisX_temp for each axis0_temp, axis1_temp, etc... self.arm_feedback.axis1_voltage = voltage
pass self.arm_feedback.axis1_current = current
elif motorId == 2:
# self.arm_feedback.updateJointVoltages(i, float(values_in[i]) / 10.0) self.arm_feedback.axis2_temp = temp
# self.arm_feedback.updateJointCurrents(i, float(values_in[i]) / 10.0) self.arm_feedback.axis2_voltage = voltage
# self.arm_feedback.updateJointTemperatures(i, float(values_in[i]) / 10.0) self.arm_feedback.axis2_current = current
else: elif motorId == 3:
self.get_logger().info("Invalid motor feedback input format") self.arm_feedback.axis3_temp = temp
self.arm_feedback.axis3_voltage = voltage
self.arm_feedback.axis3_current = current
elif motorId == 4:
self.arm_feedback.axis0_temp = temp
self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
axis0 = msg.axis0 axis0 = msg.axis0
@@ -234,11 +232,6 @@ class SerialRelay(Node):
axis2 = msg.axis2 axis2 = msg.axis2
axis3 = msg.axis3 axis3 = msg.axis3
# tempMsg = String()
# tempMsg.data = "Sending manual"
# self.debug_pub.publish(tempMsg)
#Send controls for arm #Send controls for arm
command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n" command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n"
command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n" command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
@@ -293,27 +286,7 @@ class SerialRelay(Node):
#pass #pass
self.updateBusVoltage(output) self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
parts = str(output.strip()).split(",") self.updateMotorFeedback(output)
motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0
voltage = float(parts[5]) / 10.0
current = float(parts[6]) / 10.0
if motorId == 1:
self.arm_feedback.axis1_temp = temp
self.arm_feedback.axis1_voltage = voltage
self.arm_feedback.axis1_current = current
elif motorId == 2:
self.arm_feedback.axis2_temp = temp
self.arm_feedback.axis2_voltage = voltage
self.arm_feedback.axis2_current = current
elif motorId == 3:
self.arm_feedback.axis3_temp = temp
self.arm_feedback.axis3_voltage = voltage
self.arm_feedback.axis3_current = current
elif motorId == 4:
self.arm_feedback.axis0_temp = temp
self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current
elif output.startswith("can_relay_fromvic,digit,54"): elif output.startswith("can_relay_fromvic,digit,54"):
parts = msg.data.split(",") parts = msg.data.split(",")
if len(parts) >= 7: if len(parts) >= 7:
@@ -334,80 +307,18 @@ class SerialRelay(Node):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: str):
# Angle feedbacks,
#split the msg.data by commas
parts = msg.split(",")
if len(parts) >= 7:
# Extract the angles from the string
angles_in = parts[3:7]
# Convert the angles to floats divide by 10.0
angles = [float(angle) / 10.0 for angle in angles_in]
#angles[0] = 0.0 #override axis0 to zero
#
#
#THIS NEEDS TO BE REMOVED LATER
#PLACEHOLDER FOR WRIST VALUE
#
#
angles.append(0.0)#placeholder for wrist_continuous
angles.append(0.0)#placeholder for wrist
#
#
# # Update the arm's current angles
#self.arm.update_angles(angles)
self.arm_feedback.axis0_angle = angles[0]
self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2]
self.arm_feedback.axis3_angle = angles[3]
# self.get_logger().info(f"Angles: {angles}")
# #debug publish angles
# tempMsg = String()
# tempMsg.data = "Angles: " + str(angles)
# #self.debug_pub.publish(tempMsg)
else:
self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks
parts = msg.split(",")
if len(parts) >= 7:
# Extract the voltage from the string
voltages_in = parts[3:7]
# Convert the voltages to floats
self.arm_feedback.bat_voltage = float(voltages_in[0]) / 100.0
self.arm_feedback.voltage_12 = float(voltages_in[1]) / 100.0
self.arm_feedback.voltage_5 = float(voltages_in[2]) / 100.0
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
else:
self.get_logger().info("Invalid voltage feedback input format")
def publish_feedback(self):
# Create a SocketFeedback message and publish it
# msg = SocketFeedback()
# msg.bat_voltage = self.arm_feedback.bat_voltage
# msg.voltage_12 = self.arm_feedback.voltage_12
# msg.voltage_5 = self.arm_feedback.voltage_5
# msg.voltage_3 = self.arm_feedback.voltage_3
# msg.joint_angles = self.arm_feedback.joint_angles
# msg.joint_temps = self.arm_feedback.joint_temps
# msg.joint_voltages = self.arm_feedback.joint_voltages
# msg.joint_currents = self.arm_feedback.joint_currents
#debug print
self.socket_pub.publish(self.arm_feedback) #Publish feedback from arm
#self.arm.update_position() #Run FK and update the current position of the arm, using FK
def send_ik(self, msg): def send_ik(self, msg):
# Convert Vector3 to a NumPy array # Convert Vector3 to a NumPy array
input_raw = np.array([msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
# decrease input vector by 90% # decrease input vector by 90%
input_raw = input_raw * 0.2 input_raw = input_raw * 0.2
if input_raw[0] == 0.0 and input_raw[1] == 0.0 and input_raw[2] == 0.0:
self.get_logger().info("No input, stopping arm.")
command = "can_relay_tovic,arm,39,0,0,0,0\n"
self.send_cmd(command)
return
# Debug output # Debug output
tempMsg = String() tempMsg = String()
tempMsg.data = "From IK Control Got Vector: " + str(input_raw) tempMsg.data = "From IK Control Got Vector: " + str(input_raw)
@@ -420,7 +331,8 @@ class SerialRelay(Node):
#Print for IK DEBUG #Print for IK DEBUG
tempMsg = String() tempMsg = String()
tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles) # tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles)
tempMsg.data = "Current Angles: " + str(math.degrees(self.arm.current_angles[2])) + ", " + str(math.degrees(self.arm.current_angles[4])) + ", " + str(math.degrees(self.arm.current_angles[6]))
self.ik_debug.publish(tempMsg) self.ik_debug.publish(tempMsg)
self.get_logger().info(f"[IK] {tempMsg.data}") self.get_logger().info(f"[IK] {tempMsg.data}")
@@ -433,48 +345,41 @@ class SerialRelay(Node):
#self.debug_pub.publish(tempMsg) #self.debug_pub.publish(tempMsg)
# Perform IK with the target position # Perform IK with the target position
if self.arm.perform_ik(target_position): if self.arm.perform_ik(target_position, self.get_logger()):
# Send command to control # Send command to control
#command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n" #command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n"
#self.send_cmd(command) #self.send_cmd(command)
self.get_logger().info(f"IK Success: {target_position}") self.get_logger().info(f"IK Success: {target_position}")
self.get_logger().info(f"IK Angles: [{str(round(math.degrees(self.arm.ik_angles[2]), 2))}, {str(round(math.degrees(self.arm.ik_angles[4]), 2))}, {str(round(math.degrees(self.arm.ik_angles[6]), 2))}]")
# tempMsg = String() # tempMsg = String()
# tempMsg.data = "IK Success: " + str(target_position) # tempMsg.data = "IK Success: " + str(target_position)
# #self.debug_pub.publish(tempMsg) # #self.debug_pub.publish(tempMsg)
# tempMsg.data = "Sending: " + str(command) # tempMsg.data = "Sending: " + str(command)
#self.debug_pub.publish(tempMsg) #self.debug_pub.publish(tempMsg)
# Send the IK angles to the MCU
command = "can_relay_tovic,arm,32," + f"{math.degrees(self.arm.ik_angles[0])*10},{math.degrees(self.arm.ik_angles[2])*10},{math.degrees(self.arm.ik_angles[4])*10},{math.degrees(self.arm.ik_angles[6])*10}" + "\n"
# self.send_cmd(command)
# Manual control for Wrist/Effector
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
else: else:
self.get_logger().info("IK Fail") self.get_logger().info("IK Fail")
self.get_logger().info(f"IK Angles: [{str(math.degrees(self.arm.ik_angles[2]))}, {str(math.degrees(self.arm.ik_angles[4]))}, {str(math.degrees(self.arm.ik_angles[6]))}]")
# tempMsg = String() # tempMsg = String()
# tempMsg.data = "IK Fail" # tempMsg.data = "IK Fail"
#self.debug_pub.publish(tempMsg) #self.debug_pub.publish(tempMsg)
# Manual control for Wrist/Effector
command = "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
self.send_cmd(command)
command = "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
self.send_cmd(command)
command = "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
self.send_cmd(command)
command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
# Placeholder need control for linear actuator
#command = ""
#self.send_cmd()
pass
@staticmethod @staticmethod
def list_serial_ports(): def list_serial_ports():
@@ -483,8 +388,11 @@ class SerialRelay(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up...") print("Cleaning up...")
if self.ser.is_open: try:
self.ser.close() if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)

View File

@@ -1,3 +1,5 @@
import rclpy
from rclpy.node import Node
import numpy as np import numpy as np
import time, math, os import time, math, os
from math import sin, cos, pi from math import sin, cos, pi
@@ -15,14 +17,14 @@ degree = pi / 180.0
def convert_angles(angles): def convert_angles(angles):
# Converts angles to the format used for the urdf (contains some dummy joints) # Converts angles to the format used for the urdf (contains some dummy joints)
return [0.0, angles[0]*degree, angles[1]*degree, 0.0, angles[2]*degree, 0.0, angles[3]*degree, 0.0, angles[4]*degree, angles[5]*degree, 0.0] return [0.0, math.radians(angles[0]), math.radians(angles[1]), 0.0, math.radians(angles[2]), 0.0, math.radians(angles[3]), 0.0, math.radians(angles[4]), math.radians(angles[5]), 0.0]
class Arm: class Arm:
def __init__(self, urdf_name): def __init__(self, urdf_name):
self.ik_tolerance = 1e-3 #Tolerance (in meters) to determine if solution is valid self.ik_tolerance = 1e-1 #Tolerance (in meters) to determine if solution is valid
# URDF file path # URDF file path
self.urdf = os.path.join(get_package_share_directory('arm_pkg'), urdf_name) self.urdf = os.path.join(get_package_share_directory('arm_pkg'), 'urdf', urdf_name)
# IKpy Chain # IKpy Chain
self.chain = Chain.from_urdf_file(self.urdf) self.chain = Chain.from_urdf_file(self.urdf)
@@ -35,16 +37,16 @@ class Arm:
self.last_angles = self.zero_angles self.last_angles = self.zero_angles
self.ik_angles = self.zero_angles self.ik_angles = self.zero_angles
self.current_position = [] self.current_position: list[float] = []
self.target_position = [0.0, 0.0, 0.0] self.target_position = [0.0, 0.0, 0.0]
self.target_orientation = [] # Effector orientation desired at target position. self.target_orientation: list = [] # Effector orientation desired at target position.
# Generally orientation for the effector is modified manually by the operator. # Generally orientation for the effector is modified manually by the operator.
# Might not need, copied over from state_publisher.py in ik_test # Might not need, copied over from state_publisher.py in ik_test
#self.step = 0.03 # Max movement increment #self.step = 0.03 # Max movement increment
def perform_ik(self, target_position): def perform_ik(self, target_position, logger):
self.target_position = target_position self.target_position = target_position
# Update the target orientation to the current orientation # Update the target orientation to the current orientation
self.update_orientation() self.update_orientation()
@@ -62,19 +64,19 @@ class Arm:
# Check if the solution is within the tolerance # Check if the solution is within the tolerance
fk_matrix = self.chain.forward_kinematics(self.ik_angles) fk_matrix = self.chain.forward_kinematics(self.ik_angles)
fk_position = fk_matrix[:3, 3] fk_position = fk_matrix[:3, 3] # type: ignore
# print(f"[TRY] FK Position for Solution: {fk_position}") # print(f"[TRY] FK Position for Solution: {fk_position}")
error = np.linalg.norm(target_position - fk_position) error = np.linalg.norm(target_position - fk_position)
if error > self.ik_tolerance: if error > self.ik_tolerance:
print(f"No VALID IK Solution within tolerance. Error: {error}") logger.info(f"No VALID IK Solution within tolerance. Error: {error}")
return False return False
else: else:
print(f"IK Solution Found. Error: {error}") logger.info(f"IK Solution Found. Error: {error}")
return True return True
except Exception as e: except Exception as e:
print(f"IK failed for exception: {e}") logger.info(f"IK failed for exception: {e}")
return False return False
# # Given the FK_Matix for the arm's current pose, update the orientation array # # Given the FK_Matix for the arm's current pose, update the orientation array
@@ -93,7 +95,7 @@ class Arm:
fk_matrix = self.chain.forward_kinematics(self.current_angles) fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Update target_orientation to the effector's current orientation # Update target_orientation to the effector's current orientation
self.target_orientation = fk_matrix[:3, :3] self.target_orientation = fk_matrix[:3, :3] # type: ignore
# Update current angles to those provided # Update current angles to those provided
# Resetting last_angles to the new angles # Resetting last_angles to the new angles
@@ -119,7 +121,7 @@ class Arm:
fk_matrix = self.chain.forward_kinematics(self.current_angles) fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix # Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] position = fk_matrix[:3, 3] # type: ignore
return position return position
@@ -129,7 +131,7 @@ class Arm:
fk_matrix = self.chain.forward_kinematics(self.current_angles) fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix # Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] position = fk_matrix[:3, 3] # type: ignore
# Return position as a NumPy array # Return position as a NumPy array
return np.array(position) return np.array(position)
@@ -140,7 +142,4 @@ class Arm:
fk_matrix = self.chain.forward_kinematics(self.current_angles) fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix and update current pos # Get the position of the end effector from the FK matrix and update current pos
self.current_position = fk_matrix[:3, 3] self.current_position = fk_matrix[:3, 3] # type: ignore

View File

@@ -12,8 +12,8 @@ setup(
('share/ament_index/resource_index/packages', ('share/ament_index/resource_index/packages',
['resource/' + package_name]), ['resource/' + package_name]),
('share/' + package_name, ['package.xml']), ('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob(os.path.join('launch', '*launch.[pxy][yma]*'))), (os.path.join('share', package_name, 'launch'), glob('launch/*')),
(os.path.join('share', package_name), glob('urdf/*')), (os.path.join('share', package_name, 'urdf'), glob('urdf/*')),
], ],
install_requires=['setuptools'], install_requires=['setuptools'],
zip_safe=True, zip_safe=True,

View File

@@ -115,7 +115,7 @@
<child link="Axis_2" /> <child link="Axis_2" />
<origin xyz="0 -5.219894517229704e-17 0.2637568842473722" rpy="1.5707963267948963 0 0" /> <origin xyz="0 -5.219894517229704e-17 0.2637568842473722" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/> <axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.6" upper="1.6" velocity="0.5"/> </joint> <limit effort="1000.0" lower="-2.7" upper="2.7" velocity="0.5"/> </joint>
<link name="Axis_2"> <link name="Axis_2">
<visual> <visual>
<origin xyz="0 0 0" rpy="0 0 0" /> <origin xyz="0 0 0" rpy="0 0 0" />

View File

@@ -66,10 +66,9 @@ class SerialRelay(Node):
pass pass
if self.port is None: if self.port is None:
self.get_logger().info("Unable to find MCU...") self.get_logger().info("Unable to find MCU... please make sure it is connected.")
#kill the node/process entirely time.sleep(1)
os.kill(os.getpid(), signal.SIGKILL) sys.exit(1)
sys.exit(0)
self.ser = serial.Serial(self.port, 115200) self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup) atexit.register(self.cleanup)
@@ -204,8 +203,11 @@ class SerialRelay(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up...") print("Cleaning up...")
if self.ser.is_open: try:
self.ser.close() if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)

View File

@@ -1,5 +1,6 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos
from std_srvs.srv import Empty from std_srvs.srv import Empty
import signal import signal
@@ -19,6 +20,17 @@ from ros2_interfaces_pkg.msg import CoreControl
serial_pub = None serial_pub = None
thread = None thread = None
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=1000,
lifespan=500,
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=5000
)
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initalize node with name # Initalize node with name
@@ -33,7 +45,7 @@ class SerialRelay(Node):
self.debug_pub = self.create_publisher(String, '/core/debug', 10) self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
# Create a subscriber # Create a subscriber
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, qos_profile=control_qos)
# Create a publisher for telemetry # Create a publisher for telemetry
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
@@ -75,9 +87,8 @@ class SerialRelay(Node):
if self.port is None: if self.port is None:
self.get_logger().info("Unable to find MCU...") self.get_logger().info("Unable to find MCU...")
#kill the node/process entirely time.sleep(1)
os.kill(os.getpid(), signal.SIGKILL) sys.exit(1)
sys.exit(0)
self.ser = serial.Serial(self.port, 115200) self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup) atexit.register(self.cleanup)
@@ -265,8 +276,11 @@ class SerialRelay(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up before terminating...") print("Cleaning up before terminating...")
if self.ser.is_open: try:
self.ser.close() if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)