feat: (arm) add VicCAN feedback section for Digit alongside Socket

This commit is contained in:
David
2025-12-22 14:17:06 -06:00
parent 292873a50a
commit 5a4e9c8e53

View File

@@ -30,13 +30,18 @@ thread = None
# Used to verify the length of an incoming VicCAN feedback message # Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list # Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = { viccan_socket_msg_len_dict = {
53: 4, 53: 4,
54: 4, 54: 4,
55: 4, 55: 4,
58: 4, 58: 4,
59: 4 59: 4
} }
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2
}
class SerialRelay(Node): class SerialRelay(Node):
@@ -90,6 +95,9 @@ class SerialRelay(Node):
self.saved_joint_state.position = [0.0] * len( self.saved_joint_state.position = [0.0] * len(
self.saved_joint_state.name self.saved_joint_state.name
) # Initialize with zeros ) # Initialize with zeros
self.saved_joint_state.velocity = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
# Old # Old
@@ -251,10 +259,8 @@ class SerialRelay(Node):
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
# pass
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
elif output.startswith("can_relay_fromvic,arm,54"): elif output.startswith("can_relay_fromvic,arm,54"):
# pass
self.updateBusVoltage(output) self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
self.updateMotorFeedback(output) self.updateMotorFeedback(output)
@@ -272,22 +278,23 @@ class SerialRelay(Node):
if len(parts) >= 4: if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3]) self.digit_feedback.wrist_angle = float(parts[3])
# self.digit_feedback.wrist_roll = float(parts[4]) # self.digit_feedback.wrist_roll = float(parts[4])
self.saved_joint_state.position[4] = math.radians(
float(parts[4])
) # Wrist roll
self.saved_joint_state.position[5] = math.radians(
float(parts[3])
) # Wrist yaw
else: else:
return return
def relay_fromvic(self, msg: VicCAN): def relay_fromvic(self, msg: VicCAN):
# Code for socket and digit are broken out for cleaner code
if msg.mcu_name == "arm":
self.process_fromvic_arm(msg)
elif msg.mcu_name == "digit":
self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN):
if msg.mcu_name != "arm": if msg.mcu_name != "arm":
return # ignore digit for now return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in viccan_msg_len_dict: if msg.command_id in viccan_socket_msg_len_dict:
expected_len = viccan_msg_len_dict[msg.command_id] expected_len = viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len: if len(msg.data) != expected_len:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -315,6 +322,24 @@ class SerialRelay(Node):
self.saved_joint_state.header.stamp = msg.header.stamp self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state) self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN):
if msg.mcu_name != "digit":
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_digit_msg_len_dict:
expected_len = viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(msg.data[0]) # Wrist roll
self.saved_joint_state.position[5] = math.radians(msg.data[1]) # Wrist yaw
def publish_feedback(self): def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)