1 Commits

Author SHA1 Message Date
4d84b5f8cc added docker compose 2026-01-07 20:01:39 -06:00
8 changed files with 121 additions and 253 deletions

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source /opt/ros/humble/setup.bash
# Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source /opt/ros/humble/setup.bash
# Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,11 +17,7 @@ done
echo "[INFO] Network interface is up!"
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source /opt/ros/humble/setup.bash
source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

18
docker-compose.yml Normal file
View File

@@ -0,0 +1,18 @@
services:
ros2:
image: osrf/ros:jazzy-desktop-full
container_name: ros2_jazzy_gui
network_mode: host
environment:
- DISPLAY=${DISPLAY}
- QT_X11_NO_MITSHM=1
- XAUTHORITY=/tmp/.docker.xauth
- NVIDIA_VISIBLE_DEVICES=all
- NVIDIA_DRIVER_CAPABILITIES=all
volumes:
- /tmp/.X11-unix:/tmp/.X11-unix:rw
- ./ros_ws:/ros_ws
stdin_open: true
tty: true
privileged: true

16
flake.lock generated
View File

@@ -24,27 +24,27 @@
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1770108954,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=",
"lastModified": 1761810010,
"narHash": "sha256-o0wJKW603SiOO373BTgeZaF6nDxegMA/cRrzSC2Cscg=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "3d05d46451b376e128a1553e78b8870c75d7753a",
"rev": "e277df39e3bc6b372a5138c0bcf10198857c55ab",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "develop",
"ref": "master",
"repo": "nix-ros-overlay",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1759381078,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "NixOS",
"lastModified": 1744849697,
"narHash": "sha256-S9hqvanPSeRu6R4cw0OhvH1rJ+4/s9xIban9C4ocM/0=",
"owner": "lopsided98",
"repo": "nixpkgs",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"rev": "6318f538166fef9f5118d8d78b9b43a04bb049e4",
"type": "github"
},
"original": {

View File

@@ -2,7 +2,7 @@
description = "Development environment for ASTRA Anchor";
inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/master";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
};
@@ -56,12 +56,10 @@
control-msgs
control-toolbox
moveit-core
moveit-planners
moveit-common
moveit-msgs
moveit-ros-planning
moveit-ros-planning-interface
moveit-ros-visualization
moveit-configs-utils
moveit-ros-move-group
moveit-servo
@@ -70,9 +68,9 @@
pilz-industrial-motion-planner
pick-ik
ompl
chomp-motion-planner
joy
ros2-controllers
chomp-motion-planner
# ros2-controllers nixpkg does not build :(
];
}
)

View File

@@ -1,6 +1,5 @@
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty
import signal
@@ -8,7 +7,6 @@ import time
import atexit
import serial
import serial.tools.list_ports
import os
import sys
import threading
@@ -17,156 +15,70 @@ import glob
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
serial_pub = None
thread = None
class Anchor(Node):
"""
Publishers:
* /anchor/from_vic/debug
- Every string received from the MCU is published here for debugging
* /anchor/from_vic/core
- VicCAN messages for Core node
* /anchor/from_vic/arm
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
"""
Publishers:
* /anchor/from_vic/debug
- Every string received from the MCU is published here for debugging
* /anchor/from_vic/core
- VicCAN messages for Core node
* /anchor/from_vic/arm
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher'
self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
# Serial port override
if port_override := os.getenv("PORT_OVERRIDE"):
self.serial_port = port_override
##################################################
# Serial MCU Discovery
# If there was not a port override, look for a MCU over USB for Serial.
if self.serial_port is None:
comports = serial.tools.list_ports.comports()
real_ports = list(
filter(
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
comports,
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None:
self.get_logger().warning("Falling back to legacy MCU discovery...")
ports = Anchor.list_serial_ports()
for _ in range(4):
if self.serial_port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
self.port = None
if port_override := os.getenv("PORT_OVERRIDE"):
self.port = port_override
ports = SerialRelay.list_serial_ports()
for i in range(4):
if self.port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
# Attempt to get name of connected MCU
self.serial_interface.write(
b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
# Close serial port on exit
self.ser = serial.Serial(self.port, 115200)
self.get_logger().info(f"Enabling Relay Mode")
self.ser.write(b"can_relay_mode,on\n")
atexit.register(self.cleanup)
##################################################
# ROS2 Topic Setup
# New pub/sub with VicCAN
self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20
@@ -203,10 +115,22 @@ class Anchor(Node):
String, "/anchor/relay", self.on_relay_tovic_string, 10
)
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
try:
output = str(self.serial_interface.readline(), "utf8")
output = str(self.ser.readline(), "utf8")
if output:
self.relay_fromvic(output)
@@ -232,20 +156,14 @@ class Anchor(Node):
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
if self.ser.is_open:
self.ser.close()
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
if self.ser.is_open:
self.ser.close()
exit(1)
except Exception as e:
print(f"Exception: {e}")
@@ -266,7 +184,7 @@ class Anchor(Node):
output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.serial_interface.write(bytes(output, "utf8"))
self.ser.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
@@ -328,7 +246,7 @@ class Anchor(Node):
"""Relay a raw string message to the MCU for debugging"""
message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}")
self.serial_interface.write(bytes(message, "utf8"))
self.ser.write(bytes(message, "utf8"))
@staticmethod
def list_serial_ports():
@@ -336,29 +254,28 @@ class Anchor(Node):
def cleanup(self):
print("Cleaning up before terminating...")
if self.serial_interface.is_open:
self.serial_interface.close()
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None):
try:
rclpy.init(args=args)
anchor_node = Anchor()
rclpy.init(args=args)
sys.excepthook = myexcepthook
thread = threading.Thread(target=rclpy.spin, args=(anchor_node,), daemon=True)
thread.start()
global serial_pub
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
while rclpy.ok():
anchor_node.read_MCU() # Check the MCU for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly

View File

@@ -1,6 +1,5 @@
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from rclpy.duration import Duration
@@ -12,8 +11,6 @@ import os
import sys
import threading
import glob
import pwd
import grp
from math import copysign
from std_msgs.msg import String
@@ -74,36 +71,9 @@ class Headless(Node):
print("No gamepad found. Waiting...")
# Initialize the gamepad
id = 0
while True:
self.num_gamepads = pygame.joystick.get_count()
if id >= self.num_gamepads:
self.get_logger().fatal("Ran out of controllers to try")
sys.exit(1)
try:
self.gamepad = pygame.joystick.Joystick(id)
self.gamepad.init()
except Exception as e:
self.get_logger().error("Error when initializing gamepad")
self.get_logger().error(e)
id += 1
continue
print(f"Gamepad Found: {self.gamepad.get_name()}")
if self.gamepad.get_numhats() == 0 or self.gamepad.get_numaxes() < 5:
self.get_logger().error("Controller not correctly initialized.")
if not is_user_in_group("input"):
self.get_logger().warning(
"If using NixOS, you may need to add yourself to the 'input' group."
)
if is_user_in_group("plugdev"):
self.get_logger().warning(
"If using NixOS, you may need to remove yourself from the 'plugdev' group."
)
else:
break
id += 1
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
self.create_timer(0.15, self.send_controls)
@@ -145,7 +115,7 @@ class Headless(Node):
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads:
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
self.core_publisher.publish(CORE_STOP_MSG)
@@ -326,34 +296,11 @@ def deadzone(value: float, threshold=0.05) -> float:
return value
def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/
try:
username = os.getlogin()
# Get group ID from name
group_info = grp.getgrnam(group_name)
target_gid = group_info.gr_gid
# Get user's groups
user_info = pwd.getpwnam(username)
user_groups = os.getgrouplist(username, user_info.pw_gid)
return target_gid in user_groups
except KeyError:
return False
def main(args=None):
try:
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal. Exiting...")
finally:
rclpy.shutdown()
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":