27 Commits

Author SHA1 Message Date
ryleu
b388275bba clean stuff up a bit to prep for CAN 2026-02-15 17:23:18 -06:00
ryleu
5c0194c543 remove KNOWN_USBS from anchor_node.py 2026-02-15 01:05:37 -06:00
ryleu
809ca71208 remove nested for loops 2026-02-15 01:04:43 -06:00
SHC-ASTRA
225700bb86 tested it on testbed and had to change things 2026-02-15 00:47:20 -06:00
ryleu
4459886fc1 add a mock mode and fix a logic error 2026-02-14 23:16:34 -06:00
ryleu
18fce2c19b worth a shot to see if it works 2026-02-14 21:39:32 -06:00
Riley M.
3f68052144 Merge pull request #30 from SHC-ASTRA/update-python
-> python 3.13
2026-02-09 21:49:08 -05:00
ryleu
c72f72dc32 -> python 3.13 2026-02-08 17:23:53 -06:00
Riley M.
61f5f1fc3e Merge pull request #29 from SHC-ASTRA/qos-disable
Qos disable
2026-02-07 18:20:04 -06:00
David Sharpe
65aab7f179 Fix nix cache (#27, fix-cache)
Fix cache
2026-02-04 02:34:16 -06:00
ryleu
697efa7b9d add missing packages for moveit 2026-02-04 00:31:36 -05:00
ryleu
b70a0d27c3 uncomment ros2_controllers 2026-02-04 00:04:07 -05:00
ryleu
2d48361b8f update to develop branch of nix-ros-overlay 2026-02-03 23:32:42 -05:00
David
d9355f16e9 fix: EF gripper works again ._. 2026-01-31 18:32:39 -06:00
David
9fc120b09e fix: make QoS compatible with basestation-game 2026-01-31 17:21:52 -06:00
Riley M.
4a98c3d435 Merge pull request #25 from SHC-ASTRA/serial-refactor
Anchor Serial Refactor
2026-01-14 23:00:51 -06:00
SHC-ASTRA
b5be93e5f6 add an error instead of a crash when a gamepad fails to initialize 2026-01-14 19:49:33 -06:00
SHC-ASTRA
0e775c65c6 add trying multiple controllers to headless 2026-01-14 04:56:55 -06:00
SHC-ASTRA
14141651bf Merge branch 'autostart' into serial-refactor 2026-01-14 04:17:22 -06:00
ryleu
c10a2a5cca patch autostart scripts for nixos 2026-01-14 04:12:05 -05:00
David
df78575206 feat: (headless) add Ctrl+C try-except 2025-12-13 16:23:42 -06:00
David
40fa0d0ab8 style: (anchor) better comment serial finding 2025-11-21 17:06:37 -06:00
David
3bb3771dce fix: (anchor) ignore UnicodeDecodeError when getting mcu name 2025-11-11 13:18:36 -06:00
David
5e7776631d feat: (anchor) add new Serial finder code
Uses vendor and product ids to find a microcontroller, and detects its name after connecting. Upon failure, falls back to Areeb's code--just in case.
Also renamed `self.ser` to `self.serial_interface` and `self.port` to `self.serial_port` for clarity.
2025-11-10 23:24:14 -06:00
David
b84ca6757d refactor: (anchor) cleanup structural ros2 code 2025-11-10 22:45:43 -06:00
David
96f5eda005 feat: (headless) detect incorrectly connected controller 2025-11-10 22:02:49 -06:00
David
4c1416851e style: move pub/sub docs comment, rename SerialPub to Anchor 2025-11-10 21:58:03 -06:00
15 changed files with 458 additions and 284 deletions

1
.envrc
View File

@@ -1 +1,2 @@
use flake
[[ -d install ]] && source install/setup.$(echo $0 | grep -oE '[^/]+$')

View File

@@ -15,7 +15,11 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script
source /opt/ros/humble/setup.bash
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,7 +15,11 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script
source /opt/ros/humble/setup.bash
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,7 +17,11 @@ done
echo "[INFO] Network interface is up!"
source /opt/ros/humble/setup.bash
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

View File

@@ -1,18 +0,0 @@
services:
ros2:
image: osrf/ros:jazzy-desktop-full
container_name: ros2_jazzy_gui
network_mode: host
environment:
- DISPLAY=${DISPLAY}
- QT_X11_NO_MITSHM=1
- XAUTHORITY=/tmp/.docker.xauth
- NVIDIA_VISIBLE_DEVICES=all
- NVIDIA_DRIVER_CAPABILITIES=all
volumes:
- /tmp/.X11-unix:/tmp/.X11-unix:rw
- ./ros_ws:/ros_ws
stdin_open: true
tty: true
privileged: true

16
flake.lock generated
View File

@@ -24,27 +24,27 @@
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1761810010,
"narHash": "sha256-o0wJKW603SiOO373BTgeZaF6nDxegMA/cRrzSC2Cscg=",
"lastModified": 1770108954,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "e277df39e3bc6b372a5138c0bcf10198857c55ab",
"rev": "3d05d46451b376e128a1553e78b8870c75d7753a",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "master",
"ref": "develop",
"repo": "nix-ros-overlay",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1744849697,
"narHash": "sha256-S9hqvanPSeRu6R4cw0OhvH1rJ+4/s9xIban9C4ocM/0=",
"owner": "lopsided98",
"lastModified": 1759381078,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "6318f538166fef9f5118d8d78b9b43a04bb049e4",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"type": "github"
},
"original": {

View File

@@ -2,7 +2,7 @@
description = "Development environment for ASTRA Anchor";
inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/master";
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
};
@@ -25,9 +25,10 @@
name = "ASTRA Anchor";
packages = with pkgs; [
colcon
(python312.withPackages (
(python313.withPackages (
p: with p; [
pyserial
python-can
pygame
scipy
crccheck
@@ -56,10 +57,12 @@
control-msgs
control-toolbox
moveit-core
moveit-planners
moveit-common
moveit-msgs
moveit-ros-planning
moveit-ros-planning-interface
moveit-ros-visualization
moveit-configs-utils
moveit-ros-move-group
moveit-servo
@@ -68,9 +71,9 @@
pilz-industrial-motion-planner
pick-ik
ompl
chomp-motion-planner
joy
# ros2-controllers nixpkg does not build :(
ros2-controllers
chomp-motion-planner
];
}
)

View File

@@ -1,85 +1,99 @@
import rclpy
from rclpy.node import Node
from std_srvs.srv import Empty
from rclpy.executors import ExternalShutdownException
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import signal
import time
import atexit
import serial
import os
from .connector import (
Connector,
MockConnector,
SerialConnector,
CANConnector,
NoValidDeviceException,
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import sys
import threading
import glob
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
serial_pub = None
thread = None
class Anchor(Node):
"""
Publishers:
* /anchor/from_vic/debug
- Every string received from the MCU is published here for debugging
* /anchor/from_vic/core
- VicCAN messages for Core node
* /anchor/from_vic/arm
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
"""
Publishers:
* /anchor/from_vic/debug
- Every string received from the MCU is published here for debugging
* /anchor/from_vic/core
- VicCAN messages for Core node
* /anchor/from_vic/arm
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
"""
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
connector: Connector
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher'
super().__init__("anchor_node")
# Loop through all serial devices on the computer to check for the MCU
self.port = None
if port_override := os.getenv("PORT_OVERRIDE"):
self.port = port_override
ports = SerialRelay.list_serial_ports()
for i in range(4):
if self.port is not None:
break
for port in ports:
logger = self.get_logger()
self.declare_parameter(
"connector",
"auto",
ParameterDescriptor(
name="connector",
description="Declares which MCU connector should be used. Defaults to 'auto'.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be 'serial', 'can', 'mock', or 'auto'.",
),
)
# Determine which connector to use. Options are Mock, Serial, and CAN
connector_select = (
self.get_parameter("connector").get_parameter_value().string_value
)
match connector_select:
case "serial":
logger.info("using serial connector")
self.connector = SerialConnector(self.get_logger())
case "can":
logger.info("using CAN connector")
self.connector = CANConnector(self.get_logger())
case "mock":
logger.info("using mock connector")
self.connector = MockConnector(self.get_logger())
case "auto":
logger.info("automatically determining connector")
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
logger.info("trying CAN connector")
self.connector = CANConnector(self.get_logger())
except (NoValidDeviceException, NoWorkingDeviceException, TypeError):
logger.info("CAN connector failed, trying serial connector")
self.connector = SerialConnector(self.get_logger())
case _:
self.get_logger().fatal(
f"invalid value for connector parameter: {connector_select}"
)
exit(1)
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
self.get_logger().info(f"Enabling Relay Mode")
self.ser.write(b"can_relay_mode,on\n")
# Close devices on exit
atexit.register(self.cleanup)
# New pub/sub with VicCAN
# ROS2 Topic Setup
# Publishers
self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20
)
@@ -93,189 +107,66 @@ class SerialRelay(Node):
VicCAN, "/anchor/from_vic/bio", 20
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.connector.write, 20
)
self.mock_mcu_sub_ = self.create_subscription(
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
)
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20
)
self.tovic_debug_sub_ = self.create_subscription(
String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20
)
# Create publishers
self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10)
self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10)
self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10)
self.debug_pub = self.create_publisher(String, "/anchor/debug", 10)
# Create a subscriber
self.relay_sub = self.create_subscription(
String, "/anchor/relay", self.on_relay_tovic_string, 10
)
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def cleanup(self):
self.connector.cleanup()
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
try:
output = str(self.ser.readline(), "utf8")
output = self.connector.read()
if output:
self.relay_fromvic(output)
# All output over debug temporarily
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
# self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
exit(1)
except Exception as e:
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
if not output:
return
self.relay_fromvic(output)
def relay_fromvic(self, msg: VicCAN):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
msg.header = Header(stamp=self.get_clock().now().to_msg(), frame_id="from_vic")
if msg.mcu_name == "core":
self.fromvic_core_pub_.publish(msg)
elif msg.mcu_name == "arm" or msg.mcu_name == "digit":
self.fromvic_arm_pub_.publish(msg)
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
def on_mock_fromvic(self, msg: String):
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
# self.get_logger().info(f"Got command from mock MCU: {msg}")
self.relay_fromvic(msg.data)
def on_relay_tovic_viccan(self, msg: VicCAN):
"""Relay a VicCAN message to the MCU"""
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
for num in msg.data:
output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.ser.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
self.fromvic_debug_pub_.publish(String(data=msg))
parts = msg.strip().split(",")
if len(parts) > 0 and parts[0] != "can_relay_fromvic":
self.get_logger().debug(f"Ignoring non-VicCAN message: '{msg.strip()}'")
return
# String validation
malformed: bool = False
malformed_reason: str = ""
if len(parts) < 3 or len(parts) > 7:
malformed = True
malformed_reason = (
f"invalid argument count (expected [3,7], got {len(parts)})"
)
elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]:
malformed = True
malformed_reason = f"invalid mcu_name '{parts[1]}'"
elif not (parts[2].isnumeric()) or int(parts[2]) < 0:
malformed = True
malformed_reason = f"command_id '{parts[2]}' is not a non-negative integer"
else:
for x in parts[3:]:
try:
float(x)
except ValueError:
malformed = True
malformed_reason = f"data '{x}' is not a float"
break
if malformed:
self.get_logger().warning(
f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}"
)
return
# Have valid VicCAN message
output = VicCAN()
output.mcu_name = parts[1]
output.command_id = int(parts[2])
if len(parts) > 3:
output.data = [float(x) for x in parts[3:]]
output.header = Header(
stamp=self.get_clock().now().to_msg(), frame_id="from_vic"
viccan = string_to_viccan(
msg.data,
"mock",
self.get_logger(),
)
# self.get_logger().info(f"Relaying from MCU: {output}")
if output.mcu_name == "core":
self.fromvic_core_pub_.publish(output)
elif output.mcu_name == "arm" or output.mcu_name == "digit":
self.fromvic_arm_pub_.publish(output)
elif output.mcu_name == "citadel" or output.mcu_name == "digit":
self.fromvic_bio_pub_.publish(output)
def on_relay_tovic_string(self, msg: String):
"""Relay a raw string message to the MCU for debugging"""
message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}")
self.ser.write(bytes(message, "utf8"))
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
if viccan:
self.relay_fromvic(viccan)
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
try:
rclpy.init(args=args)
anchor_node = Anchor()
global serial_pub
thread = threading.Thread(target=rclpy.spin, args=(anchor_node,), daemon=True)
thread.start()
serial_pub = SerialRelay()
serial_pub.run()
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
while rclpy.ok():
anchor_node.read_MCU() # Check the MCU for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly

View File

@@ -0,0 +1,189 @@
from abc import ABC, abstractmethod
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan
# CAN
import can
import can.interfaces.socketcan
# Serial
import serial
import serial.tools.list_ports
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
BAUD_RATE = 115200
class NoValidDeviceException(Exception):
pass
class NoWorkingDeviceException(Exception):
pass
class MultipleValidDevicesException(Exception):
pass
class DeviceClosedException(Exception):
pass
class Connector(ABC):
logger: RcutilsLogger
def __init__(self, logger: RcutilsLogger):
self.logger = logger
@abstractmethod
def read(self) -> VicCAN | None:
pass
@abstractmethod
def write(self, msg: VicCAN):
pass
def cleanup(self):
pass
class SerialConnector(Connector):
port: str
mcu_name: str
serial_interface: serial.Serial
override: bool
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
ports = self._find_ports()
if len(ports) <= 0:
raise NoValidDeviceException("no valid serial device found")
if (l := len(ports)) > 1:
raise MultipleValidDevicesException(
f"too many ({l}) valid serial devices found"
)
# check each of our ports to make sure one of them is responding
port = ports[0]
mcu_name = self._get_name(port)
if not mcu_name:
raise NoWorkingDeviceException(
f"found {port}, but it did not respond with its name"
)
self.port = port
self.mcu_name = mcu_name
# if we fail at this point, it should crash because we've already tested the port
self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1)
def _find_ports(self) -> list[str]:
"""
Finds all valid ports but does not test them
returns: all valid ports
"""
comports = serial.tools.list_ports.comports()
valid_ports = list(
map( # get just device strings
lambda p: p.device,
filter( # make sure we have a known device
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
comports,
),
)
)
self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]")
return valid_ports
def _get_name(self, port: str) -> str | None:
"""
Get the name of the MCU (if it works)
returns: str name of the MCU, None if it doesn't work
"""
# attempt to open the serial port
serial_interface: serial.Serial
try:
self.logger.info(f"asking {port} for its name")
serial_interface = serial.Serial(port, BAUD_RATE, timeout=1)
serial_interface.write(b"can_relay_mode,on\n")
for i in range(4):
self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name")
response = serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
self.logger.info(f"we are talking to {args[1]}")
return args[1]
break
except UnicodeDecodeError as e:
self.logger.info(
f"ignoring UnicodeDecodeError when asking for MCU name: {e}"
)
if serial_interface.is_open:
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def read(self) -> VicCAN | None:
try:
raw = str(self.serial_interface.readline(), "utf8")
if not raw:
return None
return string_to_viccan(raw, self.mcu_name, self.logger)
except serial.SerialException as e:
self.logger.error(f"SerialException: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except TypeError as e:
self.logger.error(f"TypeError: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except Exception:
pass # pretty much no other error matters
def write(self, msg: VicCAN):
# go from [ w, x, y, z ] -> "w,x,y,z" & round to 7 digits max
data = ",".join([str(round(x, 7)) for x in msg.data])
output = f"can_relay_tovic,{msg.mcu_name},{msg.command_id},{data}\n"
self.serial_interface.write(bytes(output, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except Exception as e:
self.logger.error(e)
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
class MockConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def read(self) -> VicCAN | None:
return None
def write(self, msg: VicCAN):
print(msg)

View File

@@ -0,0 +1,42 @@
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger):
"""
Converts the serial string VicCAN format to a ROS2 VicCAN message.
Does not fill out the Header of the message.
On a failure, it will log at a debug level why it failed.
Parameters:
* msg: str
- The message in serial VicCAN format
* mcu_name: str
- The name of the MCU (e.g. core, citadel, arm)
* logger: RcutilsLogger
- A logger retrieved from node.get_logger()
Returns:
* VicCAN | None
- The VicCAN message on a success or None on a failure
"""
parts: list[str] = msg.split(",")
# don't need an extra check because len of .split output is always >= 1
if parts[0] != "can_relay_fromvic":
logger.debug(f"got non-CAN data from {mcu_name}: {msg}")
return None
elif len(parts) < 3:
logger.debug(
f"got garbage (not enough parts) CAN data from {mcu_name}: {msg}"
)
return None
elif len(parts) > 7:
logger.debug(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}")
return None
return VicCAN(
mcu_name=parts[1],
command_id=int(parts[2]),
data=[float(x) for x in parts[3:]],
)

View File

@@ -3,13 +3,14 @@
<package format="3">
<name>anchor_pkg</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<description>Anchor -- ROS and CAN relay node</description>
<maintainer email="rjm0037@uah.edu">Riley</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-serial</depend>
<depend>python3-can</depend>
<build_depend>black</build_depend>

View File

@@ -49,7 +49,7 @@ class SerialRelay(Node):
# Create subscribers
self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 10
ArmManual, "/arm/control/manual", self.send_manual, 2
)
# New messages
@@ -190,7 +190,7 @@ class SerialRelay(Node):
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,28,{msg.laser}\n"

View File

@@ -31,14 +31,14 @@ CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
# history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
# Used to verify the length of an incoming VicCAN feedback message

View File

@@ -1,5 +1,6 @@
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from rclpy.duration import Duration
@@ -11,6 +12,8 @@ import os
import sys
import threading
import glob
import pwd
import grp
from math import copysign
from std_msgs.msg import String
@@ -32,14 +35,14 @@ ARM_STOP_MSG = ArmManual() # "
BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
# history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
CORE_MODE = "twist" # "twist" or "duty"
@@ -71,9 +74,36 @@ class Headless(Node):
print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
id = 0
while True:
self.num_gamepads = pygame.joystick.get_count()
if id >= self.num_gamepads:
self.get_logger().fatal("Ran out of controllers to try")
sys.exit(1)
try:
self.gamepad = pygame.joystick.Joystick(id)
self.gamepad.init()
except Exception as e:
self.get_logger().error("Error when initializing gamepad")
self.get_logger().error(e)
id += 1
continue
print(f"Gamepad Found: {self.gamepad.get_name()}")
if self.gamepad.get_numhats() == 0 or self.gamepad.get_numaxes() < 5:
self.get_logger().error("Controller not correctly initialized.")
if not is_user_in_group("input"):
self.get_logger().warning(
"If using NixOS, you may need to add yourself to the 'input' group."
)
if is_user_in_group("plugdev"):
self.get_logger().warning(
"If using NixOS, you may need to remove yourself from the 'plugdev' group."
)
else:
break
id += 1
self.create_timer(0.15, self.send_controls)
@@ -115,7 +145,7 @@ class Headless(Node):
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
if pygame.joystick.get_count() != self.num_gamepads:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
self.core_publisher.publish(CORE_STOP_MSG)
@@ -296,11 +326,34 @@ def deadzone(value: float, threshold=0.05) -> float:
return value
def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/
try:
username = os.getlogin()
# Get group ID from name
group_info = grp.getgrnam(group_name)
target_gid = group_info.gr_gid
# Get user's groups
user_info = pwd.getpwnam(username)
user_groups = os.getgrouplist(username, user_info.pw_gid)
return target_gid in user_groups
except KeyError:
return False
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
try:
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal. Exiting...")
finally:
rclpy.shutdown()
if __name__ == "__main__":