mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-04-20 11:51:16 -05:00
Compare commits
3 Commits
fc2ba5f8d1
...
serial-buf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5239668e8e | ||
|
|
c166668415 | ||
|
|
333249677f |
@@ -1,4 +1,6 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from time import monotonic
|
||||
from typing import TYPE_CHECKING
|
||||
from astra_msgs.msg import VicCAN
|
||||
from std_msgs.msg import String
|
||||
from rclpy.clock import Clock
|
||||
@@ -20,8 +22,11 @@ KNOWN_USBS = [
|
||||
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
|
||||
(0x1A86, 0x55D3), # ESP32 S3 Development Board
|
||||
]
|
||||
|
||||
BAUD_RATE = 115200
|
||||
|
||||
SERIAL_READ_TIMEOUT = 0.5 # seconds
|
||||
|
||||
MCU_IDS = [
|
||||
"broadcast",
|
||||
"core",
|
||||
@@ -96,6 +101,10 @@ class SerialConnector(Connector):
|
||||
ports = self._find_ports()
|
||||
mcu_name: str | None = None
|
||||
|
||||
# Serial buffering
|
||||
self._serial_buffer: bytes = b""
|
||||
self._last_read_time = monotonic()
|
||||
|
||||
if serial_override:
|
||||
logger.warn(
|
||||
f"using serial_override: `{serial_override}`! this will bypass several checks."
|
||||
@@ -182,9 +191,66 @@ class SerialConnector(Connector):
|
||||
|
||||
return None
|
||||
|
||||
def _try_readline(self) -> str | None:
|
||||
"""Attempts to read a full string from the MCU without blocking.
|
||||
|
||||
When pyserial is used with 'timeout=0', reads are performed non-blocking.
|
||||
When used with interface.readline(), this breaks the assumption that a returned
|
||||
string will be a completed attempt by the MCU to send a string; it may be
|
||||
cut off between the start of the string and the newline, removing information
|
||||
and rendering the string(s) useless; thus, to get around the downside of readline()
|
||||
not waiting for a newline while still not blocking, this function manually
|
||||
implements a serial input buffer and newline timeout.
|
||||
|
||||
If readline() returns a non-empty string, send it if it ends with a newline
|
||||
(readline() will not read past any newline); otherwise, save the read string.
|
||||
This buffered string should be pre-pended to the next readline() result.
|
||||
|
||||
If readline() does not receive a non-empty string after the last non-newline-
|
||||
terminated readline() result within the manual timeout, send the contents of the
|
||||
buffer as if it ended with a newline, and clear the buffer.
|
||||
|
||||
Returns:
|
||||
str: A hopefully-complete string read from the MCU via the serial interface.
|
||||
"""
|
||||
if TYPE_CHECKING:
|
||||
assert type(self.serial_interface) == serial.Serial
|
||||
|
||||
# Warn on buffer timeout, as the only scenarios that would trigger this are
|
||||
# a microcontroller output that isn't newline-terminated (bad), or the MCU is
|
||||
# hanging (also bad).
|
||||
if (
|
||||
self._serial_buffer
|
||||
and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT
|
||||
):
|
||||
self.logger.warn(
|
||||
f"Serial buffer timeout, last received '{self._serial_buffer}'."
|
||||
)
|
||||
result = self._serial_buffer
|
||||
self._serial_buffer = b""
|
||||
self._last_read_time = monotonic()
|
||||
return str(result, "utf8").strip()
|
||||
|
||||
# No try-except here so caller catches it instead.
|
||||
raw = self.serial_interface.readline()
|
||||
|
||||
# Empty or whitespace-only string
|
||||
if not raw or not raw.strip():
|
||||
return None
|
||||
|
||||
# Add to buffer or send finished buffer
|
||||
if not (raw.endswith(b"\n") or raw.endswith(b"\r")): # unfinished string
|
||||
self._serial_buffer += raw
|
||||
self._last_read_time = monotonic()
|
||||
return None
|
||||
else:
|
||||
result = self._serial_buffer + raw
|
||||
self._serial_buffer = b""
|
||||
return str(result, "utf8").strip()
|
||||
|
||||
def read(self) -> tuple[VicCAN | None, str | None]:
|
||||
try:
|
||||
raw = str(self.serial_interface.readline(), "utf8")
|
||||
raw = self._try_readline()
|
||||
|
||||
if not raw:
|
||||
return (None, None)
|
||||
|
||||
Reference in New Issue
Block a user