From c1666684154d29f94e298f32425f37b87afc4360 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 18 Apr 2026 02:52:58 -0500 Subject: [PATCH 1/2] feat(anchor): implement serial read buffering for non-blocking reads --- src/anchor_pkg/anchor_pkg/connector.py | 64 +++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/src/anchor_pkg/anchor_pkg/connector.py b/src/anchor_pkg/anchor_pkg/connector.py index ed51987..5adafbc 100644 --- a/src/anchor_pkg/anchor_pkg/connector.py +++ b/src/anchor_pkg/anchor_pkg/connector.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from time import monotonic from astra_msgs.msg import VicCAN from std_msgs.msg import String from rclpy.clock import Clock @@ -20,8 +21,11 @@ KNOWN_USBS = [ (0x10C4, 0xEA60), # DOIT ESP32 Devkit V1 (0x1A86, 0x55D3), # ESP32 S3 Development Board ] + BAUD_RATE = 115200 +SERIAL_READ_TIMEOUT = 0.5 # seconds + MCU_IDS = [ "broadcast", "core", @@ -96,6 +100,10 @@ class SerialConnector(Connector): ports = self._find_ports() mcu_name: str | None = None + # Serial buffering + self._serial_buffer = "" + self._last_read_time = monotonic() + if serial_override: logger.warn( f"using serial_override: `{serial_override}`! this will bypass several checks." @@ -123,7 +131,7 @@ class SerialConnector(Connector): self.mcu_name = mcu_name # if we fail at this point, it should crash because we've already tested the port - self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1) + self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=0) def _find_ports(self) -> list[str]: """ @@ -182,9 +190,61 @@ class SerialConnector(Connector): return None + def _try_readline(self) -> str | None: + """Attempts to read a full string from the MCU without blocking. + + When pyserial is used with 'timeout=0', reads are performed non-blocking. + When used with interface.readline(), this breaks the assumption that a returned + string will be a completed attempt by the MCU to send a string; it may be + cut off between the start of the string and the newline, removing information + and rendering the string(s) useless; thus, to get around the downside of readline() + not waiting for a newline while still not blocking, this function manually + implements a serial input buffer and newline timeout. + + If readline() returns a non-empty string, send it if it ends with a newline + (readline() will not read past any newline); otherwise, save the read string. + This buffered string should be pre-pended to the next readline() result. + + If readline() does not receive a non-empty string after the last non-newline- + terminated readline() result within the manual timeout, send the contents of the + buffer as if it ended with a newline, and clear the buffer. + + Returns: + str: A hopefully-complete string read from the MCU via the serial interface. + """ + if ( + self._serial_buffer + and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT + ): + # Warn on buffer timeout, as the only scenarios that would trigger this are + # a microcontroller output that isn't newline-terminated (bad), or the MCU + # hanging (also bad). + self.logger.warn( + f"Serial buffer timeout, last received '{self._serial_buffer}'." + ) + result = self._serial_buffer + self._serial_buffer = "" + self._last_read_time = monotonic() + return result + + # No try-except here so caller catches it instead. + raw = str(self.serial_interface.readline(), "utf8") + + if not raw or not raw.strip(): + return None + + if not (raw.endswith("\n") or raw.endswith("\r")): # unfinished string + self._serial_buffer += raw + self._last_read_time = monotonic() + return None + else: + result = self._serial_buffer + raw + self._serial_buffer = "" + return result.strip() + def read(self) -> tuple[VicCAN | None, str | None]: try: - raw = str(self.serial_interface.readline(), "utf8") + raw = self._try_readline() if not raw: return (None, None) From 5239668e8e332ee3063cf27ef9dc1427babbf432 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 18 Apr 2026 17:46:59 -0500 Subject: [PATCH 2/2] refactor(anchor): buffer bytes instead of string Resolves Riley's comments, makes it so UTF-8 characters over 1 byte aren't destroyed --- src/anchor_pkg/anchor_pkg/connector.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/anchor_pkg/anchor_pkg/connector.py b/src/anchor_pkg/anchor_pkg/connector.py index 5adafbc..e03fcd1 100644 --- a/src/anchor_pkg/anchor_pkg/connector.py +++ b/src/anchor_pkg/anchor_pkg/connector.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from time import monotonic +from typing import TYPE_CHECKING from astra_msgs.msg import VicCAN from std_msgs.msg import String from rclpy.clock import Clock @@ -101,7 +102,7 @@ class SerialConnector(Connector): mcu_name: str | None = None # Serial buffering - self._serial_buffer = "" + self._serial_buffer: bytes = b"" self._last_read_time = monotonic() if serial_override: @@ -212,35 +213,40 @@ class SerialConnector(Connector): Returns: str: A hopefully-complete string read from the MCU via the serial interface. """ + if TYPE_CHECKING: + assert type(self.serial_interface) == serial.Serial + + # Warn on buffer timeout, as the only scenarios that would trigger this are + # a microcontroller output that isn't newline-terminated (bad), or the MCU is + # hanging (also bad). if ( self._serial_buffer and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT ): - # Warn on buffer timeout, as the only scenarios that would trigger this are - # a microcontroller output that isn't newline-terminated (bad), or the MCU - # hanging (also bad). self.logger.warn( f"Serial buffer timeout, last received '{self._serial_buffer}'." ) result = self._serial_buffer - self._serial_buffer = "" + self._serial_buffer = b"" self._last_read_time = monotonic() - return result + return str(result, "utf8").strip() # No try-except here so caller catches it instead. - raw = str(self.serial_interface.readline(), "utf8") + raw = self.serial_interface.readline() + # Empty or whitespace-only string if not raw or not raw.strip(): return None - if not (raw.endswith("\n") or raw.endswith("\r")): # unfinished string + # Add to buffer or send finished buffer + if not (raw.endswith(b"\n") or raw.endswith(b"\r")): # unfinished string self._serial_buffer += raw self._last_read_time = monotonic() return None else: result = self._serial_buffer + raw - self._serial_buffer = "" - return result.strip() + self._serial_buffer = b"" + return str(result, "utf8").strip() def read(self) -> tuple[VicCAN | None, str | None]: try: