69 Commits

Author SHA1 Message Date
David Sharpe
5c0cd926da feat: add Tristan's IK with ikpy (#15 ik_dev)
Add IK (Tristan's work)
Slow, but works. Takes a 3d displacement vector in meters for direction that the arm should be moving. Provides target joint angles that embedded drives the axes to.
2025-08-18 12:14:05 -05:00
David
4e1e0e29dd style: get ready for main 2025-08-12 08:49:12 -05:00
David
cdc2c7e703 refactor: post-comp IK testing 2025-07-24 00:07:06 -05:00
Tristan McGinnis
bd5c3c3c5a print debug to screen & ik debug publisher 2025-07-23 23:08:26 -05:00
Tristan McGinnis
d270235088 add current angles to ik debug output 2025-07-23 23:08:26 -05:00
Tristan McGinnis
35208150ef remove command send for perform_ik 2025-07-23 23:08:26 -05:00
Tristan McGinnis
fb699984b9 Removing print statements from anchor and core to clean up output 2025-07-23 23:08:12 -05:00
Tristan McGinnis
2eb3c796ec Doing testing for performance 2025-07-23 23:07:56 -05:00
Tristan McGinnis
da939cc381 No publishing controls, to view debug output 2025-07-23 23:07:23 -05:00
Tristan McGinnis
2c27c81dc5 update angles for IK when control command is sent 2025-07-23 23:07:23 -05:00
Tristan McGinnis
3288aea14c revert changes for split 2025-07-23 23:07:23 -05:00
Tristan McGinnis
0416277cd9 attempting to fix split error 2025-07-23 23:07:23 -05:00
Tristan McGinnis
440a94f0cc refactor some things, reenable feedback 2025-07-23 23:07:23 -05:00
Tristan McGinnis
9d13d487cb Fixes for string splitting 2025-07-23 23:07:00 -05:00
Tristan McGinnis
0ea4c73876 comment out some stuff to test delays 2025-07-23 23:05:48 -05:00
Tristan McGinnis
43fdc7587a send manual control debug print 2025-07-23 23:04:59 -05:00
Tristan McGinnis
53b4259ade disable feedback 2025-07-23 23:04:41 -05:00
Tristan McGinnis
482bedbfaf debug print for manual control 2025-07-23 23:04:41 -05:00
Tristan McGinnis
923dfa20ca remove all debug publishing for arm 2025-07-23 23:04:06 -05:00
Tristan McGinnis
6a747f92fb Fix: Add more dummy values to support new urdf 2025-07-23 23:03:44 -05:00
Tristan McGinnis
9f8f51b742 Fix: missed a spot 2025-07-23 23:03:44 -05:00
Tristan McGinnis
8b9b72e78f Add dummy link for effector 2025-07-23 23:03:44 -05:00
Tristan McGinnis
c9533e3f55 Convert angles to radians for IK 2025-07-23 23:03:44 -05:00
ASTRA-SHC
2860463501 fixes and debug to ik- need to update the model 2025-07-23 23:03:23 -05:00
Tristan McGinnis
60952db588 swap to numpy arrays 2025-07-23 23:03:23 -05:00
Tristan McGinnis
3d18e20946 More fixes for arm_node.py 2025-07-23 23:03:23 -05:00
Tristan McGinnis
5eb9e8a2e3 add get_position_vector3() 2025-07-23 23:03:23 -05:00
Tristan McGinnis
c302626512 remove vector normalization- done on base station 2025-07-23 23:03:23 -05:00
Tristan McGinnis
5c41d66404 debug printing 2025-07-23 23:03:23 -05:00
Tristan McGinnis
dde9d61a33 fix formatting for anchor print statements in arm and core 2025-07-23 23:03:23 -05:00
Tristan McGinnis
3985c11ae2 Added debug output to /arm/feedback/debug for testing IK 2025-07-23 23:02:59 -05:00
Tristan McGinnis
4420e83981 remove debug print for arm_node 2025-07-23 23:01:26 -05:00
Tristan McGinnis
f00a7d21ce Fix: arm angle feedback working 2025-07-23 23:01:26 -05:00
Tristan McGinnis
27cf4d9982 debug print for arm_node socket_pub_callback 2025-07-23 23:00:56 -05:00
Tristan McGinnis
8e3f2ee88a fix: anchor publishes feedback for respective feedback topics 2025-07-23 23:00:56 -05:00
Tristan McGinnis
9826b39e7e Fix: arm feedback through anchor 2025-07-23 23:00:21 -05:00
Tristan McGinnis
0c3f27667a SIGSTP no longer does sys.exit() 2025-07-23 22:59:45 -05:00
Tristan McGinnis
8c4f75f75e feat: nodes should die entirely if no MCU found
This should allow anchor to restart automatically if it's being run as a service
2025-07-23 22:59:45 -05:00
Tristan McGinnis
f12daff861 feat: fully remove socket_feedback class 2025-07-23 22:59:45 -05:00
ASTRA-SHC
e059f5cfec nuking socket feedback class 2025-07-23 22:59:45 -05:00
Tristan McGinnis
b6348e4c00 temp: print statements 2025-07-23 22:59:11 -05:00
Tristan McGinnis
99916b317f temp: raw angle output arm_node 2025-07-23 22:58:54 -05:00
Tristan McGinnis
5fc704dbc4 fix: proper astra_arm import for arm_node 2025-07-23 22:58:54 -05:00
Tristan McGinnis
5df3027fa0 fix: anchor enables CAN Relay 2025-07-23 22:58:54 -05:00
Tristan McGinnis
f4a611567e feat: IK result calculated and sent to MCU 2025-07-23 22:58:54 -05:00
Tristan McGinnis
9dacdfb385 refactor: feedback record helper functions 2025-07-23 22:58:06 -05:00
Tristan McGinnis
a417034436 fix: update socketFeedback() class 2025-07-23 22:56:54 -05:00
Tristan McGinnis
85a231478e feat: All feedback for socket, early ik implement 2025-07-23 22:56:54 -05:00
Tristan McGinnis
ec3b95944d Add helper functions to arm class 2025-07-23 22:51:56 -05:00
Tristan McGinnis
0adab485f2 Add urdf folder and arm11.urdf 2025-07-23 22:51:56 -05:00
ASTRA
ddb6d672ad Increase arm feedback freq 2025-05-31 10:01:33 -05:00
David Sharpe
6f80d628b1 fix: arm brake mode now casts correctly 2025-05-31 09:51:05 -05:00
Tristan McGinnis
d4543a224a digit linear actuator control 2025-05-30 23:58:57 -06:00
David
fd240cf160 feat: add rev feedback and brake mode for arm 2025-05-31 01:56:50 +00:00
Tristan McGinnis
a700fd546c Update arm_headless.py, remove read_feedback() 2025-05-30 19:48:41 -06:00
David
48b0f7e1f9 fix: replace int() with round(float()) for core motor feedback 2025-05-31 01:05:11 +00:00
Tristan McGinnis
822f84c1c1 Remove /10.0 on heading for core feedback 2025-05-30 19:01:50 -06:00
David
5f708a2002 feat: add rev motor feedback for core 2025-05-30 03:02:01 +00:00
Tristan McGinnis
baefd0661e Merge pull request #13 from SHC-ASTRA/improve-core-headless
Improve core headless
2025-05-29 18:28:32 -06:00
David
4d36ab8636 fix: msg.split not msg.data.split 2025-05-28 23:15:10 -06:00
David
dad0590dca fix: msg.split not msg.data.split 2025-05-29 05:07:54 +00:00
Tristan McGinnis
80d59cc275 reorganize core headless node for usage as a service 2025-05-28 22:57:38 -06:00
Tristan McGinnis
b06194053c invert controls for sticks on core headless, set max speed up to 90 2025-05-28 22:44:56 -06:00
Tristan McGinnis
90637485b7 Subup 2025-05-28 10:58:01 -06:00
Tristan McGinnis
5c3eae2318 Merge pull request #12 from SHC-ASTRA/gps-altitude
subup, update core to add gps altitude
2025-05-28 11:12:28 -05:00
Tristan McGinnis
3516d36294 subup, update core to add gps altitude 2025-05-28 10:10:59 -06:00
Tristan McGinnis
1ea247bac0 Remove bio output. Consolidate CAN commands 2025-05-28 09:19:42 -06:00
Tristan McGinnis
3dd544d711 Update anchor_node.py for bio feedback from digit 2025-05-27 16:19:19 -06:00
Tristan McGinnis
bfb73f3421 Merge pull request #11 from SHC-ASTRA/ptz-node
Ptz node
2025-05-27 16:50:27 -05:00
11 changed files with 978 additions and 188 deletions

View File

@@ -7,6 +7,7 @@ import time
import atexit
import serial
import os
import sys
import threading
import glob
@@ -45,6 +46,7 @@ class SerialRelay(Node):
#(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
ser.write(b"can_relay_mode,on\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
@@ -88,11 +90,12 @@ class SerialRelay(Node):
#self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
self.arm_pub.publish(msg)
elif output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor
if output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
@@ -103,19 +106,19 @@ class SerialRelay(Node):
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
exit(1)
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
def send_cmd(self, msg):
message = msg.data
@@ -147,6 +150,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -42,7 +42,7 @@ class Headless(Node):
# Depricated, kept temporarily for reference
#self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
#self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.laser_status = 0
@@ -81,14 +81,14 @@ class Headless(Node):
while rclpy.ok():
#Check the pico for updates
self.read_feedback()
#self.read_feedback()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
#self.send_controls() #depricated, kept for reference temporarily
self.send_manual()
self.read_feedback()
#self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
@@ -142,7 +142,7 @@ class Headless(Node):
input.axis0 = -1
if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15:
input.axis1 = -1 * round(self.gamepad.get_axis(0))
input.axis1 = round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1))
@@ -268,22 +268,6 @@ class Headless(Node):
# pass
def read_feedback(self, msg):
# Create a string message object
#msg = String()
# Set message data
#msg.data = output
# Publish data
#self.publisher.publish(msg.data)
print(f"[MCU] {msg.data}", end="")
#print(f"[Pico] Publishing: {msg}")

View File

@@ -1,5 +1,6 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
import serial
import sys
import threading
@@ -13,9 +14,34 @@ from ros2_interfaces_pkg.msg import ArmIK
from ros2_interfaces_pkg.msg import SocketFeedback
from ros2_interfaces_pkg.msg import DigitFeedback
# IK-Related imports
import numpy as np
import time, math, os
from math import sin, cos, pi
from ament_index_python.packages import get_package_share_directory
# from ikpy.chain import Chain
# from ikpy.link import OriginLink, URDFLink
# #import pygame as pyg
# from scipy.spatial.transform import Rotation as R
from . import astra_arm
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
serial_pub = None
thread = None
class SerialRelay(Node):
def __init__(self):
# Initialize node
@@ -30,17 +56,20 @@ class SerialRelay(Node):
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10)
self.feedback_timer = self.create_timer(1.0, self.publish_feedback)
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
# Topics used in anchor mode
if self.launch_mode == 'anchor':
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
self.arm = astra_arm.Arm('arm12.urdf')
self.arm_feedback = SocketFeedback()
self.digit_feedback = DigitFeedback()
@@ -56,7 +85,7 @@ class SerialRelay(Node):
ser = serial.Serial(port, 115200, timeout=1)
#print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
@@ -124,57 +153,117 @@ class SerialRelay(Node):
pass
def send_ik(self, msg):
pass
# Convert Vector3 to a NumPy array
input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
# decrease input vector by 90%
input_raw = input_raw * 0.2
def send_manual(self, msg):
if input_raw[0] == 0.0 and input_raw[1] == 0.0 and input_raw[2] == 0.0:
self.get_logger().info("No input, stopping arm.")
command = "can_relay_tovic,arm,39,0,0,0,0\n"
self.send_cmd(command)
return
# Debug output
tempMsg = String()
tempMsg.data = "From IK Control Got Vector: " + str(input_raw)
#self.debug_pub.publish(tempMsg)
# Target position is current position + input vector
current_position = self.arm.get_position_vector()
target_position = current_position + input_raw
#Print for IK DEBUG
tempMsg = String()
# tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles)
tempMsg.data = "Current Angles: " + str(math.degrees(self.arm.current_angles[2])) + ", " + str(math.degrees(self.arm.current_angles[4])) + ", " + str(math.degrees(self.arm.current_angles[6]))
self.ik_debug.publish(tempMsg)
self.get_logger().info(f"[IK] {tempMsg.data}")
# Debug output for current position
#tempMsg.data = "Current Position: " + str(current_position)
#self.debug_pub.publish(tempMsg)
# Debug output for target position
#tempMsg.data = "Target Position: " + str(target_position)
#self.debug_pub.publish(tempMsg)
# Perform IK with the target position
if self.arm.perform_ik(target_position, self.get_logger()):
# Send command to control
#command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n"
#self.send_cmd(command)
self.get_logger().info(f"IK Success: {target_position}")
self.get_logger().info(f"IK Angles: [{str(round(math.degrees(self.arm.ik_angles[2]), 2))}, {str(round(math.degrees(self.arm.ik_angles[4]), 2))}, {str(round(math.degrees(self.arm.ik_angles[6]), 2))}]")
# tempMsg = String()
# tempMsg.data = "IK Success: " + str(target_position)
# #self.debug_pub.publish(tempMsg)
# tempMsg.data = "Sending: " + str(command)
#self.debug_pub.publish(tempMsg)
# Send the IK angles to the MCU
command = "can_relay_tovic,arm,32," + f"{math.degrees(self.arm.ik_angles[0])*10},{math.degrees(self.arm.ik_angles[2])*10},{math.degrees(self.arm.ik_angles[4])*10},{math.degrees(self.arm.ik_angles[6])*10}" + "\n"
# self.send_cmd(command)
# Manual control for Wrist/Effector
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
else:
self.get_logger().info("IK Fail")
self.get_logger().info(f"IK Angles: [{str(math.degrees(self.arm.ik_angles[2]))}, {str(math.degrees(self.arm.ik_angles[4]))}, {str(math.degrees(self.arm.ik_angles[6]))}]")
# tempMsg = String()
# tempMsg.data = "IK Fail"
#self.debug_pub.publish(tempMsg)
def send_manual(self, msg: ArmManual):
axis0 = msg.axis0
axis1 = -1 * msg.axis1
axis2 = msg.axis2
axis3 = msg.axis3
#Send controls for arm
command = "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
#self.send_cmd(command)
command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n"
command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
#Send controls for end effector
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n"
self.send_cmd(command)
#print(f"[Wrote] {command}", end="")
#Not yet finished, needs embedded implementation for new commands
# ef_roll = msg.effector_roll
# ef_yaw = msg.effector_yaw
# gripper = msg.gripper
# actuator = msg.linear_actuator
# laser = msg.laser
# #Send controls for digit
# command = "can_relay_tovic,digit," + str(ef_roll) + "," + str(ef_yaw) + "," + str(gripper) + "," + str(actuator) + "," + str(laser) + "\n"
return
def send_cmd(self, msg):
def send_cmd(self, msg: str):
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg}")
self.get_logger().info(f"[Arm to MCU] {msg.data}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg):
def anchor_feedback(self, msg: String):
output = msg.data
if output.startswith("can_relay_fromvic,arm,55"):
#pass
@@ -183,7 +272,6 @@ class SerialRelay(Node):
#pass
self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"):
#pass
self.updateMotorFeedback(output)
elif output.startswith("can_relay_fromvic,digit,54"):
parts = msg.data.split(",")
@@ -205,17 +293,17 @@ class SerialRelay(Node):
self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: String):
# Angle feedbacks,
#split the msg.data by commas
parts = msg.data.split(",")
def updateAngleFeedback(self, msg: str):
# Angle feedbacks,
# split the msg.data by commas
parts = msg.split(",")
if len(parts) >= 7:
# Extract the angles from the string
angles_in = parts[3:7]
# Convert the angles to floats divide by 10.0
angles = [float(angle) / 10.0 for angle in angles_in]
#angles[0] = 0.0 #override axis0 to zero
# angles[0] = 0.0 #override axis0 to zero
#
#
#THIS NEEDS TO BE REMOVED LATER
@@ -227,7 +315,7 @@ class SerialRelay(Node):
#
#
# # Update the arm's current angles
#self.arm.update_angles(angles)
self.arm.update_angles(angles)
self.arm_feedback.axis0_angle = angles[0]
self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2]
@@ -240,10 +328,9 @@ class SerialRelay(Node):
else:
self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: String):
def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks
parts = msg.data.split(",")
parts = msg.split(",")
if len(parts) >= 7:
# Extract the voltage from the string
voltages_in = parts[3:7]
@@ -254,25 +341,29 @@ class SerialRelay(Node):
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
else:
self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg):
# Motor voltage/current/temperature feedback
return
# parts = msg.data.split(",")
# if len(parts) >= 7:
# # Extract the voltage/current/temperature from the string
# values_in = parts[3:7]
# # Convert the voltages to floats
# for i in range(4):
# #update arm_feedback's axisX_temp for each axis0_temp, axis1_temp, etc...
# pass
# # self.arm_feedback.updateJointVoltages(i, float(values_in[i]) / 10.0)
# # self.arm_feedback.updateJointCurrents(i, float(values_in[i]) / 10.0)
# # self.arm_feedback.updateJointTemperatures(i, float(values_in[i]) / 10.0)
# else:
# self.get_logger().info("Invalid motor feedback input format")
def updateMotorFeedback(self, msg: str):
parts = str(msg.strip()).split(",")
motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0
voltage = float(parts[5]) / 10.0
current = float(parts[6]) / 10.0
if motorId == 1:
self.arm_feedback.axis1_temp = temp
self.arm_feedback.axis1_voltage = voltage
self.arm_feedback.axis1_current = current
elif motorId == 2:
self.arm_feedback.axis2_temp = temp
self.arm_feedback.axis2_voltage = voltage
self.arm_feedback.axis2_current = current
elif motorId == 3:
self.arm_feedback.axis3_temp = temp
self.arm_feedback.axis3_voltage = voltage
self.arm_feedback.axis3_current = current
elif motorId == 4:
self.arm_feedback.axis0_temp = temp
self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current
@staticmethod
@@ -282,8 +373,11 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up...")
if self.ser.is_open:
self.ser.close()
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -299,6 +393,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -0,0 +1,145 @@
import rclpy
from rclpy.node import Node
import numpy as np
import time, math, os
from math import sin, cos, pi
from ament_index_python.packages import get_package_share_directory
from ikpy.chain import Chain
from ikpy.link import OriginLink, URDFLink
#import pygame as pyg
from scipy.spatial.transform import Rotation as R
from geometry_msgs.msg import Vector3
# Misc
degree = pi / 180.0
def convert_angles(angles):
# Converts angles to the format used for the urdf (contains some dummy joints)
return [0.0, math.radians(angles[0]), math.radians(angles[1]), 0.0, math.radians(angles[2]), 0.0, math.radians(angles[3]), 0.0, math.radians(angles[4]), math.radians(angles[5]), 0.0]
class Arm:
def __init__(self, urdf_name):
self.ik_tolerance = 1e-1 #Tolerance (in meters) to determine if solution is valid
# URDF file path
self.urdf = os.path.join(get_package_share_directory('arm_pkg'), 'urdf', urdf_name)
# IKpy Chain
self.chain = Chain.from_urdf_file(self.urdf)
# Arrays for joint states
# Some links in the URDF are static (non-joints), these will remain zero for IK
# Indexes: Fixed_base, Ax_0, Ax_1, seg1, Ax_2, seg2, ax_3, seg3, continuous, wrist, Effector
self.zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.current_angles = self.zero_angles
self.last_angles = self.zero_angles
self.ik_angles = self.zero_angles
self.current_position: list[float] = []
self.target_position = [0.0, 0.0, 0.0]
self.target_orientation: list = [] # Effector orientation desired at target position.
# Generally orientation for the effector is modified manually by the operator.
# Might not need, copied over from state_publisher.py in ik_test
#self.step = 0.03 # Max movement increment
def perform_ik(self, target_position, logger):
self.target_position = target_position
# Update the target orientation to the current orientation
self.update_orientation()
# print(f"[IK FOR] Target Position: {self.target_position}")
try:
# print(f"[TRY] Current Angles: {self.current_angles}")
# print(f"[TRY] Target Position: {self.target_position}")
# print(f"[TRY] Target Orientation: {self.target_orientation}")
self.ik_angles = self.chain.inverse_kinematics(
target_position=self.target_position,
target_orientation=self.target_orientation,
initial_position=self.current_angles,
orientation_mode="all"
)
# Check if the solution is within the tolerance
fk_matrix = self.chain.forward_kinematics(self.ik_angles)
fk_position = fk_matrix[:3, 3] # type: ignore
# print(f"[TRY] FK Position for Solution: {fk_position}")
error = np.linalg.norm(target_position - fk_position)
if error > self.ik_tolerance:
logger.info(f"No VALID IK Solution within tolerance. Error: {error}")
return False
else:
logger.info(f"IK Solution Found. Error: {error}")
return True
except Exception as e:
logger.info(f"IK failed for exception: {e}")
return False
# # Given the FK_Matix for the arm's current pose, update the orientation array
# def update_orientation(self, fk_matrix):
# self.target_orientation = fk_matrix[:3, :3]
# return
# def update_joints(self, ax_0, ax_1, ax_2, ax_3, wrist):
# self.current_angles = [0.0, 0.0, ax_0, ax_1, ax_2, ax_3, wrist, 0.0]
# return
# Get current orientation of the end effector and update target_orientation
def update_orientation(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Update target_orientation to the effector's current orientation
self.target_orientation = fk_matrix[:3, :3] # type: ignore
# Update current angles to those provided
# Resetting last_angles to the new angles
#
# Use: First call, or when angles are changed manually.
def reset_angles(self, angles):
# Update angles to the new angles
self.current_angles = convert_angles(angles)
self.last_angles = self.current_angles
# Update current angles to those provided
# Maintain previous angles in last_angles
#
# Use: Repeated calls during IK operation
def update_angles(self, angles):
# Update angles to the new angles
self.last_angles = self.current_angles
self.current_angles = convert_angles(angles)
# Get current X,Y,Z position of end effector
def get_position(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] # type: ignore
return position
# Get current X,Y,Z position of end effector
def get_position_vector(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] # type: ignore
# Return position as a NumPy array
return np.array(position)
def update_position(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix and update current pos
self.current_position = fk_matrix[:3, 3] # type: ignore

View File

@@ -1,4 +1,6 @@
from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'arm_pkg'
@@ -10,6 +12,8 @@ setup(
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*')),
(os.path.join('share', package_name, 'urdf'), glob('urdf/*')),
],
install_requires=['setuptools'],
zip_safe=True,

239
src/arm_pkg/urdf/arm11.urdf Normal file
View File

@@ -0,0 +1,239 @@
<?xml version="1.0" ?>
<!-- =================================================================================== -->
<!-- | This document was autogenerated by xacro from omni_description/robots/omnipointer_arm_only.urdf.xacro | -->
<!-- | EDITING THIS FILE BY HAND IS NOT RECOMMENDED | -->
<!-- =================================================================================== -->
<robot name="omnipointer" xmlns:xacro="http://ros.org/wiki/xacro">
<!--MX64trntbl + MX106: ? + 0.153 -->
<!-- singleaxismount + 2.5girder + singleaxismount + base + MX106: 0.007370876 + 0.0226796 + 0.007370876 + ? + 0.153 -->
<!-- frame106 + singleaxismount + adjustgirder + singleaxismount + base + MX28: 0.0907185 + 0.00737088 + 0.110563 + 0.00737088 + ? + 0.077 -->
<!-- frame28 + singleaxismount + 5girder + singleaxismount + base + MX28: 0.0907185 + 0.00737088 + 0.0368544 + 0.00737088 + ? + 0.077 -->
<!-- frame28 + singleaxismount + 2.5girder + singleaxismount + tip: 0.0907185 + 0.00737088 + 0.0226796 + 0.00737088 + ? -->
<material name="omni/Blue">
<color rgba="0 0 0.8 1"/>
</material>
<material name="omni/Red">
<color rgba="1 0 0 1"/>
</material>
<material name="omni/Green">
<color rgba="0 1 0 1"/>
</material>
<material name="omni/Yellow">
<color rgba="1 1 0 1"/>
</material>
<material name="omni/LightGrey">
<color rgba="0.6 0.6 0.6 1"/>
</material>
<material name="omni/DarkGrey">
<color rgba="0.4 0.4 0.4 1"/>
</material>
<!-- Now we can start using the macros xacro:included above to define the actual omnipointer -->
<!-- The first use of a macro. This one was defined in youbot_base/base.urdf.xacro above.
A macro like this will expand to a set of link and joint definitions, and to additional
Gazebo-related extensions (sensor plugins, etc). The macro takes an argument, name,
that equals "base", and uses it to generate names for its component links and joints
(e.g., base_link). The xacro:included origin block is also an argument to the macro. By convention,
the origin block defines where the component is w.r.t its parent (in this case the parent
is the world frame). For more, see http://www.ros.org/wiki/xacro -->
<!-- foot for arm-->
<link name="base_link">
<inertial>
<origin rpy="0 0 0" xyz="0 0 0"/>
<mass value="10.0"/>
<inertia ixx="0.1" ixy="0" ixz="0" iyy="0.1" iyz="0" izz="0.1"/>
</inertial>
</link>
<!-- joint between base_link and arm_0_link -->
<joint name="arm_joint_0" type="fixed">
<origin rpy="0 0 0" xyz="0 0 0"/>
<parent link="base_link"/>
<child link="arm_link_0"/>
</joint>
<link name="arm_link_0">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<geometry>
<box size="0.1143 0.1143 0.0545"/>
</geometry>
<material name="omni/LightGrey"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<geometry>
<box size="0.1143 0.1143 0.0545"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<mass value="0.2"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000267245666667" ixy="0" ixz="0" iyy="0.000267245666667" iyz="0" izz="0.000435483"/>
</inertial>
</link>
<joint name="arm_joint_1" type="revolute">
<parent link="arm_link_0"/>
<child link="arm_link_1"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-3.1415926535" upper="3.1415926535" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.0545"/>
<axis xyz="0 0 1"/>
</joint>
<link name="arm_link_1">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<geometry>
<box size="0.0402 0.05 0.15"/>
</geometry>
<material name="omni/Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<geometry>
<box size="0.0402 0.05 0.15"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<mass value="0.190421352"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000279744834534" ixy="0" ixz="0" iyy="0.000265717763008" iyz="0" izz="6.53151584738e-05"/>
</inertial>
</link>
<joint name="arm_joint_2" type="revolute">
<parent link="arm_link_1"/>
<child link="arm_link_2"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.15"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_2">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<geometry>
<box size="0.0356 0.05 0.471"/>
</geometry>
<material name="omni/Red"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<geometry>
<box size="0.0356 0.05 0.471"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<mass value="0.29302326"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00251484771035" ixy="0" ixz="0" iyy="0.00248474836108" iyz="0" izz="9.19936757328e-05"/>
</inertial>
</link>
<joint name="arm_joint_3" type="revolute">
<parent link="arm_link_2"/>
<child link="arm_link_3"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.471"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_3">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<geometry>
<box size="0.0356 0.05 0.377"/>
</geometry>
<material name="omni/Yellow"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<geometry>
<box size="0.0356 0.05 0.377"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<mass value="0.21931466"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000791433503053" ixy="0" ixz="0" iyy="0.000768905501178" iyz="0" izz="6.88531064581e-05"/>
</inertial>
</link>
<joint name="arm_joint_4" type="revolute">
<parent link="arm_link_3"/>
<child link="arm_link_4"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.377"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_4">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<geometry>
<box size="0.0356 0.05 0.132"/>
</geometry>
<material name="omni/Green"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<geometry>
<box size="0.0356 0.05 0.132"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<mass value="0.15813986"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00037242266488" ixy="0" ixz="0" iyy="0.000356178538461" iyz="0" izz="4.96474819141e-05"/>
</inertial>
</link>
<joint name="arm_joint_5" type="revolute">
<parent link="arm_link_4"/>
<child link="arm_link_5"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.132"/>
<axis xyz="1 0 0"/>
</joint>
<link name="arm_link_5">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<geometry>
<box size="0.0356 0.05 0.248"/>
</geometry>
<material name="omni/Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<geometry>
<box size="0.0356 0.05 0.248"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<mass value="0.15813986"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00037242266488" ixy="0" ixz="0" iyy="0.000356178538461" iyz="0" izz="4.96474819141e-05"/>
</inertial>
</link>
<joint name="arm_joint_6" type="fixed">
<parent link="arm_link_5"/>
<child link="arm_link_6"/>
<origin rpy="0 0 0" xyz="0 0 0.248"/>
</joint>
<link name="arm_link_6">
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0"/>
<mass value="1e-12"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="1e-12" ixy="0" ixz="0" iyy="1e-12" iyz="0" izz="1e-12"/>
</inertial>
</link>
<!-- END OF ARM LINKS/JOINTS -->
</robot>

307
src/arm_pkg/urdf/arm12.urdf Normal file
View File

@@ -0,0 +1,307 @@
<robot name="robot">
<link name="base_footprint"></link>
<joint name="base_joint" type="fixed">
<parent link="base_footprint" />
<child link="base_link" />
<origin xyz="0 0 0.0004048057655643422" rpy="0 0 0" />
</joint>
<link name="base_link">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.5 0.5 0.01" />
</geometry>
<material name="base_link-material">
<color rgba="0 0.6038273388475408 1 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.5 0.5 0.01" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.17" ixy="0" ixz="0" iyy="0.17" iyz="0" izz="0.05" />
</inertial>
</link>
<joint name="Axis_0_Joint" type="revolute">
<parent link="base_link" />
<child link="Axis_0" />
<origin xyz="0 0 0.035" rpy="0 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-2.5" upper="2.5" velocity="0.5"/> </joint>
<link name="Axis_0">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.15" length="0.059" />
</geometry>
<material name="Axis_0-material">
<color rgba="0.3515325994898463 0.4735314961384573 0.9301108583738498 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.15" length="0.059" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_1_Joint" type="revolute">
<parent link="Axis_0" />
<child link="Axis_1" />
<origin xyz="0 0 0.11189588647115647" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.7" upper="1.7" velocity="0.5"/> </joint>
<link name="Axis_1">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.082" length="0.1" />
</geometry>
<material name="Axis_1-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.082" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_1_to_Segment_1" type="fixed">
<parent link="Axis_1" />
<child link="Segment_1" />
<origin xyz="0 0.2350831500270899 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_1">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.473" />
</geometry>
<material name="Segment_1-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.473" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_2_Joint" type="revolute">
<parent link="Segment_1" />
<child link="Axis_2" />
<origin xyz="0 -5.219894517229704e-17 0.2637568842473722" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-2.7" upper="2.7" velocity="0.5"/> </joint>
<link name="Axis_2">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.055" length="0.1" />
</geometry>
<material name="Axis_2-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.055" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_2_to_Segment_2" type="fixed">
<parent link="Axis_2" />
<child link="Segment_2" />
<origin xyz="0 0.19535682173790003 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_2">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.393" />
</geometry>
<material name="Segment_2-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.393" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_3_Joint" type="revolute">
<parent link="Segment_2" />
<child link="Axis_3" />
<origin xyz="0 -4.337792830220178e-17 0.199625776257357" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.6" upper="1.6" velocity="0.5"/> </joint>
<link name="Axis_3">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.05" length="0.1" />
</geometry>
<material name="Axis_3-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.05" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_3_to_Segment_3" type="fixed">
<parent link="Axis_3" />
<child link="Segment_3" />
<origin xyz="0 0.06725724726912972 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_3">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.135" />
</geometry>
<material name="Segment_3-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.135" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Wrist_Joint" type="revolute">
<parent link="Segment_3" />
<child link="Axis_4" />
<origin xyz="0 0 0.0655808825338593" rpy="0 1.5707963267948966 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.6" upper="1.6" velocity="0.5"/> </joint>
<link name="Axis_4">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.03" length="0.075" />
</geometry>
<material name="Axis_4-material">
<color rgba="0.23455058215026167 0.9301108583738498 0.21952619971859377 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.03" length="0.075" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Continuous_Joint" type="revolute">
<parent link="Axis_4" />
<child link="Axis_4_C" />
<origin xyz="0.0009533507860803557 0 0" rpy="0 -1.5707963267948966 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-3.14" upper="3.14" velocity="0.5"/>
</joint>
<link name="Axis_4_C">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.02" length="0.01" />
</geometry>
<material name="Axis_4_C-material">
<color rgba="0.006048833020386069 0.407240211891531 0.15592646369776456 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.02" length="0.01" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_4_C_to_Effector" type="fixed">
<parent link="Axis_4_C" />
<child link="Effector" />
<origin xyz="0 0 0.06478774571448076" rpy="0 0 0" />
</joint>
<link name="Effector">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.05 0.01 0.135" />
</geometry>
<material name="Effector-material">
<color rgba="0.2746773120495699 0.01680737574872402 0.5711248294565854 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.01 0.01 0.135" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.16666666666666666" ixy="0" ixz="0" iyy="0.16666666666666666" iyz="0" izz="0.16666666666666666" />
</inertial>
</link>
</robot>

View File

@@ -3,6 +3,7 @@ from rclpy.node import Node
import serial
import sys
import threading
import os
import glob
import time
import atexit
@@ -144,10 +145,11 @@ class SerialRelay(Node):
# LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
self.send_cmd(command)
#self.send_cmd(command)
# Vibration Motor
command = "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
self.send_cmd(command)
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
#self.send_cmd(command)
# FAERIE Control Commands
@@ -156,16 +158,15 @@ class SerialRelay(Node):
# To be reviewed before use#
# Laser
command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
#self.send_cmd(command)
# Drill (SCABBARD)
command = f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
print(msg.drill)
self.send_cmd(command)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
#self.send_cmd(command)
# Bio linear actuator
command = "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command)
@@ -182,7 +183,7 @@ class SerialRelay(Node):
def anchor_feedback(self, msg: String):
output = msg.data
parts = str(output.strip()).split(",")
self.get_logger().info(f"[Bio Anchor] {msg.data}")
#self.get_logger().info(f"[Bio Anchor] {msg.data}")
if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0
@@ -202,8 +203,11 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up...")
if self.ser.is_open:
self.ser.close()
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -219,6 +223,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -20,113 +20,86 @@ os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init()
max_speed = 75 #Max speed as a duty cycle percentage (1-100)
max_speed = 90 #Max speed as a duty cycle percentage (1-100)
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("core_headless")
self.create_timer(0.20, self.send_controls)
# Create a publisher to publish any output the pico sends
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
# Initialize pygame first
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
# Wait for a gamepad to be connected
self.gamepad = None
print("Waiting for gamepad connection...")
while pygame.joystick.get_count() == 0:
# Process any pygame events to keep it responsive
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
time.sleep(1.0) # Check every second
print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}')
#
#
# Now initialize the ROS2 node
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() # Used to ignore sending controls repeatedly when they do not change
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
#Check the pico for updates
self.send_controls()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
self.send_controls()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = CoreControl()
input.max_speed = max_speed
input.right_stick = round(self.gamepad.get_axis(4),2)#right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = round(self.gamepad.get_axis(1),2)#lext y-axis
if pygame.joystick.get_count() != 0:
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}' #stop the rover if there is no controller connected
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
else:
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
input = CoreControl()
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.get_logger().info(f"[Ctrl] Stopping. No Gamepad Connected. ")
self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl()
input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}'
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
#tb_bs = BaseStation()
#node.run()
if __name__ == '__main__':
main()

View File

@@ -1,5 +1,6 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
from std_srvs.srv import Empty
import signal
@@ -7,6 +8,7 @@ import time
import atexit
import serial
import os
import sys
import threading
import glob
@@ -18,6 +20,17 @@ from ros2_interfaces_pkg.msg import CoreControl
serial_pub = None
thread = None
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
@@ -169,7 +182,7 @@ class SerialRelay(Node):
# Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan))
def send_controls(self, msg):
def send_controls(self, msg: CoreControl):
#can_relay_tovic,core,19, left_stick, right_stick
if(msg.turn_to_enable):
command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n'
@@ -182,14 +195,14 @@ class SerialRelay(Node):
self.send_cmd(command)
#print(f"[Sys] Relaying: {command}")
def send_cmd(self, msg):
def send_cmd(self, msg: str):
if self.launch_mode == 'anchor':
#self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String()#Convert to std_msg string
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'core':
self.get_logger().info(f"[Core to MCU] {msg}")
self.get_logger().info(f"[Core to MCU] {msg.data}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String):
@@ -202,25 +215,46 @@ class SerialRelay(Node):
self.core_feedback.gps_long = float(parts[3])
elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count
self.core_feedback.gps_sats = round(float(parts[3]))
elif output.startswith("can_relay_fromvic,core,51"):#Gyro x,y,z
#if parts length is at least 5 then we should have altitude, this is a temporary check until fully implemented
if len(parts) >= 5:
self.core_feedback.gps_alt = round(float(parts[4]), 2)
elif output.startswith("can_relay_fromvic,core,51"): #Gyro x,y,z
self.core_feedback.bno_gyro.x = float(parts[3])
self.core_feedback.bno_gyro.y = float(parts[4])
self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.imu_calib = round(float(parts[6]))
elif output.startswith("can_relay_fromvic,core,52"):#Accel x,y,z, heading *10
elif output.startswith("can_relay_fromvic,core,52"): #Accel x,y,z, heading
self.core_feedback.bno_accel.x = float(parts[3])
self.core_feedback.bno_accel.y = float(parts[4])
self.core_feedback.bno_accel.z = float(parts[5])
self.core_feedback.orientation = float(parts[6]) / 10.0
elif output.startswith("can_relay_fromvic,core,53"):#Rev motor feedback
pass
#self.updateMotorFeedback(output)
elif output.startswith("can_relay_fromvic,core,54"):#bat, 12, 5, 3, Voltage readings * 100
self.core_feedback.orientation = float(parts[6])
elif output.startswith("can_relay_fromvic,core,53"): #Rev motor feedback
motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0
voltage = float(parts[5]) / 10.0
current = float(parts[6]) / 10.0
if motorId == 1:
self.core_feedback.fl_temp = temp
self.core_feedback.fl_voltage = voltage
self.core_feedback.fl_current = current
elif motorId == 2:
self.core_feedback.bl_temp = temp
self.core_feedback.bl_voltage = voltage
self.core_feedback.bl_current = current
elif motorId == 3:
self.core_feedback.fr_temp = temp
self.core_feedback.fr_voltage = voltage
self.core_feedback.fr_current = current
elif motorId == 4:
self.core_feedback.br_temp = temp
self.core_feedback.br_voltage = voltage
self.core_feedback.br_current = current
elif output.startswith("can_relay_fromvic,core,54"): #bat, 12, 5, 3, Voltage readings * 100
self.core_feedback.bat_voltage = float(parts[3]) / 100.0
self.core_feedback.voltage_12 = float(parts[4]) / 100.0
self.core_feedback.voltage_5 = float(parts[5]) / 100.0
self.core_feedback.voltage_3 = float(parts[6]) / 100.0
elif output.startswith("can_relay_fromvic,core,56"):#BMP Temp, Altitude, Pressure
elif output.startswith("can_relay_fromvic,core,56"): #BMP Temp, Altitude, Pressure
self.core_feedback.bmp_temp = float(parts[3])
self.core_feedback.bmp_alt = float(parts[4])
self.core_feedback.bmp_pres = float(parts[5])
@@ -242,8 +276,11 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up before terminating...")
if self.ser.is_open:
self.ser.close()
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -262,7 +299,7 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()