diff --git a/src/anchor_pkg/anchor_pkg/connector.py b/src/anchor_pkg/anchor_pkg/connector.py index 5adafbc..e03fcd1 100644 --- a/src/anchor_pkg/anchor_pkg/connector.py +++ b/src/anchor_pkg/anchor_pkg/connector.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from time import monotonic +from typing import TYPE_CHECKING from astra_msgs.msg import VicCAN from std_msgs.msg import String from rclpy.clock import Clock @@ -101,7 +102,7 @@ class SerialConnector(Connector): mcu_name: str | None = None # Serial buffering - self._serial_buffer = "" + self._serial_buffer: bytes = b"" self._last_read_time = monotonic() if serial_override: @@ -212,35 +213,40 @@ class SerialConnector(Connector): Returns: str: A hopefully-complete string read from the MCU via the serial interface. """ + if TYPE_CHECKING: + assert type(self.serial_interface) == serial.Serial + + # Warn on buffer timeout, as the only scenarios that would trigger this are + # a microcontroller output that isn't newline-terminated (bad), or the MCU is + # hanging (also bad). if ( self._serial_buffer and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT ): - # Warn on buffer timeout, as the only scenarios that would trigger this are - # a microcontroller output that isn't newline-terminated (bad), or the MCU - # hanging (also bad). self.logger.warn( f"Serial buffer timeout, last received '{self._serial_buffer}'." ) result = self._serial_buffer - self._serial_buffer = "" + self._serial_buffer = b"" self._last_read_time = monotonic() - return result + return str(result, "utf8").strip() # No try-except here so caller catches it instead. - raw = str(self.serial_interface.readline(), "utf8") + raw = self.serial_interface.readline() + # Empty or whitespace-only string if not raw or not raw.strip(): return None - if not (raw.endswith("\n") or raw.endswith("\r")): # unfinished string + # Add to buffer or send finished buffer + if not (raw.endswith(b"\n") or raw.endswith(b"\r")): # unfinished string self._serial_buffer += raw self._last_read_time = monotonic() return None else: result = self._serial_buffer + raw - self._serial_buffer = "" - return result.strip() + self._serial_buffer = b"" + return str(result, "utf8").strip() def read(self) -> tuple[VicCAN | None, str | None]: try: