4 Commits

Author SHA1 Message Date
Riley M.
b96d75a787 Merge pull request #36 from SHC-ASTRA/serial-buffer 2026-04-20 16:06:03 -05:00
David
5239668e8e refactor(anchor): buffer bytes instead of string
Resolves Riley's comments, makes it so UTF-8 characters over 1 byte aren't destroyed
2026-04-18 17:46:59 -05:00
David
c166668415 feat(anchor): implement serial read buffering for non-blocking reads 2026-04-18 02:52:58 -05:00
ryleu
333249677f revert: fix(anchor): serial reads are now non-blocking
This reverts commit fc2ba5f8d1.

reason: making serail reads non-blocking caused serial data issues
because sometimes it would exit before actually seeing a newline
2026-04-15 11:24:19 -05:00

View File

@@ -1,4 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from time import monotonic
from typing import TYPE_CHECKING
from astra_msgs.msg import VicCAN from astra_msgs.msg import VicCAN
from std_msgs.msg import String from std_msgs.msg import String
from rclpy.clock import Clock from rclpy.clock import Clock
@@ -20,8 +22,11 @@ KNOWN_USBS = [
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1 (0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board (0x1A86, 0x55D3), # ESP32 S3 Development Board
] ]
BAUD_RATE = 115200 BAUD_RATE = 115200
SERIAL_READ_TIMEOUT = 0.5 # seconds
MCU_IDS = [ MCU_IDS = [
"broadcast", "broadcast",
"core", "core",
@@ -96,6 +101,10 @@ class SerialConnector(Connector):
ports = self._find_ports() ports = self._find_ports()
mcu_name: str | None = None mcu_name: str | None = None
# Serial buffering
self._serial_buffer: bytes = b""
self._last_read_time = monotonic()
if serial_override: if serial_override:
logger.warn( logger.warn(
f"using serial_override: `{serial_override}`! this will bypass several checks." f"using serial_override: `{serial_override}`! this will bypass several checks."
@@ -182,9 +191,66 @@ class SerialConnector(Connector):
return None return None
def _try_readline(self) -> str | None:
"""Attempts to read a full string from the MCU without blocking.
When pyserial is used with 'timeout=0', reads are performed non-blocking.
When used with interface.readline(), this breaks the assumption that a returned
string will be a completed attempt by the MCU to send a string; it may be
cut off between the start of the string and the newline, removing information
and rendering the string(s) useless; thus, to get around the downside of readline()
not waiting for a newline while still not blocking, this function manually
implements a serial input buffer and newline timeout.
If readline() returns a non-empty string, send it if it ends with a newline
(readline() will not read past any newline); otherwise, save the read string.
This buffered string should be pre-pended to the next readline() result.
If readline() does not receive a non-empty string after the last non-newline-
terminated readline() result within the manual timeout, send the contents of the
buffer as if it ended with a newline, and clear the buffer.
Returns:
str: A hopefully-complete string read from the MCU via the serial interface.
"""
if TYPE_CHECKING:
assert type(self.serial_interface) == serial.Serial
# Warn on buffer timeout, as the only scenarios that would trigger this are
# a microcontroller output that isn't newline-terminated (bad), or the MCU is
# hanging (also bad).
if (
self._serial_buffer
and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT
):
self.logger.warn(
f"Serial buffer timeout, last received '{self._serial_buffer}'."
)
result = self._serial_buffer
self._serial_buffer = b""
self._last_read_time = monotonic()
return str(result, "utf8").strip()
# No try-except here so caller catches it instead.
raw = self.serial_interface.readline()
# Empty or whitespace-only string
if not raw or not raw.strip():
return None
# Add to buffer or send finished buffer
if not (raw.endswith(b"\n") or raw.endswith(b"\r")): # unfinished string
self._serial_buffer += raw
self._last_read_time = monotonic()
return None
else:
result = self._serial_buffer + raw
self._serial_buffer = b""
return str(result, "utf8").strip()
def read(self) -> tuple[VicCAN | None, str | None]: def read(self) -> tuple[VicCAN | None, str | None]:
try: try:
raw = str(self.serial_interface.readline(), "utf8") raw = self._try_readline()
if not raw: if not raw:
return (None, None) return (None, None)