mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
feat: All feedback for socket, early ik implement
This commit is contained in:
@@ -19,15 +19,50 @@ import numpy as np
|
|||||||
import time, math, os
|
import time, math, os
|
||||||
from math import sin, cos, pi
|
from math import sin, cos, pi
|
||||||
from ament_index_python.packages import get_package_share_directory
|
from ament_index_python.packages import get_package_share_directory
|
||||||
from ikpy.chain import Chain
|
# from ikpy.chain import Chain
|
||||||
from ikpy.link import OriginLink, URDFLink
|
# from ikpy.link import OriginLink, URDFLink
|
||||||
#import pygame as pyg
|
# #import pygame as pyg
|
||||||
from scipy.spatial.transform import Rotation as R
|
# from scipy.spatial.transform import Rotation as R
|
||||||
|
|
||||||
|
import astra_arm
|
||||||
|
|
||||||
serial_pub = None
|
serial_pub = None
|
||||||
thread = None
|
thread = None
|
||||||
|
|
||||||
|
|
||||||
|
# struct for socket feedback
|
||||||
|
class SocketFeedback:
|
||||||
|
def __init__(self):
|
||||||
|
self.bat_voltage = 0.0
|
||||||
|
self.voltage_12 = 0.0
|
||||||
|
self.voltage_5 = 0.0
|
||||||
|
self.voltage_3 = 0.0
|
||||||
|
self.joint_angles = [0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.joint_temps = [0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.joint_voltages = [0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.joint_currents = [0.0, 0.0, 0.0, 0.0]
|
||||||
|
|
||||||
|
def updateBusVoltages(self, voltages):
|
||||||
|
self.bat_voltage = voltages[0]
|
||||||
|
self.voltage_12 = voltages[1]
|
||||||
|
self.voltage_5 = voltages[2]
|
||||||
|
self.voltage_3 = voltages[3]
|
||||||
|
|
||||||
|
def updateJointVoltages(self, axis, voltage):
|
||||||
|
self.joint_voltages[axis] = voltage
|
||||||
|
|
||||||
|
def updateJointCurrents(self, axis, current):
|
||||||
|
self.joint_currents[axis] = current
|
||||||
|
|
||||||
|
def updateJointTemperatures(self, axis, temperature):
|
||||||
|
self.joint_temps[axis] = temperature
|
||||||
|
|
||||||
|
def updateJointAngles(self, angles):
|
||||||
|
self.joint_angles = angles
|
||||||
|
|
||||||
|
|
||||||
|
arm_feedback = SocketFeedback()
|
||||||
|
|
||||||
class SerialRelay(Node):
|
class SerialRelay(Node):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# Initialize node
|
# Initialize node
|
||||||
@@ -55,33 +90,7 @@ class SerialRelay(Node):
|
|||||||
|
|
||||||
self.arm_feedback = SocketFeedback()
|
self.arm_feedback = SocketFeedback()
|
||||||
self.digit_feedback = DigitFeedback()
|
self.digit_feedback = DigitFeedback()
|
||||||
########
|
self.arm = astra_arm.Arm('arm11.urdf')
|
||||||
# Feedback and IK vars
|
|
||||||
########
|
|
||||||
# Misc
|
|
||||||
degree = pi / 180.0
|
|
||||||
|
|
||||||
urdf_file_name = 'arm11.urdf'
|
|
||||||
ik_tolerance = 1e-3 #Tolerance (in meters) to determine if solution is valid
|
|
||||||
|
|
||||||
# URDF file path
|
|
||||||
urdf = os.path.join(get_package_share_directory('ik_pkg'), urdf_file_name)
|
|
||||||
# IKpy Chain
|
|
||||||
arm_chain = Chain.from_urdf_file(urdf)
|
|
||||||
|
|
||||||
# Arrays for joint states
|
|
||||||
# Some links in the URDF are static (non-joints), these will remain zero for IK
|
|
||||||
# Indexes: Ignore, Ignore, Ax_0, Ax_1, Ax2, Ax_3, Wrist, Ignore
|
|
||||||
zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
|
||||||
current_angles = zero_angles
|
|
||||||
last_angles = zero_angles
|
|
||||||
ik_angles = zero_angles
|
|
||||||
|
|
||||||
target_position = [0.0, 0.0, 0.0]
|
|
||||||
target_orientation = [] # Effector orientation desired at target position.
|
|
||||||
# Generally orientation for the effector is modified manually by the operator.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
########
|
########
|
||||||
# Interface with MCU
|
# Interface with MCU
|
||||||
@@ -144,7 +153,49 @@ class SerialRelay(Node):
|
|||||||
try:
|
try:
|
||||||
output = str(self.ser.readline(), "utf8")
|
output = str(self.ser.readline(), "utf8")
|
||||||
if output:
|
if output:
|
||||||
#self.get_logger().info(f"[MCU] {output}")
|
if output.startswith("can_relay_tovic,arm,55"):
|
||||||
|
# Angle feedbacks
|
||||||
|
parts = output.split(",")
|
||||||
|
if len(parts) >= 7:
|
||||||
|
# Extract the angles from the string
|
||||||
|
angles_in = parts[3:7]
|
||||||
|
# Convert the angles to floats divide by 10.0
|
||||||
|
angles = [float(angle) / 10.0 for angle in angles_in]
|
||||||
|
angles[0] = 0.0
|
||||||
|
# Update the arm's current angles
|
||||||
|
self.arm.update_angles(angles)
|
||||||
|
else:
|
||||||
|
self.get_logger().info("Invalid angle feedback input format")
|
||||||
|
elif output.startswith("can_relay_tovic,arm,54"):
|
||||||
|
# Bus Voltage feedbacks
|
||||||
|
parts = output.split(",")
|
||||||
|
if len(parts) >= 7:
|
||||||
|
# Extract the voltage from the string
|
||||||
|
voltages_in = parts[3:7]
|
||||||
|
# Convert the voltages to floats
|
||||||
|
v_bat = float(voltages_in[0]) / 100.0
|
||||||
|
v_12 = float(voltages_in[1]) / 100.0
|
||||||
|
v_5 = float(voltages_in[2]) / 100.0
|
||||||
|
v_3 = float(voltages_in[3]) / 100.0
|
||||||
|
# Update the arm's current voltages
|
||||||
|
arm_feedback.updateBusVoltages([v_bat, v_12, v_5, v_3])
|
||||||
|
else:
|
||||||
|
self.get_logger().info("Invalid voltage feedback input format")
|
||||||
|
elif output.startswith("can_relay_tovic,arm,53"):
|
||||||
|
# Motor voltage/current/temperature feedback
|
||||||
|
parts = output.split(",")
|
||||||
|
if len(parts) >= 7:
|
||||||
|
# Extract the voltage/current/temperature from the string
|
||||||
|
values_in = parts[3:7]
|
||||||
|
# Convert the voltages to floats
|
||||||
|
for i in range(4):
|
||||||
|
arm_feedback.updateJointVoltages(i, float(values_in[i]) / 10.0)
|
||||||
|
arm_feedback.updateJointCurrents(i, float(values_in[i]) / 10.0)
|
||||||
|
arm_feedback.updateJointTemperatures(i, float(values_in[i]) / 10.0)
|
||||||
|
else:
|
||||||
|
self.get_logger().info("Invalid motor feedback input format")
|
||||||
|
|
||||||
|
self.get_logger().info(f"[MCU] {output}")
|
||||||
msg = String()
|
msg = String()
|
||||||
msg.data = output
|
msg.data = output
|
||||||
self.debug_pub.publish(msg)
|
self.debug_pub.publish(msg)
|
||||||
@@ -318,6 +369,19 @@ class SerialRelay(Node):
|
|||||||
else:
|
else:
|
||||||
self.get_logger().info("Invalid voltage feedback input format")
|
self.get_logger().info("Invalid voltage feedback input format")
|
||||||
|
|
||||||
|
def socket_pub_callback(self):
|
||||||
|
# Create a SocketFeedback message and publish it
|
||||||
|
msg = SocketFeedback()
|
||||||
|
msg.bat_voltage = arm_feedback.bat_voltage
|
||||||
|
msg.voltage_12 = arm_feedback.voltage_12
|
||||||
|
msg.voltage_5 = arm_feedback.voltage_5
|
||||||
|
msg.voltage_3 = arm_feedback.voltage_3
|
||||||
|
msg.joint_angles = arm_feedback.joint_angles
|
||||||
|
msg.joint_temps = arm_feedback.joint_temps
|
||||||
|
msg.joint_voltages = arm_feedback.joint_voltages
|
||||||
|
msg.joint_currents = arm_feedback.joint_currents
|
||||||
|
|
||||||
|
self.socket_pub.publish(msg)
|
||||||
|
|
||||||
def send_ik(self, msg):
|
def send_ik(self, msg):
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ def convert_angles(angles):
|
|||||||
return [0.0, 0.0, angles[0], angles[1], angles[2], angles[3], angles[4], 0.0]
|
return [0.0, 0.0, angles[0], angles[1], angles[2], angles[3], angles[4], 0.0]
|
||||||
|
|
||||||
|
|
||||||
class arm:
|
class Arm:
|
||||||
def __init__(self, urdf_name):
|
def __init__(self, urdf_name):
|
||||||
self.ik_tolerance = 1e-3 #Tolerance (in meters) to determine if solution is valid
|
self.ik_tolerance = 1e-3 #Tolerance (in meters) to determine if solution is valid
|
||||||
# URDF file path
|
# URDF file path
|
||||||
@@ -43,6 +43,22 @@ class arm:
|
|||||||
#self.step = 0.03 # Max movement increment
|
#self.step = 0.03 # Max movement increment
|
||||||
|
|
||||||
|
|
||||||
|
def perform_ik(self, target_position):
|
||||||
|
self.target_position = target_position
|
||||||
|
# Update the target orientation to the current orientation
|
||||||
|
self.update_orientation()
|
||||||
|
try:
|
||||||
|
self.ik_angles = self.chain.inverse_kinematics(
|
||||||
|
target_position=self.target_position,
|
||||||
|
target_orientation=self.target_orientation,
|
||||||
|
initial_position=self.current_angles,
|
||||||
|
max_iterations=1000,
|
||||||
|
tolerance=self.ik_tolerance
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"IK failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
# Get current orientation of the end effector and update target_orientation
|
# Get current orientation of the end effector and update target_orientation
|
||||||
def update_orientation(self):
|
def update_orientation(self):
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user