4 Commits

Author SHA1 Message Date
ryleu
d311c326c6 fix the readme 2026-01-22 01:54:05 -05:00
ryleu
23f49d362e make core_ptz stfu 2026-01-22 01:50:40 -05:00
ryleu
f558389863 fix the parameter 2026-01-22 01:48:09 -05:00
ryleu
5e2bbe23a3 general cleanup before i start work on CANceiver 2026-01-22 00:31:46 -05:00
3 changed files with 107 additions and 156 deletions

View File

@@ -65,10 +65,10 @@ You can fake the presence of a Serial device (i.e., MCU) by using the following
$ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT
```
When you go to run anchor, use the `PORT_OVERRIDE` environment variable to point it to the fake serial port, like so:
When you go to run anchor, use the `port_override` launch parameter to point it to the fake serial port, like so:
```bash
$ PORT_OVERRIDE=/tmp/ttyACM9 ros2 launch anchor_pkg rover.launch.py
$ ros2 launch anchor_pkg rover.launch.py port_override:=/tmp/ttyACM9
```
### Connecting the GuliKit Controller
@@ -121,23 +121,24 @@ A: Don't worry about it. If you had the workspace sourced, ROS2 will complain ab
...
```
A: To find a microcontroller to talk to, Anchor sends a ping to every Serial port on your computer. If it does not receive a 'pong' in less than one second, then it aborts. There are a few possible fixes:
A: To find a microcontroller to talk to, Anchor filters through your available serial ports to find microcontrollers. If the microcontroller fails
to respond properly, or one is not found, it will abort. There are a few possible fixes:
- Keep trying to run it until it works
- Run `lsusb` to see if the microcontroller is detected by your computer.
- Run `ls /dev/tty*0` to see if there is a valid Serial port enumerated for the microcontroller.
- Check if you are in the `dialout` group (or whatever group shows up by running `ls -l /dev/tty*`).
- Check if you are in the `dialout` group (or whatever group shows up by running `ls -l /dev/tty*`) by using the `groups` command.
## Packages
- [anchor\_pkg](./src/anchor_pkg) - Handles Serial communication between the various other packages here and the microcontroller.
- [arm\_pkg](./src/arm_pkg) - Relays controls and sensor data for the arm (socket and digit) between anchor and basestation/headless.
- [astra\_descriptions](./src/astra_descriptions) - Submodule with URDF-related packages.
- [astra\_msgs](./src/astra_msgs) - Contains custom message types for communication between basestation and the rover over ROS2.
- [bio\_pkg](./src/bio_pkg) - Like arm_pkg, but for CITADEL and FAERIE
- [core\_pkg](./src/core_pkg) - Like arm_pkg, but for Core
- [headless\_pkg](./src/headless_pkg) - Simple, non-graphical controller node to work in place of basestation when controlling the rover by itself. This is autostarted with anchor to allow for setup-less control of the rover.
- [latency\_tester](./src/latency_tester) - A temporary node to test comms latency over ROS2, Serial, and CAN.
- [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`).
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
## Maintainers

View File

@@ -1,20 +1,17 @@
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty
import signal
import time
import atexit
import glob
import signal
import threading
from typing import cast
import rclpy
import serial
import serial.tools.list_ports
import os
import sys
import threading
import glob
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from std_msgs.msg import Header, String
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
KNOWN_USBS = [
@@ -46,15 +43,23 @@ class Anchor(Node):
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
serial_port: str | None = None # e.g., "/dev/ttyUSB0"
def __init__(self):
# Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher'
super().__init__("anchor_node")
self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
self.declare_parameter(
"port_override",
"",
ParameterDescriptor(
name="port_override", type=ParameterType.PARAMETER_STRING
),
)
# Serial port override
if port_override := os.getenv("PORT_OVERRIDE"):
self.serial_port = port_override
if port_override := self.get_parameter("port_override").value:
self.serial_port = cast(str, port_override) # Cast to make the linter happy
##################################################
# Serial MCU Discovery
@@ -62,104 +67,64 @@ class Anchor(Node):
# If there was not a port override, look for a MCU over USB for Serial.
if self.serial_port is None:
comports = serial.tools.list_ports.comports()
real_ports = list(
recog_ports = list(
filter(
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
# Filter for ports we know that aren't invalid
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
comports,
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU
# Guards
if len(recog_ports) > 1: # If we found too many
self.get_logger().fatal(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
exit(1)
if len(recog_ports) == 0:
self.get_logger().fatal(
f"Found no recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
exit(1)
# Everything is ok
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
# String, location of device file; e.g., '/dev/ttyACM0'
self.serial_port = found_port.device
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None:
self.get_logger().warning("Falling back to legacy MCU discovery...")
ports = Anchor.list_serial_ports()
for _ in range(4):
if self.serial_port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1)
sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
self.serial_interface = serial.Serial(self.serial_port, 115200, timeout=1)
# Attempt to get name of connected MCU
self.serial_interface.write(
b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
self.serial_interface.write(b"can_relay_mode,on\n") # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
for _ in range(4): # Sometimes it takes a sec
response = self.serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
try:
args: list[str] = response.decode("utf8").strip().split(",")
except UnicodeDecodeError:
continue # ignore malformed responses
if len(args) == 2:
mcu_name = args[1]
break
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
self.get_logger().fatal(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
self.get_logger().fatal(e.strerror)
exit(1)
# Close serial port on exit
atexit.register(self.cleanup)
@@ -205,16 +170,27 @@ class Anchor(Node):
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
output: str | None = None
try:
output = str(self.serial_interface.readline(), "utf8")
except serial.SerialException as e:
self.get_logger().fatal(f"SerialException: {e}")
exit(1)
except TypeError as e:
self.get_logger().fatal(f"TypeError: {e}")
exit(1)
except Exception as e:
self.get_logger().error(f"Exception: {e}")
if output:
self.relay_fromvic(output)
# All output over debug temporarily
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
# All output over debug
self.debug_pub.publish(msg)
# Send the message to the right place
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
@@ -225,34 +201,7 @@ class Anchor(Node):
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
# self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except Exception as e:
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
def on_mock_fromvic(self, msg: String):
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
@@ -360,6 +309,6 @@ def main(args=None):
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
signal.SIGTERM, lambda signum, frame: exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -1,24 +1,25 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import asyncio
from concurrent.futures import ThreadPoolExecutor
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from astra_msgs.msg import PtzControl, PtzFeedback
# Import the SIYI SDK
from core_pkg.siyi_sdk import (
SiyiGimbalCamera,
CommandID,
DataStreamType,
DataStreamFrequency,
SingleAxis,
AttitudeData,
CommandID,
DataStreamFrequency,
DataStreamType,
SingleAxis,
SiyiGimbalCamera,
)
@@ -262,7 +263,7 @@ class PtzNode(Node):
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
)
self.debug_pub.publish(msg)
self.get_logger().info(message_text)
self.get_logger().debug(message_text)
def run_async_func(self, coro):
"""Run an async function in the event loop."""