diff --git a/src/anchor_pkg/anchor_pkg/anchor_node.py b/src/anchor_pkg/anchor_pkg/anchor_node.py
index 6b66ddb..edef967 100644
--- a/src/anchor_pkg/anchor_pkg/anchor_node.py
+++ b/src/anchor_pkg/anchor_pkg/anchor_node.py
@@ -7,6 +7,7 @@ import time
import atexit
import serial
+import os
import sys
import threading
import glob
@@ -45,6 +46,7 @@ class SerialRelay(Node):
#(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
+ ser.write(b"can_relay_mode,on\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
@@ -88,6 +90,7 @@ class SerialRelay(Node):
#self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
+ self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
@@ -103,19 +106,19 @@ class SerialRelay(Node):
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
- self.exit(1)
+ exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
- self.exit(1)
+ exit(1)
except Exception as e:
print(f"Exception: {e}")
- print("Closing serial port.")
- if self.ser.is_open:
- self.ser.close()
- self.exit(1)
+ # print("Closing serial port.")
+ # if self.ser.is_open:
+ # self.ser.close()
+ # exit(1)
def send_cmd(self, msg):
message = msg.data
@@ -147,6 +150,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
- signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
+ #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()
diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py
index e7549d6..6b838d2 100644
--- a/src/arm_pkg/arm_pkg/arm_node.py
+++ b/src/arm_pkg/arm_pkg/arm_node.py
@@ -1,5 +1,6 @@
import rclpy
from rclpy.node import Node
+from rclpy import qos
import serial
import sys
import threading
@@ -13,9 +14,34 @@ from ros2_interfaces_pkg.msg import ArmIK
from ros2_interfaces_pkg.msg import SocketFeedback
from ros2_interfaces_pkg.msg import DigitFeedback
+
+# IK-Related imports
+import numpy as np
+import time, math, os
+from math import sin, cos, pi
+from ament_index_python.packages import get_package_share_directory
+# from ikpy.chain import Chain
+# from ikpy.link import OriginLink, URDFLink
+# #import pygame as pyg
+# from scipy.spatial.transform import Rotation as R
+
+from . import astra_arm
+
+# control_qos = qos.QoSProfile(
+# history=qos.QoSHistoryPolicy.KEEP_LAST,
+# depth=1,
+# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
+# durability=qos.QoSDurabilityPolicy.VOLATILE,
+# deadline=1000,
+# lifespan=500,
+# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
+# liveliness_lease_duration=5000
+# )
+
serial_pub = None
thread = None
+
class SerialRelay(Node):
def __init__(self):
# Initialize node
@@ -33,14 +59,17 @@ class SerialRelay(Node):
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers
- self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
+ self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
+ self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
+
# Topics used in anchor mode
if self.launch_mode == 'anchor':
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
+ self.arm = astra_arm.Arm('arm12.urdf')
self.arm_feedback = SocketFeedback()
self.digit_feedback = DigitFeedback()
@@ -56,7 +85,7 @@ class SerialRelay(Node):
ser = serial.Serial(port, 115200, timeout=1)
#print(f"Checking port {port}...")
ser.write(b"ping\n")
- response = ser.read_until("\n")
+ response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
@@ -124,7 +153,78 @@ class SerialRelay(Node):
pass
def send_ik(self, msg):
- pass
+ # Convert Vector3 to a NumPy array
+ input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
+ # decrease input vector by 90%
+ input_raw = input_raw * 0.2
+
+ if input_raw[0] == 0.0 and input_raw[1] == 0.0 and input_raw[2] == 0.0:
+ self.get_logger().info("No input, stopping arm.")
+ command = "can_relay_tovic,arm,39,0,0,0,0\n"
+ self.send_cmd(command)
+ return
+
+ # Debug output
+ tempMsg = String()
+ tempMsg.data = "From IK Control Got Vector: " + str(input_raw)
+ #self.debug_pub.publish(tempMsg)
+
+ # Target position is current position + input vector
+ current_position = self.arm.get_position_vector()
+ target_position = current_position + input_raw
+
+
+ #Print for IK DEBUG
+ tempMsg = String()
+ # tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles)
+ tempMsg.data = "Current Angles: " + str(math.degrees(self.arm.current_angles[2])) + ", " + str(math.degrees(self.arm.current_angles[4])) + ", " + str(math.degrees(self.arm.current_angles[6]))
+ self.ik_debug.publish(tempMsg)
+ self.get_logger().info(f"[IK] {tempMsg.data}")
+
+ # Debug output for current position
+ #tempMsg.data = "Current Position: " + str(current_position)
+ #self.debug_pub.publish(tempMsg)
+
+ # Debug output for target position
+ #tempMsg.data = "Target Position: " + str(target_position)
+ #self.debug_pub.publish(tempMsg)
+
+ # Perform IK with the target position
+ if self.arm.perform_ik(target_position, self.get_logger()):
+ # Send command to control
+
+ #command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n"
+ #self.send_cmd(command)
+ self.get_logger().info(f"IK Success: {target_position}")
+ self.get_logger().info(f"IK Angles: [{str(round(math.degrees(self.arm.ik_angles[2]), 2))}, {str(round(math.degrees(self.arm.ik_angles[4]), 2))}, {str(round(math.degrees(self.arm.ik_angles[6]), 2))}]")
+
+ # tempMsg = String()
+ # tempMsg.data = "IK Success: " + str(target_position)
+ # #self.debug_pub.publish(tempMsg)
+ # tempMsg.data = "Sending: " + str(command)
+ #self.debug_pub.publish(tempMsg)
+
+ # Send the IK angles to the MCU
+ command = "can_relay_tovic,arm,32," + f"{math.degrees(self.arm.ik_angles[0])*10},{math.degrees(self.arm.ik_angles[2])*10},{math.degrees(self.arm.ik_angles[4])*10},{math.degrees(self.arm.ik_angles[6])*10}" + "\n"
+ # self.send_cmd(command)
+
+ # Manual control for Wrist/Effector
+ command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
+
+ command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
+
+ command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
+
+ command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
+
+ self.send_cmd(command)
+ else:
+ self.get_logger().info("IK Fail")
+ self.get_logger().info(f"IK Angles: [{str(math.degrees(self.arm.ik_angles[2]))}, {str(math.degrees(self.arm.ik_angles[4]))}, {str(math.degrees(self.arm.ik_angles[6]))}]")
+ # tempMsg = String()
+ # tempMsg.data = "IK Fail"
+ #self.debug_pub.publish(tempMsg)
+
def send_manual(self, msg: ArmManual):
axis0 = msg.axis0
@@ -151,20 +251,6 @@ class SerialRelay(Node):
command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n"
self.send_cmd(command)
-
-
-
- #print(f"[Wrote] {command}", end="")
-
- #Not yet finished, needs embedded implementation for new commands
- # ef_roll = msg.effector_roll
- # ef_yaw = msg.effector_yaw
- # gripper = msg.gripper
- # actuator = msg.linear_actuator
- # laser = msg.laser
- # #Send controls for digit
-
- # command = "can_relay_tovic,digit," + str(ef_roll) + "," + str(ef_yaw) + "," + str(gripper) + "," + str(actuator) + "," + str(laser) + "\n"
return
@@ -174,7 +260,7 @@ class SerialRelay(Node):
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
- self.get_logger().info(f"[Arm to MCU] {msg}")
+ self.get_logger().info(f"[Arm to MCU] {msg.data}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String):
@@ -186,27 +272,7 @@ class SerialRelay(Node):
#pass
self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"):
- parts = str(output.strip()).split(",")
- motorId = round(float(parts[3]))
- temp = float(parts[4]) / 10.0
- voltage = float(parts[5]) / 10.0
- current = float(parts[6]) / 10.0
- if motorId == 1:
- self.arm_feedback.axis1_temp = temp
- self.arm_feedback.axis1_voltage = voltage
- self.arm_feedback.axis1_current = current
- elif motorId == 2:
- self.arm_feedback.axis2_temp = temp
- self.arm_feedback.axis2_voltage = voltage
- self.arm_feedback.axis2_current = current
- elif motorId == 3:
- self.arm_feedback.axis3_temp = temp
- self.arm_feedback.axis3_voltage = voltage
- self.arm_feedback.axis3_current = current
- elif motorId == 4:
- self.arm_feedback.axis0_temp = temp
- self.arm_feedback.axis0_voltage = voltage
- self.arm_feedback.axis0_current = current
+ self.updateMotorFeedback(output)
elif output.startswith("can_relay_fromvic,digit,54"):
parts = msg.data.split(",")
if len(parts) >= 7:
@@ -228,8 +294,8 @@ class SerialRelay(Node):
self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: str):
- # Angle feedbacks,
- #split the msg.data by commas
+ # Angle feedbacks,
+ # split the msg.data by commas
parts = msg.split(",")
if len(parts) >= 7:
@@ -237,7 +303,7 @@ class SerialRelay(Node):
angles_in = parts[3:7]
# Convert the angles to floats divide by 10.0
angles = [float(angle) / 10.0 for angle in angles_in]
- #angles[0] = 0.0 #override axis0 to zero
+ # angles[0] = 0.0 #override axis0 to zero
#
#
#THIS NEEDS TO BE REMOVED LATER
@@ -249,7 +315,7 @@ class SerialRelay(Node):
#
#
# # Update the arm's current angles
- #self.arm.update_angles(angles)
+ self.arm.update_angles(angles)
self.arm_feedback.axis0_angle = angles[0]
self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2]
@@ -262,7 +328,6 @@ class SerialRelay(Node):
else:
self.get_logger().info("Invalid angle feedback input format")
-
def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks
parts = msg.split(",")
@@ -276,6 +341,29 @@ class SerialRelay(Node):
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
else:
self.get_logger().info("Invalid voltage feedback input format")
+
+ def updateMotorFeedback(self, msg: str):
+ parts = str(msg.strip()).split(",")
+ motorId = round(float(parts[3]))
+ temp = float(parts[4]) / 10.0
+ voltage = float(parts[5]) / 10.0
+ current = float(parts[6]) / 10.0
+ if motorId == 1:
+ self.arm_feedback.axis1_temp = temp
+ self.arm_feedback.axis1_voltage = voltage
+ self.arm_feedback.axis1_current = current
+ elif motorId == 2:
+ self.arm_feedback.axis2_temp = temp
+ self.arm_feedback.axis2_voltage = voltage
+ self.arm_feedback.axis2_current = current
+ elif motorId == 3:
+ self.arm_feedback.axis3_temp = temp
+ self.arm_feedback.axis3_voltage = voltage
+ self.arm_feedback.axis3_current = current
+ elif motorId == 4:
+ self.arm_feedback.axis0_temp = temp
+ self.arm_feedback.axis0_voltage = voltage
+ self.arm_feedback.axis0_current = current
@staticmethod
@@ -285,8 +373,11 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up...")
- if self.ser.is_open:
- self.ser.close()
+ try:
+ if self.ser.is_open:
+ self.ser.close()
+ except Exception as e:
+ exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -302,6 +393,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
- signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
+ #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()
diff --git a/src/arm_pkg/arm_pkg/astra_arm.py b/src/arm_pkg/arm_pkg/astra_arm.py
new file mode 100644
index 0000000..44a1373
--- /dev/null
+++ b/src/arm_pkg/arm_pkg/astra_arm.py
@@ -0,0 +1,145 @@
+import rclpy
+from rclpy.node import Node
+import numpy as np
+import time, math, os
+from math import sin, cos, pi
+from ament_index_python.packages import get_package_share_directory
+from ikpy.chain import Chain
+from ikpy.link import OriginLink, URDFLink
+#import pygame as pyg
+from scipy.spatial.transform import Rotation as R
+from geometry_msgs.msg import Vector3
+
+
+# Misc
+degree = pi / 180.0
+
+
+def convert_angles(angles):
+ # Converts angles to the format used for the urdf (contains some dummy joints)
+ return [0.0, math.radians(angles[0]), math.radians(angles[1]), 0.0, math.radians(angles[2]), 0.0, math.radians(angles[3]), 0.0, math.radians(angles[4]), math.radians(angles[5]), 0.0]
+
+
+class Arm:
+ def __init__(self, urdf_name):
+ self.ik_tolerance = 1e-1 #Tolerance (in meters) to determine if solution is valid
+ # URDF file path
+ self.urdf = os.path.join(get_package_share_directory('arm_pkg'), 'urdf', urdf_name)
+ # IKpy Chain
+ self.chain = Chain.from_urdf_file(self.urdf)
+
+
+ # Arrays for joint states
+ # Some links in the URDF are static (non-joints), these will remain zero for IK
+ # Indexes: Fixed_base, Ax_0, Ax_1, seg1, Ax_2, seg2, ax_3, seg3, continuous, wrist, Effector
+ self.zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
+ self.current_angles = self.zero_angles
+ self.last_angles = self.zero_angles
+ self.ik_angles = self.zero_angles
+
+ self.current_position: list[float] = []
+ self.target_position = [0.0, 0.0, 0.0]
+ self.target_orientation: list = [] # Effector orientation desired at target position.
+ # Generally orientation for the effector is modified manually by the operator.
+
+ # Might not need, copied over from state_publisher.py in ik_test
+ #self.step = 0.03 # Max movement increment
+
+
+ def perform_ik(self, target_position, logger):
+ self.target_position = target_position
+ # Update the target orientation to the current orientation
+ self.update_orientation()
+ # print(f"[IK FOR] Target Position: {self.target_position}")
+ try:
+ # print(f"[TRY] Current Angles: {self.current_angles}")
+ # print(f"[TRY] Target Position: {self.target_position}")
+ # print(f"[TRY] Target Orientation: {self.target_orientation}")
+ self.ik_angles = self.chain.inverse_kinematics(
+ target_position=self.target_position,
+ target_orientation=self.target_orientation,
+ initial_position=self.current_angles,
+ orientation_mode="all"
+ )
+ # Check if the solution is within the tolerance
+ fk_matrix = self.chain.forward_kinematics(self.ik_angles)
+
+ fk_position = fk_matrix[:3, 3] # type: ignore
+
+ # print(f"[TRY] FK Position for Solution: {fk_position}")
+
+ error = np.linalg.norm(target_position - fk_position)
+ if error > self.ik_tolerance:
+ logger.info(f"No VALID IK Solution within tolerance. Error: {error}")
+ return False
+ else:
+ logger.info(f"IK Solution Found. Error: {error}")
+ return True
+ except Exception as e:
+ logger.info(f"IK failed for exception: {e}")
+ return False
+
+ # # Given the FK_Matix for the arm's current pose, update the orientation array
+ # def update_orientation(self, fk_matrix):
+ # self.target_orientation = fk_matrix[:3, :3]
+ # return
+
+ # def update_joints(self, ax_0, ax_1, ax_2, ax_3, wrist):
+ # self.current_angles = [0.0, 0.0, ax_0, ax_1, ax_2, ax_3, wrist, 0.0]
+ # return
+
+ # Get current orientation of the end effector and update target_orientation
+ def update_orientation(self):
+
+ # FK matrix for arm's current pose
+ fk_matrix = self.chain.forward_kinematics(self.current_angles)
+
+ # Update target_orientation to the effector's current orientation
+ self.target_orientation = fk_matrix[:3, :3] # type: ignore
+
+ # Update current angles to those provided
+ # Resetting last_angles to the new angles
+ #
+ # Use: First call, or when angles are changed manually.
+ def reset_angles(self, angles):
+ # Update angles to the new angles
+ self.current_angles = convert_angles(angles)
+ self.last_angles = self.current_angles
+
+ # Update current angles to those provided
+ # Maintain previous angles in last_angles
+ #
+ # Use: Repeated calls during IK operation
+ def update_angles(self, angles):
+ # Update angles to the new angles
+ self.last_angles = self.current_angles
+ self.current_angles = convert_angles(angles)
+
+ # Get current X,Y,Z position of end effector
+ def get_position(self):
+ # FK matrix for arm's current pose
+ fk_matrix = self.chain.forward_kinematics(self.current_angles)
+
+ # Get the position of the end effector from the FK matrix
+ position = fk_matrix[:3, 3] # type: ignore
+
+ return position
+
+ # Get current X,Y,Z position of end effector
+ def get_position_vector(self):
+ # FK matrix for arm's current pose
+ fk_matrix = self.chain.forward_kinematics(self.current_angles)
+
+ # Get the position of the end effector from the FK matrix
+ position = fk_matrix[:3, 3] # type: ignore
+
+ # Return position as a NumPy array
+ return np.array(position)
+
+
+ def update_position(self):
+ # FK matrix for arm's current pose
+ fk_matrix = self.chain.forward_kinematics(self.current_angles)
+
+ # Get the position of the end effector from the FK matrix and update current pos
+ self.current_position = fk_matrix[:3, 3] # type: ignore
diff --git a/src/arm_pkg/setup.py b/src/arm_pkg/setup.py
index 8c8c3bd..089982a 100644
--- a/src/arm_pkg/setup.py
+++ b/src/arm_pkg/setup.py
@@ -1,4 +1,6 @@
from setuptools import find_packages, setup
+import os
+from glob import glob
package_name = 'arm_pkg'
@@ -10,6 +12,8 @@ setup(
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
+ (os.path.join('share', package_name, 'launch'), glob('launch/*')),
+ (os.path.join('share', package_name, 'urdf'), glob('urdf/*')),
],
install_requires=['setuptools'],
zip_safe=True,
diff --git a/src/arm_pkg/urdf/arm11.urdf b/src/arm_pkg/urdf/arm11.urdf
new file mode 100644
index 0000000..2fb7036
--- /dev/null
+++ b/src/arm_pkg/urdf/arm11.urdf
@@ -0,0 +1,239 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/arm_pkg/urdf/arm12.urdf b/src/arm_pkg/urdf/arm12.urdf
new file mode 100644
index 0000000..b63d872
--- /dev/null
+++ b/src/arm_pkg/urdf/arm12.urdf
@@ -0,0 +1,307 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/bio_pkg/bio_pkg/bio_node.py b/src/bio_pkg/bio_pkg/bio_node.py
index ad8f40b..94b4fba 100644
--- a/src/bio_pkg/bio_pkg/bio_node.py
+++ b/src/bio_pkg/bio_pkg/bio_node.py
@@ -3,6 +3,7 @@ from rclpy.node import Node
import serial
import sys
import threading
+import os
import glob
import time
import atexit
@@ -202,8 +203,11 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up...")
- if self.ser.is_open:
- self.ser.close()
+ try:
+ if self.ser.is_open:
+ self.ser.close()
+ except Exception as e:
+ exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -219,6 +223,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
- signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
+ #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()
diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py
index 1768293..6772e28 100644
--- a/src/core_pkg/core_pkg/core_node.py
+++ b/src/core_pkg/core_pkg/core_node.py
@@ -1,5 +1,6 @@
import rclpy
from rclpy.node import Node
+from rclpy import qos
from std_srvs.srv import Empty
import signal
@@ -7,6 +8,7 @@ import time
import atexit
import serial
+import os
import sys
import threading
import glob
@@ -18,6 +20,17 @@ from ros2_interfaces_pkg.msg import CoreControl
serial_pub = None
thread = None
+# control_qos = qos.QoSProfile(
+# history=qos.QoSHistoryPolicy.KEEP_LAST,
+# depth=1,
+# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
+# durability=qos.QoSDurabilityPolicy.VOLATILE,
+# deadline=1000,
+# lifespan=500,
+# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
+# liveliness_lease_duration=5000
+# )
+
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
@@ -189,7 +202,7 @@ class SerialRelay(Node):
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'core':
- self.get_logger().info(f"[Core to MCU] {msg}")
+ self.get_logger().info(f"[Core to MCU] {msg.data}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String):
@@ -263,8 +276,11 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up before terminating...")
- if self.ser.is_open:
- self.ser.close()
+ try:
+ if self.ser.is_open:
+ self.ser.close()
+ except Exception as e:
+ exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -283,7 +299,7 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
- signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
+ #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()