19 Commits

Author SHA1 Message Date
David
87237bd841 chore(anchor): use new version commit hash field name 2026-04-18 15:09:30 -05:00
David
79b2d0020f feat(anchor): add is_main and is_dirty to versioning 2026-04-15 23:50:47 -05:00
David
3dd9525833 feat(anchor): add MCU Versioning feedback 2026-04-14 15:38:41 -05:00
Riley M.
8404999369 Merge pull request #31 from SHC-ASTRA/can-refactor
Refactor anchor & add direct CAN connector
2026-04-08 00:18:13 -05:00
ryleu
88574524cf clarify the mock connector usage in the README 2026-04-08 00:15:39 -05:00
ryleu
30bb32a66b remove extraneous slice 2026-04-08 00:09:04 -05:00
David
010d2da0b6 fix: string number 2026-04-07 23:45:40 -05:00
ryleu
0a257abf43 make the pad 3 -> logic consistent 2026-04-07 23:44:38 -05:00
ryleu
b09b55bee0 fix bug because apparently python has arrays 2026-04-07 22:19:55 -05:00
ryleu
ec7f272934 clean up code 2026-04-07 22:16:08 -05:00
ryleu
bc9183d59a make mock mcu use VicCAN messages 2026-04-07 21:52:52 -05:00
ryleu
410d3706ed update README with mock connector instructions 2026-04-02 19:49:07 -05:00
ryleu
89b3194914 update documentation and accept 3-value VicCAN messages 2026-04-02 19:43:10 -05:00
ryleu
4ef226c094 nix fmt 2026-04-01 03:31:21 -05:00
SHC-ASTRA
327539467c fixed can connector 2026-04-01 02:50:12 -05:00
SHC-ASTRA
e570d371c6 fix a plethora of bugs related to the serial connector 2026-04-01 01:48:40 -05:00
ryleu
f7efa604d2 finish adding parameters 2026-03-23 20:39:50 -05:00
ryleu
fe46a2ab4d fix wrong order for initialization 2026-03-23 13:25:13 -05:00
ryleu
941e196316 implement review comments 2026-03-21 18:14:44 -05:00
7 changed files with 417 additions and 201 deletions

View File

@@ -60,6 +60,33 @@ $ ros2 launch anchor_pkg rover.launch.py # Must be run on a computer connected
$ ros2 run headless_pkg headless_full # Optionally run in a separate shell on the same or different computer.
```
### Using the Mock Connector
Anchor provides a mock connector meant for testing and scripting purposes. You can select the mock connector by running anchor with this command:
```bash
$ ros2 launch anchor_pkg rover.launch.py connector:="mock"
```
To see all data that would be sent over the CAN network (and thus to the microcontrollers), use this command:
```bash
$ ros2 topic echo /anchor/to_vic/debug
```
To send data to the mock connector (as if you were a ROS2 node), use the normal relay topic:
```bash
$ ros2 topic pub /anchor/to_vic/relay astra_msgs/msg/VicCAN '{mcu_name: "core", command_id: 50, data: [0.0, 2.0, 0.0, 1.0]}'
```
To send data to the mock connector (as if you were a microcontroller), publish to the dedicated topic:
```bash
$ ros2 topic pub /anchor/from_vic/mock_mcu astra_msgs/msg/VicCAN '{mcu_name: "arm", command_id: 55, data: [0.0, 450.0, 900.0, 0.0]}'
```
### Testing Serial
You can fake the presence of a Serial device (i.e., MCU) by using the following command:
@@ -68,10 +95,31 @@ You can fake the presence of a Serial device (i.e., MCU) by using the following
$ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT
```
When you go to run anchor, use the `PORT_OVERRIDE` environment variable to point it to the fake serial port, like so:
When you go to run anchor, use the `serial_override` ROS2 parameter to point it to the fake serial port, like so:
```bash
$ PORT_OVERRIDE=/tmp/ttyACM9 ros2 launch anchor_pkg rover.launch.py
$ ros2 launch anchor_pkg rover.launch.py connector:=serial serial_override:=/tmp/ttyACM9
```
### Testing CAN
You can create a virtual CAN network by using the following commands to create and then enable it:
```bash
sudo ip link add dev vcan0 type vcan
sudo ip link set vcan0 up
```
When you go to run anchor, use the `can_override` ROS2 parameter to point it to the virtual CAN network, like so:
```bash
$ ros2 launch anchor_pkg rover.launch.py connector:=can can_override:=vcan0
```
Once you're done, you should delete the virtual network so that anchor doesn't get confused if you plug in a real CAN adapter:
```bash
$ sudo ip link delete vcan0
```
### Connecting the GuliKit Controller

View File

@@ -1,9 +1,10 @@
from warnings import deprecated
import time
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import signal
import atexit
from .connector import (
@@ -15,11 +16,11 @@ from .connector import (
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import sys
import threading
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
from builtin_interfaces.msg import Time
from std_msgs.msg import String
from astra_msgs.msg import VicCAN, McuVersion
class Anchor(Node):
@@ -33,14 +34,23 @@ class Anchor(Node):
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
* /anchor/to_vic/debug
- A string copy of the messages published to ./relay are published here
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
- For testing without an actual MCU, publish ViCAN messages here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Send raw strings to connectors. Does not work for connectors that require conversion (like CANConnector)
* /anchor/relay
- Legacy method for talking to connectors. Takes String as input, but does not send the raw strings to connectors.
Instead, it converts them to VicCAN messages first.
"""
ASTRA_EPOCH = time.struct_time((2022, 1, 1, 0, 0, 0, 0, 0, 0)) # January 1, 2022
connector: Connector
def __init__(self):
@@ -48,6 +58,8 @@ class Anchor(Node):
logger = self.get_logger()
# ROS2 Parameter Setup
self.declare_parameter(
"connector",
"auto",
@@ -59,78 +71,171 @@ class Anchor(Node):
),
)
self.declare_parameter(
"can_override",
"",
ParameterDescriptor(
name="can_override",
description="Overrides which CAN channel will be used. Defaults to ''.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be a valid CAN network that shows up in `ip link show`.",
),
)
self.declare_parameter(
"serial_override",
"",
ParameterDescriptor(
name="serial_override",
description="Overrides which serial port will be used. Defaults to ''.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be a valid path to a serial device file that shows up in `ls /dev/tty*`.",
),
)
# Determine which connector to use. Options are Mock, Serial, and CAN
connector_select = (
self.get_parameter("connector").get_parameter_value().string_value
)
can_override = (
self.get_parameter("can_override").get_parameter_value().string_value
)
serial_override = (
self.get_parameter("serial_override").get_parameter_value().string_value
)
match connector_select:
case "serial":
logger.info("using serial connector")
self.connector = SerialConnector(self.get_logger())
self.connector = SerialConnector(
logger, self.get_clock(), serial_override
)
case "can":
logger.info("using CAN connector")
self.connector = CANConnector(self.get_logger())
self.connector = CANConnector(logger, self.get_clock(), can_override)
case "mock":
logger.info("using mock connector")
self.connector = MockConnector(self.get_logger())
self.connector = MockConnector(logger, self.get_clock())
case "auto":
logger.info("automatically determining connector")
try:
logger.info("trying CAN connector")
self.connector = CANConnector(self.get_logger())
self.connector = CANConnector(
logger, self.get_clock(), can_override
)
except (NoValidDeviceException, NoWorkingDeviceException, TypeError):
logger.info("CAN connector failed, trying serial connector")
self.connector = SerialConnector(self.get_logger())
self.connector = SerialConnector(
logger, self.get_clock(), serial_override
)
case _:
self.get_logger().fatal(
logger.fatal(
f"invalid value for connector parameter: {connector_select}"
)
exit(1)
# Close devices on exit
atexit.register(self.cleanup)
# ROS2 Topic Setup
# Publishers
self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20
self.fromvic_debug_pub_ = self.create_publisher( # only used by serial
String,
"/anchor/from_vic/debug",
20,
)
self.fromvic_core_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/core", 20
VicCAN,
"/anchor/from_vic/core",
20,
)
self.fromvic_arm_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/arm", 20
VicCAN,
"/anchor/from_vic/arm",
20,
)
self.fromvic_bio_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/bio", 20
VicCAN,
"/anchor/from_vic/bio",
20,
)
# Debug publisher
self.tovic_debug_pub_ = self.create_publisher(
VicCAN,
"/anchor/to_vic/debug",
20,
)
# MCU Version publisher
self.mcu_version_pub_ = self.create_publisher(
McuVersion,
"/anchor/from_vic/mcu_version",
20,
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.connector.write, 20
VicCAN,
"/anchor/to_vic/relay",
self.write_connector,
20,
)
self.tovic_sub_legacy_ = self.create_subscription(
String,
"/anchor/relay",
self.write_connector_legacy,
20,
)
self.mock_mcu_sub_ = self.create_subscription(
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
VicCAN,
"/anchor/from_vic/mock_mcu",
self.relay_fromvic,
20,
)
self.tovic_string_sub_ = self.create_subscription(
String,
"/anchor/to_vic/relay_string",
self.connector.write_raw,
20,
)
self.mcu_versions: dict[str, McuVersion] = {}
# Close devices on exit
atexit.register(self.cleanup)
def cleanup(self):
self.connector.cleanup()
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
output = self.connector.read()
def read_connector(self):
"""Check the connector for new data from the MCU, and publish string to appropriate topics"""
viccan, raw = self.connector.read()
if not output:
return
if raw:
self.fromvic_debug_pub_.publish(String(data=raw))
self.relay_fromvic(output)
if viccan:
self.relay_fromvic(viccan)
def write_connector(self, msg: VicCAN):
"""Write to the connector and send a copy to /anchor/to_vic/debug"""
self.connector.write(msg)
self.tovic_debug_pub_.publish(msg)
@deprecated(
"Use /anchor/to_vic/relay or /anchor/to_vic/relay_string instead of /anchor/relay"
)
def write_connector_legacy(self, msg: String):
"""Write to the connector by first attempting to convert String to VicCAN"""
# please do not reference this code. ~riley
for cmd in msg.data.split("\n"):
viccan = string_to_viccan(
cmd,
"anchor",
self.get_logger(),
self.get_clock().now().to_msg(),
)
if viccan:
self.write_connector(viccan)
def relay_fromvic(self, msg: VicCAN):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
msg.header = Header(stamp=self.get_clock().now().to_msg(), frame_id="from_vic")
"""Relay a message from the MCU to the appropriate VicCAN topic"""
if msg.mcu_name == "core":
self.fromvic_core_pub_.publish(msg)
elif msg.mcu_name == "arm" or msg.mcu_name == "digit":
@@ -138,14 +243,25 @@ class Anchor(Node):
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
def on_mock_fromvic(self, msg: String):
viccan = string_to_viccan(
msg.data,
"mock",
self.get_logger(),
)
if viccan:
self.relay_fromvic(viccan)
# MCU Versioning information
if msg.command_id in (46, 47) and msg.mcu_name not in self.mcu_versions:
self.mcu_versions[msg.mcu_name] = McuVersion(mcu_name=msg.mcu_name)
if msg.command_id == 46: # commit hashes
self.mcu_versions[msg.mcu_name].project_commit_fragment = msg.data[0]
self.mcu_versions[msg.mcu_name].astra_lib_commit_fragment = msg.data[1]
elif msg.command_id == 47: # build timestamp and version numbers
version_msg = self.mcu_versions[msg.mcu_name]
version_msg.build_time = Time(
sec=int(time.mktime(self.ASTRA_EPOCH) + msg.data[0])
)
# is_main and is_dirty is in msg.data[1]
# Out of 1 byte, it looks like [lib_isdirty][lib_ismain][proj_isdirty][proj_ismain]
version_msg.astra_lib_is_dirty = bool(int(msg.data[1]) >> 3 & 0x1)
version_msg.astra_lib_is_main = bool(int(msg.data[1]) >> 2 & 0x1)
version_msg.project_is_dirty = bool(int(msg.data[1]) >> 1 & 0x1)
version_msg.project_is_main = bool(int(msg.data[1]) & 0x1)
self.mcu_version_pub_.publish(version_msg)
def main(args=None):
@@ -158,16 +274,9 @@ def main(args=None):
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
while rclpy.ok():
anchor_node.read_MCU() # Check the MCU for updates
anchor_node.read_connector() # Check the connector for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from astra_msgs.msg import VicCAN
from rclpy.clock import Clock
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan
from .convert import string_to_viccan as _string_to_viccan, viccan_to_string
# CAN
import can
@@ -20,6 +21,16 @@ KNOWN_USBS = [
]
BAUD_RATE = 115200
MCU_IDS = [
"broadcast",
"core",
"arm",
"digit",
"faerie",
"citadel",
"libs",
]
class NoValidDeviceException(Exception):
pass
@@ -39,18 +50,36 @@ class DeviceClosedException(Exception):
class Connector(ABC):
logger: RcutilsLogger
clock: Clock
def __init__(self, logger: RcutilsLogger):
def string_to_viccan(self, msg: str, mcu_name: str):
"""function currying so that we do not need to pass logger and clock every time"""
return _string_to_viccan(
msg,
mcu_name,
self.logger,
self.clock.now().to_msg(),
)
def __init__(self, logger: RcutilsLogger, clock: Clock):
self.logger = logger
self.clock = clock
@abstractmethod
def read(self) -> VicCAN | None:
def read(self) -> tuple[VicCAN | None, str | None]:
"""
Must return a tuple of (VicCAN, debug message or string repr of VicCAN)
"""
pass
@abstractmethod
def write(self, msg: VicCAN):
pass
@abstractmethod
def write_raw(self, msg: str):
pass
def cleanup(self):
pass
@@ -59,12 +88,19 @@ class SerialConnector(Connector):
port: str
mcu_name: str
serial_interface: serial.Serial
override: bool
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def __init__(self, logger: RcutilsLogger, clock: Clock, serial_override: str = ""):
super().__init__(logger, clock)
ports = self._find_ports()
mcu_name: str | None = None
if serial_override:
logger.warn(
f"using serial_override: `{serial_override}`! this will bypass several checks."
)
ports = [serial_override]
mcu_name = "override"
if len(ports) <= 0:
raise NoValidDeviceException("no valid serial device found")
@@ -75,7 +111,8 @@ class SerialConnector(Connector):
# check each of our ports to make sure one of them is responding
port = ports[0]
mcu_name = self._get_name(port)
# we might already have a name by now if we overrode earlier
mcu_name = mcu_name or self._get_name(port)
if not mcu_name:
raise NoWorkingDeviceException(
f"found {port}, but it did not respond with its name"
@@ -136,34 +173,36 @@ class SerialConnector(Connector):
)
if serial_interface.is_open:
# turn relay mode off if it failed to respond with its name
serial_interface.write(b"can_relay_mode,off\n")
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def read(self) -> VicCAN | None:
def read(self) -> tuple[VicCAN | None, str | None]:
try:
raw = str(self.serial_interface.readline(), "utf8")
if not raw:
return None
return (None, None)
return string_to_viccan(raw, self.mcu_name, self.logger)
return (
self.string_to_viccan(raw, self.mcu_name),
raw,
)
except serial.SerialException as e:
self.logger.error(f"SerialException: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except TypeError as e:
self.logger.error(f"TypeError: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except Exception:
pass # pretty much no other error matters
return (None, None) # pretty much no other error matters
def write(self, msg: VicCAN):
# go from [ w, x, y, z ] -> "w,x,y,z" & round to 7 digits max
data = ",".join([str(round(x, 7)) for x in msg.data])
output = f"can_relay_tovic,{msg.mcu_name},{msg.command_id},{data}\n"
self.serial_interface.write(bytes(output, "utf8"))
self.write_raw(viccan_to_string(msg))
def write_raw(self, msg: str):
self.serial_interface.write(bytes(msg, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
@@ -175,26 +214,36 @@ class SerialConnector(Connector):
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def __init__(self, logger: RcutilsLogger, clock: Clock, can_override: str):
super().__init__(logger, clock)
self.can_channel: str | None = None
self.can_bus: can.BusABC | None = None
if self.can_channel is None:
avail = can.interfaces.socketcan.SocketcanBus._detect_available_configs()
avail = can.interfaces.socketcan.SocketcanBus._detect_available_configs()
if len(avail) == 0:
raise NoValidDeviceException("no CAN interfaces found")
if (l := len(avail)) > 1:
channels = ", ".join(str(b.get("channel")) for b in avail)
raise MultipleValidDevicesException(
f"too many ({l}) CAN interfaces found: [{channels}]"
if len(avail) == 0:
raise NoValidDeviceException("no CAN interfaces found")
# filter to busses whose channel matches the can_override
if can_override:
self.logger.info(f"overrode can interface with {can_override}")
avail = list(
filter(
lambda b: b.get("channel") == can_override,
avail,
)
)
bus = avail[0]
self.can_channel = str(bus.get("channel"))
self.logger.info(f"found CAN interface '{self.can_channel}'")
if (l := len(avail)) > 1:
channels = ", ".join(str(b.get("channel")) for b in avail)
raise MultipleValidDevicesException(
f"too many ({l}) CAN interfaces found: [{channels}]"
)
bus = avail[0]
self.can_channel = str(bus.get("channel"))
self.logger.info(f"found CAN interface '{self.can_channel}'")
try:
self.can_bus = can.Bus(
@@ -208,9 +257,9 @@ class CANConnector(Connector):
)
if self.can_channel and self.can_channel.startswith("v"):
self.logger.warn("using virtual CAN interface; this is likely vcan*")
self.logger.warn("CAN interface is likely virtual")
def read(self) -> VicCAN | None:
def read(self) -> tuple[VicCAN | None, str | None]:
if not self.can_bus:
raise DeviceClosedException("CAN bus not initialized")
@@ -221,7 +270,7 @@ class CANConnector(Connector):
raise DeviceClosedException("CAN bus closed unexpectedly")
if message is None:
return None
return (None, None)
arbitration_id = message.arbitration_id & 0x7FF
data_bytes = bytes(message.data)
@@ -230,21 +279,13 @@ class CANConnector(Connector):
data_type_key = (arbitration_id >> 6) & 0b11
command = arbitration_id & 0x3F
key_to_mcu: dict[int, str] = {
1: "broadcast",
2: "core",
3: "arm",
4: "digit",
5: "faerie",
6: "citadel",
}
mcu_name = key_to_mcu.get(mcu_key)
if mcu_name is None:
try:
mcu_name = MCU_IDS[mcu_key]
except IndexError:
self.logger.warn(
f"received CAN frame with unknown MCU key {mcu_key}; id=0x{arbitration_id:X}"
)
return None
return (None, None)
data: list[float] = []
@@ -256,33 +297,33 @@ class CANConnector(Connector):
self.logger.warn(
f"received double payload with insufficient length {len(data_bytes)}; dropping frame"
)
return None
(value,) = struct.unpack("<d", data_bytes[:8])
return (None, None)
(value,) = struct.unpack(">d", data_bytes[:8])
data = [float(value)]
elif data_type_key == 1:
if len(data_bytes) < 8:
self.logger.warn(
f"received float32x2 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return None
v1, v2 = struct.unpack("<ff", data_bytes[:8])
return (None, None)
v1, v2 = struct.unpack(">ff", data_bytes[:8])
data = [float(v1), float(v2)]
elif data_type_key == 2:
if len(data_bytes) < 8:
self.logger.warn(
f"received int16x4 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return None
i1, i2, i3, i4 = struct.unpack("<hhhh", data_bytes[:8])
return (None, None)
i1, i2, i3, i4 = struct.unpack(">hhhh", data_bytes[:8])
data = [float(i1), float(i2), float(i3), float(i4)]
else:
self.logger.warn(
f"received CAN frame with unknown data_type_key {data_type_key}; id=0x{arbitration_id:X}"
)
return None
return (None, None)
except struct.error as e:
self.logger.error(f"error unpacking CAN payload: {e}")
return None
return (None, None)
viccan = VicCAN(
mcu_name=mcu_name,
@@ -295,98 +336,65 @@ class CANConnector(Connector):
f"decoded as VicCAN(mcu_name={viccan.mcu_name}, command_id={viccan.command_id}, data={viccan.data})"
)
return viccan
return (
viccan,
f"{viccan.mcu_name},{viccan.command_id},"
+ ",".join(map(str, list(viccan.data))),
)
def write(self, msg: VicCAN):
if not self.can_bus:
raise DeviceClosedException("CAN bus not initialized")
# Build 11-bit arbitration ID according to the VicCAN scheme:
# build 11-bit arbitration ID according to VicCAN spec:
# bits 10..8: targeted MCU key
# bits 7..6: data type key
# bits 5..0: command
# Map MCU name to 3-bit key.
mcu_name = (msg.mcu_name or "").lower()
mcu_key_map: dict[str, int] = {
"broadcast": 1,
"core": 2,
"arm": 3,
"digit": 4,
"faerie": 5,
"citadel": 6,
}
if mcu_name not in mcu_key_map:
# map MCU name to 3-bit key.
try:
mcu_id = MCU_IDS.index((msg.mcu_name or "").lower())
except ValueError:
self.logger.error(
f"unknown VicCAN mcu_name '{msg.mcu_name}' for CAN frame; dropping message"
)
return
mcu_key = mcu_key_map[mcu_name] & 0b111
# Infer data type key from payload length according to the table:
# determine data type from length:
# 0: double x1, 1: float32 x2, 2: int16 x4, 3: empty
data_len = len(msg.data)
if data_len == 0:
data_type_key = 3
elif data_len == 1:
data_type_key = 0
elif data_len == 2:
data_type_key = 1
elif data_len == 4:
data_type_key = 2
else:
# Fallback: treat any other non-zero length as float32 x2
self.logger.warn(
f"unexpected VicCAN data length {data_len}; encoding as float32 x2 (key=1) and truncating/padding as needed"
)
data_type_key = 1
# Command is limited to 6 bits.
command = int(msg.command_id)
if command < 0:
self.logger.error(f"invalid negative command_id for CAN frame: {command}")
return
if command > 0x3F:
self.logger.warn(
f"command_id 0x{command:X} exceeds 6-bit range; truncating to lower 6 bits"
)
command &= 0x3F
arbitration_id = (
((mcu_key & 0b111) << 8) | ((data_type_key & 0b11) << 6) | (command & 0x3F)
)
# Map VicCAN.data (floats) to up to 8 CAN data bytes.
raw_bytes: list[int] = []
for value in msg.data:
try:
b = int(round(value))
except (TypeError, ValueError):
match data_len := len(msg.data):
case 0:
data_type = 3
data = bytes()
case 1:
data_type = 0
data = struct.pack(">d", *msg.data)
case 2:
data_type = 1
data = struct.pack(">ff", *msg.data)
case 3 | 4: # 3 gets treated as 4
data_type = 2
if data_len == 3:
msg.data.append(0)
data = struct.pack(">hhhh", *[int(x) for x in msg.data])
case _:
self.logger.error(
f"non-numeric VicCAN data value: {value}; dropping message"
f"unexpected VicCAN data length: {data_len}; dropping message"
)
return
if b < 0 or b > 255:
self.logger.warn(
f"VicCAN data value {value} out of byte range; clamping into [0, 255]"
)
b = max(0, min(255, b))
raw_bytes.append(b)
if len(raw_bytes) > 8:
self.logger.warn(
f"VicCAN data too long for single CAN frame ({len(raw_bytes)} > 8); truncating"
# command is limited to 6 bits.
command = int(msg.command_id)
if command < 0 or command > 0x3F:
self.logger.error(
f"invalid command_id for CAN frame: {command}; dropping message"
)
raw_bytes = raw_bytes[:8]
return
try:
can_message = can.Message(
arbitration_id=arbitration_id,
data=raw_bytes,
arbitration_id=(mcu_id << 8) | (data_type << 6) | command,
data=data,
is_extended_id=False,
)
except Exception as e:
@@ -403,6 +411,9 @@ class CANConnector(Connector):
self.logger.error(f"CAN error while sending: {e}")
raise DeviceClosedException("CAN bus closed unexpectedly")
def write_raw(self, msg: str):
self.logger.warn(f"write_raw is not supported for CANConnector. msg: {msg}")
def cleanup(self):
try:
if self.can_bus is not None:
@@ -413,11 +424,15 @@ class CANConnector(Connector):
class MockConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def __init__(self, logger: RcutilsLogger, clock: Clock):
super().__init__(logger, clock)
# No hardware interface for MockConnector. Publish to `/anchor/from_vic/mock_mcu` instead.
def read(self) -> VicCAN | None:
return None
def read(self) -> tuple[VicCAN | None, str | None]:
return (None, None)
def write(self, msg: VicCAN):
print(msg)
pass
def write_raw(self, msg: str):
pass

View File

@@ -1,30 +1,22 @@
from astra_msgs.msg import VicCAN
from std_msgs.msg import Header
from builtin_interfaces.msg import Time
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger):
def string_to_viccan(
msg: str, mcu_name: str, logger: RcutilsLogger, time: Time
) -> VicCAN | None:
"""
Converts the serial string VicCAN format to a ROS2 VicCAN message.
Does not fill out the Header of the message.
On a failure, it will log at a debug level why it failed.
Parameters:
* msg: str
- The message in serial VicCAN format
* mcu_name: str
- The name of the MCU (e.g. core, citadel, arm)
* logger: RcutilsLogger
- A logger retrieved from node.get_logger()
Returns:
* VicCAN | None
- The VicCAN message on a success or None on a failure
On a failure, it will log at a debug level why it failed and return None.
"""
parts: list[str] = msg.split(",")
parts: list[str] = msg.strip().split(",")
# don't need an extra check because len of .split output is always >= 1
if parts[0] != "can_relay_fromvic":
if not parts[0].startswith("can_relay_"):
logger.debug(f"got non-CAN data from {mcu_name}: {msg}")
return None
elif len(parts) < 3:
@@ -34,8 +26,40 @@ def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger):
logger.debug(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}")
return None
return VicCAN(
mcu_name=parts[1],
command_id=int(parts[2]),
data=[float(x) for x in parts[3:]],
)
try:
command_id = int(parts[2])
except ValueError:
logger.debug(
f"got garbage (non-integer command id) CAN data from {mcu_name}: {msg}"
)
return None
if command_id not in range(64):
logger.debug(
f"got garbage (wrong command id {command_id}) CAN data from {mcu_name}: {msg}"
)
return None
try:
return VicCAN(
header=Header(
stamp=time,
frame_id="from_vic",
),
mcu_name=parts[1],
command_id=command_id,
data=[float(x) for x in parts[3:]],
)
except ValueError:
logger.debug(f"got garbage (non-numerical) CAN data from {mcu_name}: {msg}")
return None
def viccan_to_string(viccan: VicCAN) -> str:
"""Converts a ROS2 VicCAN message to the serial string VicCAN format."""
# make sure we accept 3 digits and treat it as 4
if len(viccan.data) == 3:
viccan.data.append(0)
# go from [ w, x, y, z ] -> ",w,x,y,z" & round to 7 digits max
data = "".join([f",{round(val,7)}" for val in viccan.data])
return f"can_relay_tovic,{viccan.mcu_name},{viccan.command_id}{data}\n"

View File

@@ -7,6 +7,8 @@ from launch_ros.actions import Node
def generate_launch_description():
connector = LaunchConfiguration("connector")
serial_override = LaunchConfiguration("serial_override")
can_override = LaunchConfiguration("can_override")
use_ptz = LaunchConfiguration("use_ptz")
ld = LaunchDescription()
@@ -16,7 +18,23 @@ def generate_launch_description():
DeclareLaunchArgument(
"connector",
default_value="auto",
description="Connector parameter for anchor node (default: auto)",
description="Connector parameter for anchor node (default: 'auto')",
)
)
ld.add_action(
DeclareLaunchArgument(
"serial_override",
default_value="",
description="Serial port override parameter for anchor node (default: '')",
)
)
ld.add_action(
DeclareLaunchArgument(
"can_override",
default_value="",
description="CAN network override parameter for anchor node (default: '')",
)
)
@@ -82,6 +100,8 @@ def generate_launch_description():
{
"launch_mode": "anchor",
"connector": connector,
"serial_override": serial_override,
"can_override": can_override,
}
],
on_exit=Shutdown(),

View File

@@ -262,7 +262,7 @@ class PtzNode(Node):
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
)
self.debug_pub.publish(msg)
self.get_logger().info(message_text)
self.get_logger().debug(message_text)
def run_async_func(self, coro):
"""Run an async function in the event loop."""