feat: (arm) begin adding VicCAN topics

This commit is contained in:
David
2025-11-24 10:09:10 -06:00
parent ddfceb1b42
commit 8250b91c57

View File

@@ -11,6 +11,7 @@ import signal
from std_msgs.msg import String from std_msgs.msg import String
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN
import math import math
# control_qos = qos.QoSProfile( # control_qos = qos.QoSProfile(
@@ -27,6 +28,14 @@ import math
serial_pub = None serial_pub = None
thread = None thread = None
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = {
53: 4,
54: 4,
55: 4
}
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
@@ -42,6 +51,13 @@ class SerialRelay(Node):
# Anchor topics # Anchor topics
if self.launch_mode == "anchor": if self.launch_mode == "anchor":
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription( self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10 String, "/anchor/arm/feedback", self.anchor_feedback, 10
) )
@@ -255,6 +271,21 @@ class SerialRelay(Node):
else: else:
return return
def relay_fromvic(self, msg: VicCAN):
# Assume that the message is coming from Core
# skill diff if not
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_msg_len_dict:
expected_len = viccan_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
# match msg.command_id:
def publish_feedback(self): def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)