15 Commits

Author SHA1 Message Date
ryleu
697efa7b9d add missing packages for moveit 2026-02-04 00:31:36 -05:00
ryleu
b70a0d27c3 uncomment ros2_controllers 2026-02-04 00:04:07 -05:00
ryleu
2d48361b8f update to develop branch of nix-ros-overlay 2026-02-03 23:32:42 -05:00
Riley M.
4a98c3d435 Merge pull request #25 from SHC-ASTRA/serial-refactor
Anchor Serial Refactor
2026-01-14 23:00:51 -06:00
SHC-ASTRA
b5be93e5f6 add an error instead of a crash when a gamepad fails to initialize 2026-01-14 19:49:33 -06:00
SHC-ASTRA
0e775c65c6 add trying multiple controllers to headless 2026-01-14 04:56:55 -06:00
SHC-ASTRA
14141651bf Merge branch 'autostart' into serial-refactor 2026-01-14 04:17:22 -06:00
ryleu
c10a2a5cca patch autostart scripts for nixos 2026-01-14 04:12:05 -05:00
David
df78575206 feat: (headless) add Ctrl+C try-except 2025-12-13 16:23:42 -06:00
David
40fa0d0ab8 style: (anchor) better comment serial finding 2025-11-21 17:06:37 -06:00
David
3bb3771dce fix: (anchor) ignore UnicodeDecodeError when getting mcu name 2025-11-11 13:18:36 -06:00
David
5e7776631d feat: (anchor) add new Serial finder code
Uses vendor and product ids to find a microcontroller, and detects its name after connecting. Upon failure, falls back to Areeb's code--just in case.
Also renamed `self.ser` to `self.serial_interface` and `self.port` to `self.serial_port` for clarity.
2025-11-10 23:24:14 -06:00
David
b84ca6757d refactor: (anchor) cleanup structural ros2 code 2025-11-10 22:45:43 -06:00
David
96f5eda005 feat: (headless) detect incorrectly connected controller 2025-11-10 22:02:49 -06:00
David
4c1416851e style: move pub/sub docs comment, rename SerialPub to Anchor 2025-11-10 21:58:03 -06:00
7 changed files with 253 additions and 103 deletions

View File

@@ -15,7 +15,11 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
source /opt/ros/humble/setup.bash if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,7 +15,11 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
source /opt/ros/humble/setup.bash if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,7 +17,11 @@ done
echo "[INFO] Network interface is up!" echo "[INFO] Network interface is up!"
source /opt/ros/humble/setup.bash if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source $ANCHOR_WS/install/setup.bash source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash [[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

16
flake.lock generated
View File

@@ -24,27 +24,27 @@
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
}, },
"locked": { "locked": {
"lastModified": 1761810010, "lastModified": 1770108954,
"narHash": "sha256-o0wJKW603SiOO373BTgeZaF6nDxegMA/cRrzSC2Cscg=", "narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=",
"owner": "lopsided98", "owner": "lopsided98",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"rev": "e277df39e3bc6b372a5138c0bcf10198857c55ab", "rev": "3d05d46451b376e128a1553e78b8870c75d7753a",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "lopsided98", "owner": "lopsided98",
"ref": "master", "ref": "develop",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"type": "github" "type": "github"
} }
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1744849697, "lastModified": 1759381078,
"narHash": "sha256-S9hqvanPSeRu6R4cw0OhvH1rJ+4/s9xIban9C4ocM/0=", "narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "lopsided98", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "6318f538166fef9f5118d8d78b9b43a04bb049e4", "rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@@ -2,7 +2,7 @@
description = "Development environment for ASTRA Anchor"; description = "Development environment for ASTRA Anchor";
inputs = { inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/master"; nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
}; };
@@ -56,10 +56,12 @@
control-msgs control-msgs
control-toolbox control-toolbox
moveit-core moveit-core
moveit-planners
moveit-common moveit-common
moveit-msgs moveit-msgs
moveit-ros-planning moveit-ros-planning
moveit-ros-planning-interface moveit-ros-planning-interface
moveit-ros-visualization
moveit-configs-utils moveit-configs-utils
moveit-ros-move-group moveit-ros-move-group
moveit-servo moveit-servo
@@ -68,9 +70,9 @@
pilz-industrial-motion-planner pilz-industrial-motion-planner
pick-ik pick-ik
ompl ompl
chomp-motion-planner
joy joy
# ros2-controllers nixpkg does not build :( ros2-controllers
chomp-motion-planner
]; ];
} }
) )

View File

@@ -1,5 +1,6 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty from std_srvs.srv import Empty
import signal import signal
@@ -7,6 +8,7 @@ import time
import atexit import atexit
import serial import serial
import serial.tools.list_ports
import os import os
import sys import sys
import threading import threading
@@ -15,70 +17,156 @@ import glob
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN from astra_msgs.msg import VicCAN
serial_pub = None KNOWN_USBS = [
thread = None (0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
""" class Anchor(Node):
Publishers: """
* /anchor/from_vic/debug Publishers:
- Every string received from the MCU is published here for debugging * /anchor/from_vic/debug
* /anchor/from_vic/core - Every string received from the MCU is published here for debugging
- VicCAN messages for Core node * /anchor/from_vic/core
* /anchor/from_vic/arm - VicCAN messages for Core node
- VicCAN messages for Arm node * /anchor/from_vic/arm
* /anchor/from_vic/bio - VicCAN messages for Arm node
- VicCAN messages for Bio node * /anchor/from_vic/bio
- VicCAN messages for Bio node
Subscribers: Subscribers:
* /anchor/from_vic/mock_mcu * /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU - For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay * /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU - Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string * /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging - Publish raw strings to this topic to send directly to the MCU for debugging
""" """
class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initalize node with name # Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher' super().__init__("anchor_node") # previously 'serial_publisher'
# Loop through all serial devices on the computer to check for the MCU self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
self.port = None
# Serial port override
if port_override := os.getenv("PORT_OVERRIDE"): if port_override := os.getenv("PORT_OVERRIDE"):
self.port = port_override self.serial_port = port_override
ports = SerialRelay.list_serial_ports()
for i in range(4):
if self.port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU ##################################################
if b"pong" in response: # Serial MCU Discovery
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is None: # If there was not a port override, look for a MCU over USB for Serial.
self.get_logger().info("Unable to find MCU...") if self.serial_port is None:
comports = serial.tools.list_ports.comports()
real_ports = list(
filter(
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
comports,
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None:
self.get_logger().warning("Falling back to legacy MCU discovery...")
ports = Anchor.list_serial_ports()
for _ in range(4):
if self.serial_port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1) time.sleep(1)
sys.exit(1) sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
self.ser = serial.Serial(self.port, 115200) # Attempt to get name of connected MCU
self.get_logger().info(f"Enabling Relay Mode") self.serial_interface.write(
self.ser.write(b"can_relay_mode,on\n") b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
# Close serial port on exit
atexit.register(self.cleanup) atexit.register(self.cleanup)
##################################################
# ROS2 Topic Setup
# New pub/sub with VicCAN # New pub/sub with VicCAN
self.fromvic_debug_pub_ = self.create_publisher( self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20 String, "/anchor/from_vic/debug", 20
@@ -115,22 +203,10 @@ class SerialRelay(Node):
String, "/anchor/relay", self.on_relay_tovic_string, 10 String, "/anchor/relay", self.on_relay_tovic_string, 10
) )
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self): def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics""" """Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
try: try:
output = str(self.ser.readline(), "utf8") output = str(self.serial_interface.readline(), "utf8")
if output: if output:
self.relay_fromvic(output) self.relay_fromvic(output)
@@ -156,14 +232,20 @@ class SerialRelay(Node):
except serial.SerialException as e: except serial.SerialException as e:
print(f"SerialException: {e}") print(f"SerialException: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: try:
self.ser.close() if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1) exit(1)
except TypeError as e: except TypeError as e:
print(f"TypeError: {e}") print(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: try:
self.ser.close() if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1) exit(1)
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
@@ -184,7 +266,7 @@ class SerialRelay(Node):
output += f",{round(num, 7)}" # limit to 7 decimal places output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n" output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}") # self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.ser.write(bytes(output, "utf8")) self.serial_interface.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str): def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic""" """Relay a string message from the MCU to the appropriate VicCAN topic"""
@@ -246,7 +328,7 @@ class SerialRelay(Node):
"""Relay a raw string message to the MCU for debugging""" """Relay a raw string message to the MCU for debugging"""
message = msg.data message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}") # self.get_logger().info(f"Sending command to MCU: {msg}")
self.ser.write(bytes(message, "utf8")) self.serial_interface.write(bytes(message, "utf8"))
@staticmethod @staticmethod
def list_serial_ports(): def list_serial_ports():
@@ -254,28 +336,29 @@ class SerialRelay(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up before terminating...") print("Cleaning up before terminating...")
if self.ser.is_open: if self.serial_interface.is_open:
self.ser.close() self.serial_interface.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
rclpy.init(args=args) try:
sys.excepthook = myexcepthook rclpy.init(args=args)
anchor_node = Anchor()
global serial_pub thread = threading.Thread(target=rclpy.spin, args=(anchor_node,), daemon=True)
thread.start()
serial_pub = SerialRelay() rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
serial_pub.run() while rclpy.ok():
anchor_node.read_MCU() # Check the MCU for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal( signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0) signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly ) # Catch termination signals and exit cleanly

View File

@@ -1,5 +1,6 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos from rclpy import qos
from rclpy.duration import Duration from rclpy.duration import Duration
@@ -11,6 +12,8 @@ import os
import sys import sys
import threading import threading
import glob import glob
import pwd
import grp
from math import copysign from math import copysign
from std_msgs.msg import String from std_msgs.msg import String
@@ -71,9 +74,36 @@ class Headless(Node):
print("No gamepad found. Waiting...") print("No gamepad found. Waiting...")
# Initialize the gamepad # Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0) id = 0
self.gamepad.init() while True:
print(f"Gamepad Found: {self.gamepad.get_name()}") self.num_gamepads = pygame.joystick.get_count()
if id >= self.num_gamepads:
self.get_logger().fatal("Ran out of controllers to try")
sys.exit(1)
try:
self.gamepad = pygame.joystick.Joystick(id)
self.gamepad.init()
except Exception as e:
self.get_logger().error("Error when initializing gamepad")
self.get_logger().error(e)
id += 1
continue
print(f"Gamepad Found: {self.gamepad.get_name()}")
if self.gamepad.get_numhats() == 0 or self.gamepad.get_numaxes() < 5:
self.get_logger().error("Controller not correctly initialized.")
if not is_user_in_group("input"):
self.get_logger().warning(
"If using NixOS, you may need to add yourself to the 'input' group."
)
if is_user_in_group("plugdev"):
self.get_logger().warning(
"If using NixOS, you may need to remove yourself from the 'plugdev' group."
)
else:
break
id += 1
self.create_timer(0.15, self.send_controls) self.create_timer(0.15, self.send_controls)
@@ -115,7 +145,7 @@ class Headless(Node):
sys.exit(0) sys.exit(0)
# Check if controller is still connected # Check if controller is still connected
if pygame.joystick.get_count() == 0: if pygame.joystick.get_count() != self.num_gamepads:
print("Gamepad disconnected. Exiting...") print("Gamepad disconnected. Exiting...")
# Send one last zero control message # Send one last zero control message
self.core_publisher.publish(CORE_STOP_MSG) self.core_publisher.publish(CORE_STOP_MSG)
@@ -296,11 +326,34 @@ def deadzone(value: float, threshold=0.05) -> float:
return value return value
def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/
try:
username = os.getlogin()
# Get group ID from name
group_info = grp.getgrnam(group_name)
target_gid = group_info.gr_gid
# Get user's groups
user_info = pwd.getpwnam(username)
user_groups = os.getgrouplist(username, user_info.pw_gid)
return target_gid in user_groups
except KeyError:
return False
def main(args=None): def main(args=None):
rclpy.init(args=args) try:
node = Headless() rclpy.init(args=args)
rclpy.spin(node)
rclpy.shutdown() node = Headless()
rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal. Exiting...")
finally:
rclpy.shutdown()
if __name__ == "__main__": if __name__ == "__main__":