From 4e1e0e29dd5b410154dd17fde48789908b3a1f56 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 12 Aug 2025 08:49:12 -0500 Subject: [PATCH] style: get ready for main --- src/arm_pkg/arm_pkg/arm_node.py | 317 ++++++++++++++--------------- src/core_pkg/core_pkg/core_node.py | 22 +- 2 files changed, 162 insertions(+), 177 deletions(-) diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index 9a7c6c0..6b838d2 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -27,16 +27,16 @@ from ament_index_python.packages import get_package_share_directory from . import astra_arm -control_qos = qos.QoSProfile( - history=qos.QoSHistoryPolicy.KEEP_LAST, - depth=1, - reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, - durability=qos.QoSDurabilityPolicy.VOLATILE, - deadline=1000, - lifespan=500, - liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, - liveliness_lease_duration=5000 -) +# control_qos = qos.QoSProfile( +# history=qos.QoSHistoryPolicy.KEEP_LAST, +# depth=1, +# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, +# durability=qos.QoSDurabilityPolicy.VOLATILE, +# deadline=1000, +# lifespan=500, +# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, +# liveliness_lease_duration=5000 +# ) serial_pub = None thread = None @@ -59,7 +59,7 @@ class SerialRelay(Node): self.feedback_timer = self.create_timer(0.25, self.publish_feedback) # Create subscribers - self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, qos_profile=control_qos) + self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10) self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10) self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10) @@ -152,161 +152,6 @@ class SerialRelay(Node): self.ser.close() pass - - def updateAngleFeedback(self, msg: str): - # Angle feedbacks, - # split the msg.data by commas - parts = msg.split(",") - - if len(parts) >= 7: - # Extract the angles from the string - angles_in = parts[3:7] - # Convert the angles to floats divide by 10.0 - angles = [float(angle) / 10.0 for angle in angles_in] - # angles[0] = 0.0 #override axis0 to zero - # - # - #THIS NEEDS TO BE REMOVED LATER - #PLACEHOLDER FOR WRIST VALUE - # - # - angles.append(0.0)#placeholder for wrist_continuous - angles.append(0.0)#placeholder for wrist - # - # - # # Update the arm's current angles - self.arm.update_angles(angles) - self.arm_feedback.axis0_angle = angles[0] - self.arm_feedback.axis1_angle = angles[1] - self.arm_feedback.axis2_angle = angles[2] - self.arm_feedback.axis3_angle = angles[3] - # self.get_logger().info(f"Angles: {angles}") - # #debug publish angles - # tempMsg = String() - # tempMsg.data = "Angles: " + str(angles) - # #self.debug_pub.publish(tempMsg) - else: - self.get_logger().info("Invalid angle feedback input format") - - - def updateBusVoltage(self, msg: str): - # Bus Voltage feedbacks - parts = msg.split(",") - if len(parts) >= 7: - # Extract the voltage from the string - voltages_in = parts[3:7] - # Convert the voltages to floats - self.arm_feedback.bat_voltage = float(voltages_in[0]) / 100.0 - self.arm_feedback.voltage_12 = float(voltages_in[1]) / 100.0 - self.arm_feedback.voltage_5 = float(voltages_in[2]) / 100.0 - self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0 - else: - self.get_logger().info("Invalid voltage feedback input format") - - def updateMotorFeedback(self, msg: str): - parts = str(msg.strip()).split(",") - motorId = round(float(parts[3])) - temp = float(parts[4]) / 10.0 - voltage = float(parts[5]) / 10.0 - current = float(parts[6]) / 10.0 - if motorId == 1: - self.arm_feedback.axis1_temp = temp - self.arm_feedback.axis1_voltage = voltage - self.arm_feedback.axis1_current = current - elif motorId == 2: - self.arm_feedback.axis2_temp = temp - self.arm_feedback.axis2_voltage = voltage - self.arm_feedback.axis2_current = current - elif motorId == 3: - self.arm_feedback.axis3_temp = temp - self.arm_feedback.axis3_voltage = voltage - self.arm_feedback.axis3_current = current - elif motorId == 4: - self.arm_feedback.axis0_temp = temp - self.arm_feedback.axis0_voltage = voltage - self.arm_feedback.axis0_current = current - - def send_manual(self, msg: ArmManual): - axis0 = msg.axis0 - axis1 = -1 * msg.axis1 - axis2 = msg.axis2 - axis3 = msg.axis3 - - #Send controls for arm - command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n" - command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n" - - #Send controls for end effector - command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n" - - - command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n" - - - command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n" - - - command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n" - - command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n" - - self.send_cmd(command) - - - - #print(f"[Wrote] {command}", end="") - - #Not yet finished, needs embedded implementation for new commands - # ef_roll = msg.effector_roll - # ef_yaw = msg.effector_yaw - # gripper = msg.gripper - # actuator = msg.linear_actuator - # laser = msg.laser - # #Send controls for digit - - # command = "can_relay_tovic,digit," + str(ef_roll) + "," + str(ef_yaw) + "," + str(gripper) + "," + str(actuator) + "," + str(laser) + "\n" - - return - - def send_cmd(self, msg: str): - if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay - output = String() - output.data = msg - self.anchor_pub.publish(output) - elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly - self.get_logger().info(f"[Arm to MCU] {msg.data}") - self.ser.write(bytes(msg, "utf8")) - - def anchor_feedback(self, msg: String): - output = msg.data - if output.startswith("can_relay_fromvic,arm,55"): - #pass - self.updateAngleFeedback(output) - elif output.startswith("can_relay_fromvic,arm,54"): - #pass - self.updateBusVoltage(output) - elif output.startswith("can_relay_fromvic,arm,53"): - self.updateMotorFeedback(output) - elif output.startswith("can_relay_fromvic,digit,54"): - parts = msg.data.split(",") - if len(parts) >= 7: - # Extract the voltage from the string - voltages_in = parts[3:7] - # Convert the voltages to floats - self.digit_feedback.bat_voltage = float(voltages_in[0]) / 100.0 - self.digit_feedback.voltage_12 = float(voltages_in[1]) / 100.0 - self.digit_feedback.voltage_5 = float(voltages_in[2]) / 100.0 - elif output.startswith("can_relay_fromvic,digit,55"): - parts = msg.data.split(",") - if len(parts) >= 4: - self.digit_feedback.wrist_angle = float(parts[3]) - else: - return - - def publish_feedback(self): - self.socket_pub.publish(self.arm_feedback) - self.digit_pub.publish(self.digit_feedback) - def send_ik(self, msg): # Convert Vector3 to a NumPy array input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array @@ -381,6 +226,146 @@ class SerialRelay(Node): #self.debug_pub.publish(tempMsg) + def send_manual(self, msg: ArmManual): + axis0 = msg.axis0 + axis1 = -1 * msg.axis1 + axis2 = msg.axis2 + axis3 = msg.axis3 + + #Send controls for arm + command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n" + command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n" + + #Send controls for end effector + command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n" + + + command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n" + + + command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n" + + + command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n" + + command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n" + + self.send_cmd(command) + + return + + def send_cmd(self, msg: str): + if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay + output = String() + output.data = msg + self.anchor_pub.publish(output) + elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly + self.get_logger().info(f"[Arm to MCU] {msg.data}") + self.ser.write(bytes(msg, "utf8")) + + def anchor_feedback(self, msg: String): + output = msg.data + if output.startswith("can_relay_fromvic,arm,55"): + #pass + self.updateAngleFeedback(output) + elif output.startswith("can_relay_fromvic,arm,54"): + #pass + self.updateBusVoltage(output) + elif output.startswith("can_relay_fromvic,arm,53"): + self.updateMotorFeedback(output) + elif output.startswith("can_relay_fromvic,digit,54"): + parts = msg.data.split(",") + if len(parts) >= 7: + # Extract the voltage from the string + voltages_in = parts[3:7] + # Convert the voltages to floats + self.digit_feedback.bat_voltage = float(voltages_in[0]) / 100.0 + self.digit_feedback.voltage_12 = float(voltages_in[1]) / 100.0 + self.digit_feedback.voltage_5 = float(voltages_in[2]) / 100.0 + elif output.startswith("can_relay_fromvic,digit,55"): + parts = msg.data.split(",") + if len(parts) >= 4: + self.digit_feedback.wrist_angle = float(parts[3]) + else: + return + + def publish_feedback(self): + self.socket_pub.publish(self.arm_feedback) + self.digit_pub.publish(self.digit_feedback) + + def updateAngleFeedback(self, msg: str): + # Angle feedbacks, + # split the msg.data by commas + parts = msg.split(",") + + if len(parts) >= 7: + # Extract the angles from the string + angles_in = parts[3:7] + # Convert the angles to floats divide by 10.0 + angles = [float(angle) / 10.0 for angle in angles_in] + # angles[0] = 0.0 #override axis0 to zero + # + # + #THIS NEEDS TO BE REMOVED LATER + #PLACEHOLDER FOR WRIST VALUE + # + # + angles.append(0.0)#placeholder for wrist_continuous + angles.append(0.0)#placeholder for wrist + # + # + # # Update the arm's current angles + self.arm.update_angles(angles) + self.arm_feedback.axis0_angle = angles[0] + self.arm_feedback.axis1_angle = angles[1] + self.arm_feedback.axis2_angle = angles[2] + self.arm_feedback.axis3_angle = angles[3] + # self.get_logger().info(f"Angles: {angles}") + # #debug publish angles + # tempMsg = String() + # tempMsg.data = "Angles: " + str(angles) + # #self.debug_pub.publish(tempMsg) + else: + self.get_logger().info("Invalid angle feedback input format") + + def updateBusVoltage(self, msg: str): + # Bus Voltage feedbacks + parts = msg.split(",") + if len(parts) >= 7: + # Extract the voltage from the string + voltages_in = parts[3:7] + # Convert the voltages to floats + self.arm_feedback.bat_voltage = float(voltages_in[0]) / 100.0 + self.arm_feedback.voltage_12 = float(voltages_in[1]) / 100.0 + self.arm_feedback.voltage_5 = float(voltages_in[2]) / 100.0 + self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0 + else: + self.get_logger().info("Invalid voltage feedback input format") + + def updateMotorFeedback(self, msg: str): + parts = str(msg.strip()).split(",") + motorId = round(float(parts[3])) + temp = float(parts[4]) / 10.0 + voltage = float(parts[5]) / 10.0 + current = float(parts[6]) / 10.0 + if motorId == 1: + self.arm_feedback.axis1_temp = temp + self.arm_feedback.axis1_voltage = voltage + self.arm_feedback.axis1_current = current + elif motorId == 2: + self.arm_feedback.axis2_temp = temp + self.arm_feedback.axis2_voltage = voltage + self.arm_feedback.axis2_current = current + elif motorId == 3: + self.arm_feedback.axis3_temp = temp + self.arm_feedback.axis3_voltage = voltage + self.arm_feedback.axis3_current = current + elif motorId == 4: + self.arm_feedback.axis0_temp = temp + self.arm_feedback.axis0_voltage = voltage + self.arm_feedback.axis0_current = current + + @staticmethod def list_serial_ports(): return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py index 9004705..6772e28 100644 --- a/src/core_pkg/core_pkg/core_node.py +++ b/src/core_pkg/core_pkg/core_node.py @@ -20,16 +20,16 @@ from ros2_interfaces_pkg.msg import CoreControl serial_pub = None thread = None -control_qos = qos.QoSProfile( - history=qos.QoSHistoryPolicy.KEEP_LAST, - depth=1, - reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, - durability=qos.QoSDurabilityPolicy.VOLATILE, - deadline=1000, - lifespan=500, - liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, - liveliness_lease_duration=5000 -) +# control_qos = qos.QoSProfile( +# history=qos.QoSHistoryPolicy.KEEP_LAST, +# depth=1, +# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, +# durability=qos.QoSDurabilityPolicy.VOLATILE, +# deadline=1000, +# lifespan=500, +# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, +# liveliness_lease_duration=5000 +# ) class SerialRelay(Node): def __init__(self): @@ -45,7 +45,7 @@ class SerialRelay(Node): self.debug_pub = self.create_publisher(String, '/core/debug', 10) self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) # Create a subscriber - self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, qos_profile=control_qos) + self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) # Create a publisher for telemetry self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)