mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
style: minor edits based on riley's comments
This commit is contained in:
@@ -49,9 +49,9 @@ class SerialRelay(Node):
|
||||
self.fromvic_arm_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/arm', 20)
|
||||
self.fromvic_bio_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/bio', 20)
|
||||
|
||||
self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_vic/mock_mcu', self.relay_mock_fromvic, 20)
|
||||
self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.relay_tovic, 20)
|
||||
self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/relay_string', self.send_cmd, 20)
|
||||
self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_vic/mock_mcu', self.on_mock_fromvic, 20)
|
||||
self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.on_relay_tovic_viccan, 20)
|
||||
self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/relay_string', self.on_relay_tovic_string, 20)
|
||||
|
||||
|
||||
# Create publishers
|
||||
@@ -62,7 +62,7 @@ class SerialRelay(Node):
|
||||
self.debug_pub = self.create_publisher(String, '/anchor/debug', 10)
|
||||
|
||||
# Create a subscriber
|
||||
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.send_cmd, 10)
|
||||
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.on_relay_tovic_string, 10)
|
||||
|
||||
# Loop through all serial devices on the computer to check for the MCU
|
||||
self.port = None
|
||||
@@ -111,6 +111,7 @@ class SerialRelay(Node):
|
||||
sys.exit(0)
|
||||
|
||||
def read_MCU(self):
|
||||
""" Check the USB serial port for new data from the MCU, and publish string to appropriate topics """
|
||||
try:
|
||||
output = str(self.ser.readline(), "utf8")
|
||||
|
||||
@@ -151,12 +152,14 @@ class SerialRelay(Node):
|
||||
# exit(1)
|
||||
|
||||
|
||||
def relay_mock_fromvic(self, msg: String):
|
||||
def on_mock_fromvic(self, msg: String):
|
||||
""" For testing without an actual MCU, publish strings here as if they came from an MCU """
|
||||
# self.get_logger().info(f"Got command from mock MCU: {msg}")
|
||||
self.relay_fromvic(msg.data)
|
||||
|
||||
|
||||
def relay_tovic(self, msg: VicCAN):
|
||||
def on_relay_tovic_viccan(self, msg: VicCAN):
|
||||
""" Relay a VicCAN message to the MCU """
|
||||
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
|
||||
for num in msg.data:
|
||||
output += f",{round(num, 7)}" # limit to 7 decimal places
|
||||
@@ -165,6 +168,7 @@ class SerialRelay(Node):
|
||||
self.ser.write(bytes(output, "utf8"))
|
||||
|
||||
def relay_fromvic(self, msg: str):
|
||||
""" Relay a string message from the MCU to the appropriate VicCAN topic """
|
||||
self.fromvic_debug_pub_.publish(String(data=msg))
|
||||
parts = msg.strip().split(",")
|
||||
if len(parts) < 3 or parts[0] != "can_relay_fromvic":
|
||||
@@ -186,7 +190,8 @@ class SerialRelay(Node):
|
||||
self.fromvic_bio_pub_.publish(output)
|
||||
|
||||
|
||||
def send_cmd(self, msg: String):
|
||||
def on_relay_tovic_string(self, msg: String):
|
||||
""" Relay a raw string message to the MCU for debugging """
|
||||
message = msg.data
|
||||
#self.get_logger().info(f"Sending command to MCU: {msg}")
|
||||
self.ser.write(bytes(message, "utf8"))
|
||||
|
||||
@@ -50,7 +50,6 @@ class Headless(Node):
|
||||
pygame.joystick.init()
|
||||
|
||||
# Wait for a gamepad to be connected
|
||||
self.gamepad = None
|
||||
print("Waiting for gamepad connection...")
|
||||
while pygame.joystick.get_count() == 0:
|
||||
# Process any pygame events to keep it responsive
|
||||
@@ -99,6 +98,7 @@ class Headless(Node):
|
||||
sys.exit(0)
|
||||
|
||||
def send_controls(self):
|
||||
""" Read the gamepad state and publish control messages """
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.QUIT:
|
||||
pygame.quit()
|
||||
@@ -116,10 +116,6 @@ class Headless(Node):
|
||||
pygame.quit()
|
||||
sys.exit(0)
|
||||
|
||||
# Make intellisense STFU
|
||||
if self.gamepad == None:
|
||||
return
|
||||
|
||||
|
||||
new_ctrl_mode = self.ctrl_mode # if "" then inequality will always be true
|
||||
|
||||
@@ -274,6 +270,7 @@ class Headless(Node):
|
||||
|
||||
|
||||
def deadzone(value: float, threshold=0.05) -> float:
|
||||
""" Apply a deadzone to a joystick input so the motors don't sound angry """
|
||||
if abs(value) < threshold:
|
||||
return 0
|
||||
return value
|
||||
|
||||
Reference in New Issue
Block a user