Merge pull request #8 from SHC-ASTRA/add-arm-feedback

Add arm feedback
This commit is contained in:
Tristan McGinnis
2025-05-21 14:15:35 -05:00
committed by GitHub
3 changed files with 104 additions and 13 deletions

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown
from launch.substitutions import LaunchConfiguration from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node from launch_ros.actions import Node
@@ -24,7 +24,8 @@ def launch_setup(context, *args, **kwargs):
executable='arm', # change as needed executable='arm', # change as needed
name='arm', name='arm',
output='both', output='both',
parameters=[{'launch_mode': mode}] parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
) )
) )
nodes.append( nodes.append(
@@ -33,7 +34,8 @@ def launch_setup(context, *args, **kwargs):
executable='core', # change as needed executable='core', # change as needed
name='core', name='core',
output='both', output='both',
parameters=[{'launch_mode': mode}] parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
) )
) )
nodes.append( nodes.append(
@@ -42,7 +44,8 @@ def launch_setup(context, *args, **kwargs):
executable='bio', # change as needed executable='bio', # change as needed
name='bio', name='bio',
output='both', output='both',
parameters=[{'launch_mode': mode}] parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
) )
) )
nodes.append( nodes.append(
@@ -51,7 +54,8 @@ def launch_setup(context, *args, **kwargs):
executable='anchor', # change as needed executable='anchor', # change as needed
name='anchor', name='anchor',
output='both', output='both',
parameters=[{'launch_mode': mode}] parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
) )
) )
elif mode in ['arm', 'core', 'bio']: elif mode in ['arm', 'core', 'bio']:
@@ -63,7 +67,8 @@ def launch_setup(context, *args, **kwargs):
executable='arm', executable='arm',
name='arm', name='arm',
output='both', output='both',
parameters=[{'launch_mode': mode}] parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
) )
) )
elif mode == 'core': elif mode == 'core':
@@ -73,7 +78,8 @@ def launch_setup(context, *args, **kwargs):
executable='core', executable='core',
name='core', name='core',
output='both', output='both',
parameters=[{'launch_mode': mode}] parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
) )
) )
elif mode == 'bio': elif mode == 'bio':
@@ -83,7 +89,8 @@ def launch_setup(context, *args, **kwargs):
executable='bio', executable='bio',
name='bio', name='bio',
output='both', output='both',
parameters=[{'launch_mode': mode}] parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
) )
) )
else: else:

View File

@@ -85,7 +85,7 @@ class SerialRelay(Node):
if output: if output:
# All output over debug temporarily # All output over debug temporarily
self.get_logger().info(f"[MCU] {output}") #self.get_logger().info(f"[MCU] {output}")
msg = String() msg = String()
msg.data = output msg.data = output
if output.startswith("can_relay_fromvic,core"): if output.startswith("can_relay_fromvic,core"):

View File

@@ -28,6 +28,8 @@ class SerialRelay(Node):
# Create publishers # Create publishers
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10) self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10) self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
self.feedback_timer = self.create_timer(1.0, self.publish_feedback)
# Create subscribers # Create subscribers
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10) self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10) self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
@@ -37,6 +39,7 @@ class SerialRelay(Node):
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
self.arm_feedback = SocketFeedback()
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if self.launch_mode == 'arm': if self.launch_mode == 'arm':
@@ -122,7 +125,7 @@ class SerialRelay(Node):
def send_manual(self, msg): def send_manual(self, msg):
axis0 = msg.axis0 axis0 = msg.axis0
axis1 = msg.axis1 axis1 = -1 * msg.axis1
axis2 = msg.axis2 axis2 = msg.axis2
axis3 = msg.axis3 axis3 = msg.axis3
@@ -169,9 +172,90 @@ class SerialRelay(Node):
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg): def anchor_feedback(self, msg):
pass output = msg.data
#self.get_logger().info(f"[Arm Anchor] {msg.data}") if output.startswith("can_relay_fromvic,arm,55"):
#self.send_cmd(msg.data) #pass
self.updateAngleFeedback(output)
elif output.startswith("can_relay_fromvic,arm,54"):
#pass
self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"):
#pass
self.updateMotorFeedback(output)
else:
return
def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback)
def updateAngleFeedback(self, msg):
# Angle feedbacks,
#split the msg.data by commas
parts = msg.split(",")
if len(parts) >= 7:
# Extract the angles from the string
angles_in = parts[3:7]
# Convert the angles to floats divide by 10.0
angles = [float(angle) / 10.0 for angle in angles_in]
#angles[0] = 0.0 #override axis0 to zero
#
#
#THIS NEEDS TO BE REMOVED LATER
#PLACEHOLDER FOR WRIST VALUE
#
#
angles.append(0.0)#placeholder for wrist_continuous
angles.append(0.0)#placeholder for wrist
#
#
# # Update the arm's current angles
#self.arm.update_angles(angles)
self.arm_feedback.axis0_angle = angles[0]
self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2]
self.arm_feedback.axis3_angle = angles[3]
# self.get_logger().info(f"Angles: {angles}")
# #debug publish angles
# tempMsg = String()
# tempMsg.data = "Angles: " + str(angles)
# #self.debug_pub.publish(tempMsg)
else:
self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg):
# Bus Voltage feedbacks
parts = msg.split(",")
if len(parts) >= 7:
# Extract the voltage from the string
voltages_in = parts[3:7]
# Convert the voltages to floats
self.arm_feedback.bat_voltage = float(voltages_in[0]) / 100.0
self.arm_feedback.voltage_12 = float(voltages_in[1]) / 100.0
self.arm_feedback.voltage_5 = float(voltages_in[2]) / 100.0
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
else:
self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg):
# Motor voltage/current/temperature feedback
return
# parts = msg.split(",")
# if len(parts) >= 7:
# # Extract the voltage/current/temperature from the string
# values_in = parts[3:7]
# # Convert the voltages to floats
# for i in range(4):
# #update arm_feedback's axisX_temp for each axis0_temp, axis1_temp, etc...
# pass
# # self.arm_feedback.updateJointVoltages(i, float(values_in[i]) / 10.0)
# # self.arm_feedback.updateJointCurrents(i, float(values_in[i]) / 10.0)
# # self.arm_feedback.updateJointTemperatures(i, float(values_in[i]) / 10.0)
# else:
# self.get_logger().info("Invalid motor feedback input format")
@staticmethod @staticmethod