mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
feat: add VicCAN topics to Anchor
This commit is contained in:
@@ -13,17 +13,27 @@ import threading
|
||||
import glob
|
||||
|
||||
from std_msgs.msg import String
|
||||
from ros2_interfaces_pkg.msg import CoreFeedback
|
||||
from ros2_interfaces_pkg.msg import CoreControl
|
||||
from ros2_interfaces_pkg.msg import VicCAN
|
||||
|
||||
serial_pub = None
|
||||
thread = None
|
||||
|
||||
|
||||
class SerialRelay(Node):
|
||||
def __init__(self):
|
||||
# Initalize node with name
|
||||
super().__init__("anchor_node")#previously 'serial_publisher'
|
||||
|
||||
# New pub/sub with VicCAN
|
||||
self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_mock_mcu', self.relay_mock_fromvic, 20)
|
||||
self.fromvic_core_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/core', 20)
|
||||
self.fromvic_arm_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/arm', 20)
|
||||
self.fromvic_bio_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/bio', 20)
|
||||
self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.relay_tovic, 20)
|
||||
|
||||
self.fromvic_debug_pub_ = self.create_publisher(String, '/anchor/from_vic/debug', 20)
|
||||
self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/debug', self.send_cmd, 20)
|
||||
|
||||
|
||||
# Create publishers
|
||||
self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10)
|
||||
@@ -31,7 +41,7 @@ class SerialRelay(Node):
|
||||
self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10)
|
||||
|
||||
self.debug_pub = self.create_publisher(String, '/anchor/debug', 10)
|
||||
|
||||
|
||||
# Create a subscriber
|
||||
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.send_cmd, 10)
|
||||
|
||||
@@ -39,33 +49,32 @@ class SerialRelay(Node):
|
||||
self.port = None
|
||||
ports = SerialRelay.list_serial_ports()
|
||||
for i in range(4):
|
||||
if self.port is not None:
|
||||
break
|
||||
for port in ports:
|
||||
try:
|
||||
# connect and send a ping command
|
||||
ser = serial.Serial(port, 115200, timeout=1)
|
||||
#(f"Checking port {port}...")
|
||||
ser.write(b"ping\n")
|
||||
response = ser.read_until("\n")
|
||||
ser.write(b"can_relay_mode,on\n")
|
||||
response = ser.read_until(bytes("\n", "utf8"))
|
||||
|
||||
# if pong is in response, then we are talking with the MCU
|
||||
if b"pong" in response:
|
||||
self.port = port
|
||||
self.get_logger().info(f"Found MCU at {self.port}!")
|
||||
self.get_logger().info(f"Enabling Relay Mode")
|
||||
ser.write(b"can_relay_mode,on\n")
|
||||
break
|
||||
except:
|
||||
pass
|
||||
if self.port is not None:
|
||||
break
|
||||
|
||||
|
||||
if self.port is None:
|
||||
self.get_logger().info("Unable to find MCU...")
|
||||
time.sleep(1)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
self.ser = serial.Serial(self.port, 115200)
|
||||
self.get_logger().info(f"Enabling Relay Mode")
|
||||
self.ser.write(b"can_relay_mode,on\n")
|
||||
atexit.register(self.cleanup)
|
||||
|
||||
|
||||
@@ -86,6 +95,7 @@ class SerialRelay(Node):
|
||||
output = str(self.ser.readline(), "utf8")
|
||||
|
||||
if output:
|
||||
self.relay_fromvic(output)
|
||||
# All output over debug temporarily
|
||||
#self.get_logger().info(f"[MCU] {output}")
|
||||
msg = String()
|
||||
@@ -120,7 +130,41 @@ class SerialRelay(Node):
|
||||
# self.ser.close()
|
||||
# exit(1)
|
||||
|
||||
def send_cmd(self, msg):
|
||||
|
||||
def relay_mock_fromvic(self, msg: String):
|
||||
#self.get_logger().info(f"Got command from mock MCU: {msg}")
|
||||
self.relay_fromvic(msg.data)
|
||||
|
||||
|
||||
def relay_tovic(self, msg: VicCAN):
|
||||
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
|
||||
for num in msg.data:
|
||||
output += f",{num}"
|
||||
output += "\n"
|
||||
#self.get_logger().info(f"Relaying to MCU: {output}")
|
||||
self.ser.write(bytes(output, "utf8"))
|
||||
|
||||
def relay_fromvic(self, msg: str):
|
||||
self.fromvic_debug_pub_.publish(String(data=msg))
|
||||
parts = msg.strip().split(",")
|
||||
if len(parts) < 3 or parts[0] != "can_relay_fromvic":
|
||||
return
|
||||
output = VicCAN()
|
||||
output.mcu_name = parts[1]
|
||||
output.command_id = int(parts[2])
|
||||
if len(parts) > 3:
|
||||
output.data = [float(x) for x in parts[3:]]
|
||||
output.header.stamp = self.get_clock().now().to_msg()
|
||||
# self.get_logger().info(f"Relaying from MCU: {output}")
|
||||
if output.mcu_name == "core":
|
||||
self.fromvic_core_pub_.publish(output)
|
||||
elif output.mcu_name == "arm" or output.mcu_name == "digit":
|
||||
self.fromvic_arm_pub_.publish(output)
|
||||
elif output.mcu_name == "citadel" or output.mcu_name == "digit":
|
||||
self.fromvic_bio_pub_.publish(output)
|
||||
|
||||
|
||||
def send_cmd(self, msg: String):
|
||||
message = msg.data
|
||||
#self.get_logger().info(f"Sending command to MCU: {msg}")
|
||||
self.ser.write(bytes(message, "utf8"))
|
||||
@@ -128,7 +172,7 @@ class SerialRelay(Node):
|
||||
@staticmethod
|
||||
def list_serial_ports():
|
||||
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
|
||||
|
||||
|
||||
def cleanup(self):
|
||||
print("Cleaning up before terminating...")
|
||||
if self.ser.is_open:
|
||||
|
||||
Reference in New Issue
Block a user