refactor(anchor): buffer bytes instead of string

Resolves Riley's comments, makes it so UTF-8 characters over 1 byte aren't destroyed
This commit is contained in:
David
2026-04-18 17:46:59 -05:00
parent c166668415
commit 5239668e8e

View File

@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from time import monotonic from time import monotonic
from typing import TYPE_CHECKING
from astra_msgs.msg import VicCAN from astra_msgs.msg import VicCAN
from std_msgs.msg import String from std_msgs.msg import String
from rclpy.clock import Clock from rclpy.clock import Clock
@@ -101,7 +102,7 @@ class SerialConnector(Connector):
mcu_name: str | None = None mcu_name: str | None = None
# Serial buffering # Serial buffering
self._serial_buffer = "" self._serial_buffer: bytes = b""
self._last_read_time = monotonic() self._last_read_time = monotonic()
if serial_override: if serial_override:
@@ -212,35 +213,40 @@ class SerialConnector(Connector):
Returns: Returns:
str: A hopefully-complete string read from the MCU via the serial interface. str: A hopefully-complete string read from the MCU via the serial interface.
""" """
if TYPE_CHECKING:
assert type(self.serial_interface) == serial.Serial
# Warn on buffer timeout, as the only scenarios that would trigger this are
# a microcontroller output that isn't newline-terminated (bad), or the MCU is
# hanging (also bad).
if ( if (
self._serial_buffer self._serial_buffer
and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT
): ):
# Warn on buffer timeout, as the only scenarios that would trigger this are
# a microcontroller output that isn't newline-terminated (bad), or the MCU
# hanging (also bad).
self.logger.warn( self.logger.warn(
f"Serial buffer timeout, last received '{self._serial_buffer}'." f"Serial buffer timeout, last received '{self._serial_buffer}'."
) )
result = self._serial_buffer result = self._serial_buffer
self._serial_buffer = "" self._serial_buffer = b""
self._last_read_time = monotonic() self._last_read_time = monotonic()
return result return str(result, "utf8").strip()
# No try-except here so caller catches it instead. # No try-except here so caller catches it instead.
raw = str(self.serial_interface.readline(), "utf8") raw = self.serial_interface.readline()
# Empty or whitespace-only string
if not raw or not raw.strip(): if not raw or not raw.strip():
return None return None
if not (raw.endswith("\n") or raw.endswith("\r")): # unfinished string # Add to buffer or send finished buffer
if not (raw.endswith(b"\n") or raw.endswith(b"\r")): # unfinished string
self._serial_buffer += raw self._serial_buffer += raw
self._last_read_time = monotonic() self._last_read_time = monotonic()
return None return None
else: else:
result = self._serial_buffer + raw result = self._serial_buffer + raw
self._serial_buffer = "" self._serial_buffer = b""
return result.strip() return str(result, "utf8").strip()
def read(self) -> tuple[VicCAN | None, str | None]: def read(self) -> tuple[VicCAN | None, str | None]:
try: try: