50 Commits

Author SHA1 Message Date
David
87237bd841 chore(anchor): use new version commit hash field name 2026-04-18 15:09:30 -05:00
David
79b2d0020f feat(anchor): add is_main and is_dirty to versioning 2026-04-15 23:50:47 -05:00
David
3dd9525833 feat(anchor): add MCU Versioning feedback 2026-04-14 15:38:41 -05:00
Riley M.
8404999369 Merge pull request #31 from SHC-ASTRA/can-refactor
Refactor anchor & add direct CAN connector
2026-04-08 00:18:13 -05:00
ryleu
88574524cf clarify the mock connector usage in the README 2026-04-08 00:15:39 -05:00
ryleu
30bb32a66b remove extraneous slice 2026-04-08 00:09:04 -05:00
David
010d2da0b6 fix: string number 2026-04-07 23:45:40 -05:00
ryleu
0a257abf43 make the pad 3 -> logic consistent 2026-04-07 23:44:38 -05:00
ryleu
b09b55bee0 fix bug because apparently python has arrays 2026-04-07 22:19:55 -05:00
ryleu
ec7f272934 clean up code 2026-04-07 22:16:08 -05:00
ryleu
bc9183d59a make mock mcu use VicCAN messages 2026-04-07 21:52:52 -05:00
ryleu
410d3706ed update README with mock connector instructions 2026-04-02 19:49:07 -05:00
ryleu
89b3194914 update documentation and accept 3-value VicCAN messages 2026-04-02 19:43:10 -05:00
ryleu
4ef226c094 nix fmt 2026-04-01 03:31:21 -05:00
SHC-ASTRA
327539467c fixed can connector 2026-04-01 02:50:12 -05:00
SHC-ASTRA
e570d371c6 fix a plethora of bugs related to the serial connector 2026-04-01 01:48:40 -05:00
ryleu
f7efa604d2 finish adding parameters 2026-03-23 20:39:50 -05:00
ryleu
fe46a2ab4d fix wrong order for initialization 2026-03-23 13:25:13 -05:00
ryleu
941e196316 implement review comments 2026-03-21 18:14:44 -05:00
ryleu
7a3c4af1ce remove .envrc sourcing of install 2026-03-18 23:28:49 -05:00
ryleu
5e5a52438d black fmt 2026-03-18 23:22:42 -05:00
ryleu
c814f34ca6 rewrite the launch file 2026-03-18 00:12:42 -05:00
ryleu
ce39d0aeb9 Merge remote-tracking branch 'origin/main' into can-refactor 2026-03-17 23:57:38 -05:00
ryleu
9b96244a1b add can support 2026-03-17 23:52:37 -05:00
David Sharpe
e588ff0a7b Arm topic refactor (#33)
Headless too
2026-03-17 23:34:35 -05:00
David Sharpe
e83642cfe8 style: (arm) fix comment and variable name 2026-03-17 23:07:06 -05:00
David Sharpe
67b3c5bc8f refactor: (arm) consolidate velocity control VicCAN into single function 2026-03-17 01:44:32 -05:00
David Sharpe
c506a34b37 style: (arm) format 2026-03-17 01:43:22 -05:00
David Sharpe
980c08ba4f refactor: implement Riley's comments
- Add @warnings.deprecated decorators to callbacks that use old topics
- Rename "*new*" topics
- Print debug on malformed message receival
- Un-invert Axes 2 & 3 in the URDF
- Don't silently check mcu_name twice
- Remove threading.Thread (not one of Riley's comments)
- Add a real signal handler (ditto)
- Move stamped stop messages to helper functions
2026-03-17 01:14:18 -05:00
David Sharpe
743744edaa refactor: (headless) change string parameters to bool 2026-03-16 00:39:22 -05:00
David Sharpe
292b3a742d refactor: (headless) cleanup mainly arm 2026-03-16 00:26:35 -05:00
David Sharpe
62fd1b110d refactor: remedy QoS profiles 2026-03-08 03:57:28 -05:00
David Sharpe
aaf40124fa Merge branch 'main' into arm-topic-refactor 2026-03-08 03:56:25 -05:00
David Sharpe
294ae393de feat: (headless) add new arm manual (JointJog) 2026-03-08 03:29:20 -05:00
David Sharpe
9c9d3d675e refactor: (headless) reorganize send_controls() into different functions 2026-03-08 01:08:17 -06:00
David Sharpe
6fa47021fc refactor: (headless) reorganize __init__(), add use_old_topics parameter 2026-03-08 00:42:41 -06:00
David Sharpe
f23d8c62ff feat: (arm) add use_old_topics parameter 2026-03-08 00:19:06 -06:00
David Sharpe
667247cac8 refactor: (arm) reorganize __init__() and remove run() 2026-03-08 00:10:33 -06:00
David Sharpe
0929cc9503 feat: (arm) add JointJog for manual arm input 2026-03-07 23:55:33 -06:00
ryleu
b388275bba clean stuff up a bit to prep for CAN 2026-02-15 17:23:18 -06:00
ryleu
5c0194c543 remove KNOWN_USBS from anchor_node.py 2026-02-15 01:05:37 -06:00
ryleu
809ca71208 remove nested for loops 2026-02-15 01:04:43 -06:00
SHC-ASTRA
225700bb86 tested it on testbed and had to change things 2026-02-15 00:47:20 -06:00
ryleu
4459886fc1 add a mock mode and fix a logic error 2026-02-14 23:16:34 -06:00
ryleu
18fce2c19b worth a shot to see if it works 2026-02-14 21:39:32 -06:00
Riley M.
3f68052144 Merge pull request #30 from SHC-ASTRA/update-python
-> python 3.13
2026-02-09 21:49:08 -05:00
ryleu
c72f72dc32 -> python 3.13 2026-02-08 17:23:53 -06:00
Riley M.
61f5f1fc3e Merge pull request #29 from SHC-ASTRA/qos-disable
Qos disable
2026-02-07 18:20:04 -06:00
David
d9355f16e9 fix: EF gripper works again ._. 2026-01-31 18:32:39 -06:00
David
9fc120b09e fix: make QoS compatible with basestation-game 2026-01-31 17:21:52 -06:00
17 changed files with 1447 additions and 998 deletions

View File

@@ -60,6 +60,33 @@ $ ros2 launch anchor_pkg rover.launch.py # Must be run on a computer connected
$ ros2 run headless_pkg headless_full # Optionally run in a separate shell on the same or different computer.
```
### Using the Mock Connector
Anchor provides a mock connector meant for testing and scripting purposes. You can select the mock connector by running anchor with this command:
```bash
$ ros2 launch anchor_pkg rover.launch.py connector:="mock"
```
To see all data that would be sent over the CAN network (and thus to the microcontrollers), use this command:
```bash
$ ros2 topic echo /anchor/to_vic/debug
```
To send data to the mock connector (as if you were a ROS2 node), use the normal relay topic:
```bash
$ ros2 topic pub /anchor/to_vic/relay astra_msgs/msg/VicCAN '{mcu_name: "core", command_id: 50, data: [0.0, 2.0, 0.0, 1.0]}'
```
To send data to the mock connector (as if you were a microcontroller), publish to the dedicated topic:
```bash
$ ros2 topic pub /anchor/from_vic/mock_mcu astra_msgs/msg/VicCAN '{mcu_name: "arm", command_id: 55, data: [0.0, 450.0, 900.0, 0.0]}'
```
### Testing Serial
You can fake the presence of a Serial device (i.e., MCU) by using the following command:
@@ -68,10 +95,31 @@ You can fake the presence of a Serial device (i.e., MCU) by using the following
$ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT
```
When you go to run anchor, use the `PORT_OVERRIDE` environment variable to point it to the fake serial port, like so:
When you go to run anchor, use the `serial_override` ROS2 parameter to point it to the fake serial port, like so:
```bash
$ PORT_OVERRIDE=/tmp/ttyACM9 ros2 launch anchor_pkg rover.launch.py
$ ros2 launch anchor_pkg rover.launch.py connector:=serial serial_override:=/tmp/ttyACM9
```
### Testing CAN
You can create a virtual CAN network by using the following commands to create and then enable it:
```bash
sudo ip link add dev vcan0 type vcan
sudo ip link set vcan0 up
```
When you go to run anchor, use the `can_override` ROS2 parameter to point it to the virtual CAN network, like so:
```bash
$ ros2 launch anchor_pkg rover.launch.py connector:=can can_override:=vcan0
```
Once you're done, you should delete the virtual network so that anchor doesn't get confused if you plug in a real CAN adapter:
```bash
$ sudo ip link delete vcan0
```
### Connecting the GuliKit Controller

23
flake.lock generated
View File

@@ -60,7 +60,8 @@
"nixpkgs": [
"nix-ros-overlay",
"nixpkgs"
]
],
"treefmt-nix": "treefmt-nix"
}
},
"systems": {
@@ -77,6 +78,26 @@
"repo": "default",
"type": "github"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1773297127,
"narHash": "sha256-6E/yhXP7Oy/NbXtf1ktzmU8SdVqJQ09HC/48ebEGBpk=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "71b125cd05fbfd78cab3e070b73544abe24c5016",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
}
},
"root": "root",

View File

@@ -4,6 +4,11 @@
inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
treefmt-nix = {
url = "github:numtide/treefmt-nix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs =
@@ -11,7 +16,8 @@
self,
nix-ros-overlay,
nixpkgs,
}:
...
}@inputs:
nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem (
system:
let
@@ -25,9 +31,10 @@
name = "ASTRA Anchor";
packages = with pkgs; [
colcon
(python312.withPackages (
(python313.withPackages (
p: with p; [
pyserial
python-can
pygame
scipy
crccheck
@@ -61,7 +68,7 @@
moveit-msgs
moveit-ros-planning
moveit-ros-planning-interface
moveit-ros-visualization
moveit-ros-visualization
moveit-configs-utils
moveit-ros-move-group
moveit-servo
@@ -72,7 +79,7 @@
ompl
joy
ros2-controllers
chomp-motion-planner
chomp-motion-planner
];
}
)
@@ -83,6 +90,8 @@
export QT_X11_NO_MITSHM=1
'';
};
formatter = (inputs.treefmt-nix.lib.evalModule pkgs ./treefmt.nix).config.build.wrapper;
}
);

View File

@@ -1,28 +1,26 @@
from warnings import deprecated
import time
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import signal
import time
import atexit
import serial
import serial.tools.list_ports
import os
import sys
from .connector import (
Connector,
MockConnector,
SerialConnector,
CANConnector,
NoValidDeviceException,
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import threading
import glob
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
from builtin_interfaces.msg import Time
from std_msgs.msg import String
from astra_msgs.msg import VicCAN, McuVersion
class Anchor(Node):
@@ -36,308 +34,234 @@ class Anchor(Node):
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
* /anchor/to_vic/debug
- A string copy of the messages published to ./relay are published here
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
- For testing without an actual MCU, publish ViCAN messages here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging
- Send raw strings to connectors. Does not work for connectors that require conversion (like CANConnector)
* /anchor/relay
- Legacy method for talking to connectors. Takes String as input, but does not send the raw strings to connectors.
Instead, it converts them to VicCAN messages first.
"""
ASTRA_EPOCH = time.struct_time((2022, 1, 1, 0, 0, 0, 0, 0, 0)) # January 1, 2022
connector: Connector
def __init__(self):
# Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher'
super().__init__("anchor_node")
self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
logger = self.get_logger()
# Serial port override
if port_override := os.getenv("PORT_OVERRIDE"):
self.serial_port = port_override
# ROS2 Parameter Setup
##################################################
# Serial MCU Discovery
self.declare_parameter(
"connector",
"auto",
ParameterDescriptor(
name="connector",
description="Declares which MCU connector should be used. Defaults to 'auto'.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be 'serial', 'can', 'mock', or 'auto'.",
),
)
# If there was not a port override, look for a MCU over USB for Serial.
if self.serial_port is None:
comports = serial.tools.list_ports.comports()
real_ports = list(
filter(
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
comports,
self.declare_parameter(
"can_override",
"",
ParameterDescriptor(
name="can_override",
description="Overrides which CAN channel will be used. Defaults to ''.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be a valid CAN network that shows up in `ip link show`.",
),
)
self.declare_parameter(
"serial_override",
"",
ParameterDescriptor(
name="serial_override",
description="Overrides which serial port will be used. Defaults to ''.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be a valid path to a serial device file that shows up in `ls /dev/tty*`.",
),
)
# Determine which connector to use. Options are Mock, Serial, and CAN
connector_select = (
self.get_parameter("connector").get_parameter_value().string_value
)
can_override = (
self.get_parameter("can_override").get_parameter_value().string_value
)
serial_override = (
self.get_parameter("serial_override").get_parameter_value().string_value
)
match connector_select:
case "serial":
logger.info("using serial connector")
self.connector = SerialConnector(
logger, self.get_clock(), serial_override
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
case "can":
logger.info("using CAN connector")
self.connector = CANConnector(logger, self.get_clock(), can_override)
case "mock":
logger.info("using mock connector")
self.connector = MockConnector(logger, self.get_clock())
case "auto":
logger.info("automatically determining connector")
try:
logger.info("trying CAN connector")
self.connector = CANConnector(
logger, self.get_clock(), can_override
)
except (NoValidDeviceException, NoWorkingDeviceException, TypeError):
logger.info("CAN connector failed, trying serial connector")
self.connector = SerialConnector(
logger, self.get_clock(), serial_override
)
case _:
logger.fatal(
f"invalid value for connector parameter: {connector_select}"
)
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
exit(1)
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None:
self.get_logger().warning("Falling back to legacy MCU discovery...")
ports = Anchor.list_serial_ports()
for _ in range(4):
if self.serial_port is not None:
break
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1)
sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
# Attempt to get name of connected MCU
self.serial_interface.write(
b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
# Close serial port on exit
atexit.register(self.cleanup)
##################################################
# ROS2 Topic Setup
# New pub/sub with VicCAN
self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20
# Publishers
self.fromvic_debug_pub_ = self.create_publisher( # only used by serial
String,
"/anchor/from_vic/debug",
20,
)
self.fromvic_core_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/core", 20
VicCAN,
"/anchor/from_vic/core",
20,
)
self.fromvic_arm_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/arm", 20
VicCAN,
"/anchor/from_vic/arm",
20,
)
self.fromvic_bio_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/bio", 20
VicCAN,
"/anchor/from_vic/bio",
20,
)
# Debug publisher
self.tovic_debug_pub_ = self.create_publisher(
VicCAN,
"/anchor/to_vic/debug",
20,
)
# MCU Version publisher
self.mcu_version_pub_ = self.create_publisher(
McuVersion,
"/anchor/from_vic/mcu_version",
20,
)
self.mock_mcu_sub_ = self.create_subscription(
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20
VicCAN,
"/anchor/to_vic/relay",
self.write_connector,
20,
)
self.tovic_debug_sub_ = self.create_subscription(
String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20
self.tovic_sub_legacy_ = self.create_subscription(
String,
"/anchor/relay",
self.write_connector_legacy,
20,
)
self.mock_mcu_sub_ = self.create_subscription(
VicCAN,
"/anchor/from_vic/mock_mcu",
self.relay_fromvic,
20,
)
self.tovic_string_sub_ = self.create_subscription(
String,
"/anchor/to_vic/relay_string",
self.connector.write_raw,
20,
)
# Create publishers
self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10)
self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10)
self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10)
self.mcu_versions: dict[str, McuVersion] = {}
self.debug_pub = self.create_publisher(String, "/anchor/debug", 10)
# Create a subscriber
self.relay_sub = self.create_subscription(
String, "/anchor/relay", self.on_relay_tovic_string, 10
)
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
try:
output = str(self.serial_interface.readline(), "utf8")
if output:
self.relay_fromvic(output)
# All output over debug temporarily
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
# self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except Exception as e:
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
def on_mock_fromvic(self, msg: String):
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
# self.get_logger().info(f"Got command from mock MCU: {msg}")
self.relay_fromvic(msg.data)
def on_relay_tovic_viccan(self, msg: VicCAN):
"""Relay a VicCAN message to the MCU"""
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
for num in msg.data:
output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.serial_interface.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
self.fromvic_debug_pub_.publish(String(data=msg))
parts = msg.strip().split(",")
if len(parts) > 0 and parts[0] != "can_relay_fromvic":
self.get_logger().debug(f"Ignoring non-VicCAN message: '{msg.strip()}'")
return
# String validation
malformed: bool = False
malformed_reason: str = ""
if len(parts) < 3 or len(parts) > 7:
malformed = True
malformed_reason = (
f"invalid argument count (expected [3,7], got {len(parts)})"
)
elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]:
malformed = True
malformed_reason = f"invalid mcu_name '{parts[1]}'"
elif not (parts[2].isnumeric()) or int(parts[2]) < 0:
malformed = True
malformed_reason = f"command_id '{parts[2]}' is not a non-negative integer"
else:
for x in parts[3:]:
try:
float(x)
except ValueError:
malformed = True
malformed_reason = f"data '{x}' is not a float"
break
if malformed:
self.get_logger().warning(
f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}"
)
return
# Have valid VicCAN message
output = VicCAN()
output.mcu_name = parts[1]
output.command_id = int(parts[2])
if len(parts) > 3:
output.data = [float(x) for x in parts[3:]]
output.header = Header(
stamp=self.get_clock().now().to_msg(), frame_id="from_vic"
)
# self.get_logger().info(f"Relaying from MCU: {output}")
if output.mcu_name == "core":
self.fromvic_core_pub_.publish(output)
elif output.mcu_name == "arm" or output.mcu_name == "digit":
self.fromvic_arm_pub_.publish(output)
elif output.mcu_name == "citadel" or output.mcu_name == "digit":
self.fromvic_bio_pub_.publish(output)
def on_relay_tovic_string(self, msg: String):
"""Relay a raw string message to the MCU for debugging"""
message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}")
self.serial_interface.write(bytes(message, "utf8"))
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
# Close devices on exit
atexit.register(self.cleanup)
def cleanup(self):
print("Cleaning up before terminating...")
if self.serial_interface.is_open:
self.serial_interface.close()
self.connector.cleanup()
def read_connector(self):
"""Check the connector for new data from the MCU, and publish string to appropriate topics"""
viccan, raw = self.connector.read()
if raw:
self.fromvic_debug_pub_.publish(String(data=raw))
if viccan:
self.relay_fromvic(viccan)
def write_connector(self, msg: VicCAN):
"""Write to the connector and send a copy to /anchor/to_vic/debug"""
self.connector.write(msg)
self.tovic_debug_pub_.publish(msg)
@deprecated(
"Use /anchor/to_vic/relay or /anchor/to_vic/relay_string instead of /anchor/relay"
)
def write_connector_legacy(self, msg: String):
"""Write to the connector by first attempting to convert String to VicCAN"""
# please do not reference this code. ~riley
for cmd in msg.data.split("\n"):
viccan = string_to_viccan(
cmd,
"anchor",
self.get_logger(),
self.get_clock().now().to_msg(),
)
if viccan:
self.write_connector(viccan)
def relay_fromvic(self, msg: VicCAN):
"""Relay a message from the MCU to the appropriate VicCAN topic"""
if msg.mcu_name == "core":
self.fromvic_core_pub_.publish(msg)
elif msg.mcu_name == "arm" or msg.mcu_name == "digit":
self.fromvic_arm_pub_.publish(msg)
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
# MCU Versioning information
if msg.command_id in (46, 47) and msg.mcu_name not in self.mcu_versions:
self.mcu_versions[msg.mcu_name] = McuVersion(mcu_name=msg.mcu_name)
if msg.command_id == 46: # commit hashes
self.mcu_versions[msg.mcu_name].project_commit_fragment = msg.data[0]
self.mcu_versions[msg.mcu_name].astra_lib_commit_fragment = msg.data[1]
elif msg.command_id == 47: # build timestamp and version numbers
version_msg = self.mcu_versions[msg.mcu_name]
version_msg.build_time = Time(
sec=int(time.mktime(self.ASTRA_EPOCH) + msg.data[0])
)
# is_main and is_dirty is in msg.data[1]
# Out of 1 byte, it looks like [lib_isdirty][lib_ismain][proj_isdirty][proj_ismain]
version_msg.astra_lib_is_dirty = bool(int(msg.data[1]) >> 3 & 0x1)
version_msg.astra_lib_is_main = bool(int(msg.data[1]) >> 2 & 0x1)
version_msg.project_is_dirty = bool(int(msg.data[1]) >> 1 & 0x1)
version_msg.project_is_main = bool(int(msg.data[1]) & 0x1)
self.mcu_version_pub_.publish(version_msg)
def main(args=None):
@@ -350,16 +274,9 @@ def main(args=None):
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
while rclpy.ok():
anchor_node.read_MCU() # Check the MCU for updates
anchor_node.read_connector() # Check the connector for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -0,0 +1,438 @@
from abc import ABC, abstractmethod
from astra_msgs.msg import VicCAN
from rclpy.clock import Clock
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan as _string_to_viccan, viccan_to_string
# CAN
import can
import can.interfaces.socketcan
import struct
# Serial
import serial
import serial.tools.list_ports
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
BAUD_RATE = 115200
MCU_IDS = [
"broadcast",
"core",
"arm",
"digit",
"faerie",
"citadel",
"libs",
]
class NoValidDeviceException(Exception):
pass
class NoWorkingDeviceException(Exception):
pass
class MultipleValidDevicesException(Exception):
pass
class DeviceClosedException(Exception):
pass
class Connector(ABC):
logger: RcutilsLogger
clock: Clock
def string_to_viccan(self, msg: str, mcu_name: str):
"""function currying so that we do not need to pass logger and clock every time"""
return _string_to_viccan(
msg,
mcu_name,
self.logger,
self.clock.now().to_msg(),
)
def __init__(self, logger: RcutilsLogger, clock: Clock):
self.logger = logger
self.clock = clock
@abstractmethod
def read(self) -> tuple[VicCAN | None, str | None]:
"""
Must return a tuple of (VicCAN, debug message or string repr of VicCAN)
"""
pass
@abstractmethod
def write(self, msg: VicCAN):
pass
@abstractmethod
def write_raw(self, msg: str):
pass
def cleanup(self):
pass
class SerialConnector(Connector):
port: str
mcu_name: str
serial_interface: serial.Serial
def __init__(self, logger: RcutilsLogger, clock: Clock, serial_override: str = ""):
super().__init__(logger, clock)
ports = self._find_ports()
mcu_name: str | None = None
if serial_override:
logger.warn(
f"using serial_override: `{serial_override}`! this will bypass several checks."
)
ports = [serial_override]
mcu_name = "override"
if len(ports) <= 0:
raise NoValidDeviceException("no valid serial device found")
if (l := len(ports)) > 1:
raise MultipleValidDevicesException(
f"too many ({l}) valid serial devices found"
)
# check each of our ports to make sure one of them is responding
port = ports[0]
# we might already have a name by now if we overrode earlier
mcu_name = mcu_name or self._get_name(port)
if not mcu_name:
raise NoWorkingDeviceException(
f"found {port}, but it did not respond with its name"
)
self.port = port
self.mcu_name = mcu_name
# if we fail at this point, it should crash because we've already tested the port
self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1)
def _find_ports(self) -> list[str]:
"""
Finds all valid ports but does not test them
returns: all valid ports
"""
comports = serial.tools.list_ports.comports()
valid_ports = list(
map( # get just device strings
lambda p: p.device,
filter( # make sure we have a known device
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
comports,
),
)
)
self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]")
return valid_ports
def _get_name(self, port: str) -> str | None:
"""
Get the name of the MCU (if it works)
returns: str name of the MCU, None if it doesn't work
"""
# attempt to open the serial port
serial_interface: serial.Serial
try:
self.logger.info(f"asking {port} for its name")
serial_interface = serial.Serial(port, BAUD_RATE, timeout=1)
serial_interface.write(b"can_relay_mode,on\n")
for i in range(4):
self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name")
response = serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
self.logger.info(f"we are talking to {args[1]}")
return args[1]
break
except UnicodeDecodeError as e:
self.logger.info(
f"ignoring UnicodeDecodeError when asking for MCU name: {e}"
)
if serial_interface.is_open:
# turn relay mode off if it failed to respond with its name
serial_interface.write(b"can_relay_mode,off\n")
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def read(self) -> tuple[VicCAN | None, str | None]:
try:
raw = str(self.serial_interface.readline(), "utf8")
if not raw:
return (None, None)
return (
self.string_to_viccan(raw, self.mcu_name),
raw,
)
except serial.SerialException as e:
self.logger.error(f"SerialException: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except Exception:
return (None, None) # pretty much no other error matters
def write(self, msg: VicCAN):
self.write_raw(viccan_to_string(msg))
def write_raw(self, msg: str):
self.serial_interface.write(bytes(msg, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except Exception as e:
self.logger.error(e)
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger, clock: Clock, can_override: str):
super().__init__(logger, clock)
self.can_channel: str | None = None
self.can_bus: can.BusABC | None = None
avail = can.interfaces.socketcan.SocketcanBus._detect_available_configs()
if len(avail) == 0:
raise NoValidDeviceException("no CAN interfaces found")
# filter to busses whose channel matches the can_override
if can_override:
self.logger.info(f"overrode can interface with {can_override}")
avail = list(
filter(
lambda b: b.get("channel") == can_override,
avail,
)
)
if (l := len(avail)) > 1:
channels = ", ".join(str(b.get("channel")) for b in avail)
raise MultipleValidDevicesException(
f"too many ({l}) CAN interfaces found: [{channels}]"
)
bus = avail[0]
self.can_channel = str(bus.get("channel"))
self.logger.info(f"found CAN interface '{self.can_channel}'")
try:
self.can_bus = can.Bus(
interface="socketcan",
channel=self.can_channel,
bitrate=1_000_000,
)
except can.CanError as e:
raise NoWorkingDeviceException(
f"could not open CAN channel '{self.can_channel}': {e}"
)
if self.can_channel and self.can_channel.startswith("v"):
self.logger.warn("CAN interface is likely virtual")
def read(self) -> tuple[VicCAN | None, str | None]:
if not self.can_bus:
raise DeviceClosedException("CAN bus not initialized")
try:
message = self.can_bus.recv(timeout=0.0)
except can.CanError as e:
self.logger.error(f"CAN error while receiving: {e}")
raise DeviceClosedException("CAN bus closed unexpectedly")
if message is None:
return (None, None)
arbitration_id = message.arbitration_id & 0x7FF
data_bytes = bytes(message.data)
mcu_key = (arbitration_id >> 8) & 0b111
data_type_key = (arbitration_id >> 6) & 0b11
command = arbitration_id & 0x3F
try:
mcu_name = MCU_IDS[mcu_key]
except IndexError:
self.logger.warn(
f"received CAN frame with unknown MCU key {mcu_key}; id=0x{arbitration_id:X}"
)
return (None, None)
data: list[float] = []
try:
if data_type_key == 3:
data = []
elif data_type_key == 0:
if len(data_bytes) < 8:
self.logger.warn(
f"received double payload with insufficient length {len(data_bytes)}; dropping frame"
)
return (None, None)
(value,) = struct.unpack(">d", data_bytes[:8])
data = [float(value)]
elif data_type_key == 1:
if len(data_bytes) < 8:
self.logger.warn(
f"received float32x2 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return (None, None)
v1, v2 = struct.unpack(">ff", data_bytes[:8])
data = [float(v1), float(v2)]
elif data_type_key == 2:
if len(data_bytes) < 8:
self.logger.warn(
f"received int16x4 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return (None, None)
i1, i2, i3, i4 = struct.unpack(">hhhh", data_bytes[:8])
data = [float(i1), float(i2), float(i3), float(i4)]
else:
self.logger.warn(
f"received CAN frame with unknown data_type_key {data_type_key}; id=0x{arbitration_id:X}"
)
return (None, None)
except struct.error as e:
self.logger.error(f"error unpacking CAN payload: {e}")
return (None, None)
viccan = VicCAN(
mcu_name=mcu_name,
command_id=int(command),
data=data,
)
self.logger.debug(
f"received CAN frame id=0x{message.arbitration_id:X}, "
f"decoded as VicCAN(mcu_name={viccan.mcu_name}, command_id={viccan.command_id}, data={viccan.data})"
)
return (
viccan,
f"{viccan.mcu_name},{viccan.command_id},"
+ ",".join(map(str, list(viccan.data))),
)
def write(self, msg: VicCAN):
if not self.can_bus:
raise DeviceClosedException("CAN bus not initialized")
# build 11-bit arbitration ID according to VicCAN spec:
# bits 10..8: targeted MCU key
# bits 7..6: data type key
# bits 5..0: command
# map MCU name to 3-bit key.
try:
mcu_id = MCU_IDS.index((msg.mcu_name or "").lower())
except ValueError:
self.logger.error(
f"unknown VicCAN mcu_name '{msg.mcu_name}' for CAN frame; dropping message"
)
return
# determine data type from length:
# 0: double x1, 1: float32 x2, 2: int16 x4, 3: empty
match data_len := len(msg.data):
case 0:
data_type = 3
data = bytes()
case 1:
data_type = 0
data = struct.pack(">d", *msg.data)
case 2:
data_type = 1
data = struct.pack(">ff", *msg.data)
case 3 | 4: # 3 gets treated as 4
data_type = 2
if data_len == 3:
msg.data.append(0)
data = struct.pack(">hhhh", *[int(x) for x in msg.data])
case _:
self.logger.error(
f"unexpected VicCAN data length: {data_len}; dropping message"
)
return
# command is limited to 6 bits.
command = int(msg.command_id)
if command < 0 or command > 0x3F:
self.logger.error(
f"invalid command_id for CAN frame: {command}; dropping message"
)
return
try:
can_message = can.Message(
arbitration_id=(mcu_id << 8) | (data_type << 6) | command,
data=data,
is_extended_id=False,
)
except Exception as e:
self.logger.error(f"failed to construct CAN message: {e}")
return
try:
self.can_bus.send(can_message)
self.logger.debug(
f"sent CAN frame id=0x{can_message.arbitration_id:X}, "
f"data={list(can_message.data)}"
)
except can.CanError as e:
self.logger.error(f"CAN error while sending: {e}")
raise DeviceClosedException("CAN bus closed unexpectedly")
def write_raw(self, msg: str):
self.logger.warn(f"write_raw is not supported for CANConnector. msg: {msg}")
def cleanup(self):
try:
if self.can_bus is not None:
self.logger.info("shutting down CAN bus")
self.can_bus.shutdown()
except Exception as e:
self.logger.error(e)
class MockConnector(Connector):
def __init__(self, logger: RcutilsLogger, clock: Clock):
super().__init__(logger, clock)
# No hardware interface for MockConnector. Publish to `/anchor/from_vic/mock_mcu` instead.
def read(self) -> tuple[VicCAN | None, str | None]:
return (None, None)
def write(self, msg: VicCAN):
pass
def write_raw(self, msg: str):
pass

View File

@@ -0,0 +1,65 @@
from astra_msgs.msg import VicCAN
from std_msgs.msg import Header
from builtin_interfaces.msg import Time
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(
msg: str, mcu_name: str, logger: RcutilsLogger, time: Time
) -> VicCAN | None:
"""
Converts the serial string VicCAN format to a ROS2 VicCAN message.
Does not fill out the Header of the message.
On a failure, it will log at a debug level why it failed and return None.
"""
parts: list[str] = msg.strip().split(",")
# don't need an extra check because len of .split output is always >= 1
if not parts[0].startswith("can_relay_"):
logger.debug(f"got non-CAN data from {mcu_name}: {msg}")
return None
elif len(parts) < 3:
logger.debug(f"got garbage (not enough parts) CAN data from {mcu_name}: {msg}")
return None
elif len(parts) > 7:
logger.debug(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}")
return None
try:
command_id = int(parts[2])
except ValueError:
logger.debug(
f"got garbage (non-integer command id) CAN data from {mcu_name}: {msg}"
)
return None
if command_id not in range(64):
logger.debug(
f"got garbage (wrong command id {command_id}) CAN data from {mcu_name}: {msg}"
)
return None
try:
return VicCAN(
header=Header(
stamp=time,
frame_id="from_vic",
),
mcu_name=parts[1],
command_id=command_id,
data=[float(x) for x in parts[3:]],
)
except ValueError:
logger.debug(f"got garbage (non-numerical) CAN data from {mcu_name}: {msg}")
return None
def viccan_to_string(viccan: VicCAN) -> str:
"""Converts a ROS2 VicCAN message to the serial string VicCAN format."""
# make sure we accept 3 digits and treat it as 4
if len(viccan.data) == 3:
viccan.data.append(0)
# go from [ w, x, y, z ] -> ",w,x,y,z" & round to 7 digits max
data = "".join([f",{round(val,7)}" for val in viccan.data])
return f"can_relay_tovic,{viccan.mcu_name},{viccan.command_id}{data}\n"

View File

@@ -1,138 +1,111 @@
#!/usr/bin/env python3
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown
from launch.substitutions import (
LaunchConfiguration,
ThisLaunchFileDir,
PathJoinSubstitution,
)
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, Shutdown
from launch.conditions import IfCondition
# Prevent making __pycache__ directories
from sys import dont_write_bytecode
dont_write_bytecode = True
def launch_setup(context, *args, **kwargs):
# Retrieve the resolved value of the launch argument 'mode'
mode = LaunchConfiguration("mode").perform(context)
nodes = []
if mode == "anchor":
# Launch every node and pass "anchor" as the parameter
nodes.append(
Node(
package="arm_pkg",
executable="arm", # change as needed
name="arm",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
nodes.append(
Node(
package="core_pkg",
executable="core", # change as needed
name="core",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
nodes.append(
Node(
package="core_pkg",
executable="ptz", # change as needed
name="ptz",
output="both",
condition=IfCondition(LaunchConfiguration("use_ptz", default="true")),
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
)
)
nodes.append(
Node(
package="bio_pkg",
executable="bio", # change as needed
name="bio",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
nodes.append(
Node(
package="anchor_pkg",
executable="anchor", # change as needed
name="anchor",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
elif mode in ["arm", "core", "bio", "ptz"]:
# Only launch the node corresponding to the provided mode.
if mode == "arm":
nodes.append(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
elif mode == "core":
nodes.append(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
elif mode == "bio":
nodes.append(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
elif mode == "ptz":
nodes.append(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
on_exit=Shutdown(), # on fail, shutdown if this was the only node to be launched
)
)
else:
# If an invalid mode is provided, print an error.
print("Invalid mode provided. Choose one of: arm, core, bio, anchor, ptz.")
return nodes
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
declare_arg = DeclareLaunchArgument(
"mode",
default_value="anchor",
description="Launch mode: arm, core, bio, anchor, or ptz",
connector = LaunchConfiguration("connector")
serial_override = LaunchConfiguration("serial_override")
can_override = LaunchConfiguration("can_override")
use_ptz = LaunchConfiguration("use_ptz")
ld = LaunchDescription()
# arguments
ld.add_action(
DeclareLaunchArgument(
"connector",
default_value="auto",
description="Connector parameter for anchor node (default: 'auto')",
)
)
return LaunchDescription([declare_arg, OpaqueFunction(function=launch_setup)])
ld.add_action(
DeclareLaunchArgument(
"serial_override",
default_value="",
description="Serial port override parameter for anchor node (default: '')",
)
)
ld.add_action(
DeclareLaunchArgument(
"can_override",
default_value="",
description="CAN network override parameter for anchor node (default: '')",
)
)
ld.add_action(
DeclareLaunchArgument(
"use_ptz",
default_value="true", # must be string for launch system
description="Whether to launch PTZ node (default: true)",
)
)
# nodes
ld.add_action(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
condition=IfCondition(use_ptz),
)
)
ld.add_action(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="anchor_pkg",
executable="anchor",
name="anchor",
output="both",
parameters=[
{
"launch_mode": "anchor",
"connector": connector,
"serial_override": serial_override,
"can_override": can_override,
}
],
on_exit=Shutdown(),
)
)
return ld

View File

@@ -3,13 +3,14 @@
<package format="3">
<name>anchor_pkg</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<description>Anchor -- ROS and CAN relay node</description>
<maintainer email="rjm0037@uah.edu">Riley</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-serial</depend>
<depend>python3-can</depend>
<build_depend>black</build_depend>

View File

@@ -1,279 +0,0 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
from std_msgs.msg import String
from astra_msgs.msg import ControllerState
from astra_msgs.msg import ArmManual
from astra_msgs.msg import ArmIK
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("arm_headless")
# Depricated, kept temporarily for reference
# self.create_timer(0.20, self.send_controls)#read and send controls
self.create_timer(0.1, self.send_manual)
# Create a publisher to publish any output the pico sends
# Depricated, kept temporarily for reference
# self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10)
# Create a subscriber to listen to any commands sent for the pico
# Depricated, kept temporarily for reference
# self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
# self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.laser_status = 0
# Initialize pygame
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
#
#
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
# Check the pico for updates
# self.read_feedback()
if (
pygame.joystick.get_count() == 0
): # if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
# self.send_controls() #depricated, kept for reference temporarily
self.send_manual()
# self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() # re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt:
sys.exit(0)
def send_manual(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = ArmManual()
# Triggers for gripper control
if self.gamepad.get_axis(2) > 0: # left trigger
input.gripper = -1
elif self.gamepad.get_axis(5) > 0: # right trigger
input.gripper = 1
# Toggle Laser
if self.gamepad.get_button(7): # Start
self.laser_status = 1
elif self.gamepad.get_button(6): # Back
self.laser_status = 0
input.laser = self.laser_status
if self.gamepad.get_button(5): # right bumper, control effector
# Left stick X-axis for effector yaw
if self.gamepad.get_axis(0) > 0:
input.effector_yaw = 1
elif self.gamepad.get_axis(0) < 0:
input.effector_yaw = -1
# Right stick X-axis for effector roll
if self.gamepad.get_axis(3) > 0:
input.effector_roll = 1
elif self.gamepad.get_axis(3) < 0:
input.effector_roll = -1
else: # Control arm axis
dpad_input = self.gamepad.get_hat(0)
input.axis0 = 0
if dpad_input[0] == 1:
input.axis0 = 1
elif dpad_input[0] == -1:
input.axis0 = -1
if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15:
input.axis1 = round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1))
if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15:
input.axis3 = -1 * round(self.gamepad.get_axis(4))
# input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
# input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
# input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
# Button Mappings
# axis2 -> LT
# axis5 -> RT
# Buttons0 -> A
# Buttons1 -> B
# Buttons2 -> X
# Buttons3 -> Y
# Buttons4 -> LB
# Buttons5 -> RB
# Buttons6 -> Back
# Buttons7 -> Start
input.linear_actuator = 0
if pygame.joystick.get_count() != 0:
self.get_logger().info(
f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n"
)
self.manual_pub.publish(input)
else:
pass
pass
# Depricated, kept temporarily for reference
# def send_controls(self):
# for event in pygame.event.get():
# if event.type == pygame.QUIT:
# pygame.quit()
# exit()
# input = ControllerState()
# input.lt = self.gamepad.get_axis(2)#left trigger
# input.rt = self.gamepad.get_axis(5)#right trigger
# #input.lb = self.gamepad.get_button(9)#Value must be converted to bool
# if(self.gamepad.get_button(4)):#left bumper
# input.lb = True
# else:
# input.lb = False
# #input.rb = self.gamepad.get_button(10)#Value must be converted to bool
# if(self.gamepad.get_button(5)):#right bumper
# input.rb = True
# else:
# input.rb = False
# #input.plus = self.gamepad.get_button(6)#plus button
# if(self.gamepad.get_button(7)):#plus button
# input.plus = True
# else:
# input.plus = False
# #input.minus = self.gamepad.get_button(4)#minus button
# if(self.gamepad.get_button(6)):#minus button
# input.minus = True
# else:
# input.minus = False
# input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis
# input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis
# input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis
# input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis
# #input.a = self.gamepad.get_button(1)#A button
# if(self.gamepad.get_button(0)):#A button
# input.a = True
# else:
# input.a = False
# #input.b = self.gamepad.get_button(0)#B button
# if(self.gamepad.get_button(1)):#B button
# input.b = True
# else:
# input.b = False
# #input.x = self.gamepad.get_button(3)#X button
# if(self.gamepad.get_button(2)):#X button
# input.x = True
# else:
# input.x = False
# #input.y = self.gamepad.get_button(2)#Y button
# if(self.gamepad.get_button(3)):#Y button
# input.y = True
# else:
# input.y = False
# dpad_input = self.gamepad.get_hat(0)#D-pad input
# #not using up/down on DPad
# input.d_up = False
# input.d_down = False
# if(dpad_input[0] == 1):#D-pad right
# input.d_right = True
# else:
# input.d_right = False
# if(dpad_input[0] == -1):#D-pad left
# input.d_left = True
# else:
# input.d_left = False
# if pygame.joystick.get_count() != 0:
# self.get_logger().info(f"[Ctrl] Updated Controller State\n")
# self.publisher.publish(input)
# else:
# pass
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
# tb_bs = BaseStation()
# node.run()
if __name__ == "__main__":
main()

View File

@@ -1,157 +1,221 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
import serial
import sys
import threading
import glob
import time
import atexit
import signal
import math
from warnings import deprecated
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
import math
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
thread = None
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
class ArmNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
def __init__(self):
super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
##################################################
# Topics
# Parameters
self.declare_parameter("use_old_topics", True)
self.use_old_topics = (
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
##################################################
# Old topics
if self.use_old_topics:
# Anchor topics
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Create publishers
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
)
self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(
DigitFeedback, "/arm/feedback/digit", 10
)
self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers
self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 10
)
###################################################
# New topics
# Anchor topics
# from_vic
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
# to_vic
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
# Manual: who tf knows. Maybe JointJog?
# Manual: /arm/manual/joint_jog is published by Basestation or Headless
self.man_jointjog_sub_ = self.create_subscription(
JointJog,
"/arm/manual/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
JointState,
"/joint_commands",
self.joint_command_callback,
qos_profile=control_qos,
)
# Feedback
# Combined Socket and Digit feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
"/arm/feedback",
qos_profile=qos.qos_profile_sensor_data,
)
self.arm_feedback_new = ArmFeedback()
# IK: /joint_states is published from here to topic_based_control
# IK arm pose: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Saved state
# Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback()
# IK Arm pose
self.saved_joint_state = JointState()
self.saved_joint_state.name = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
def jointjog_callback(self, msg: JointJog):
if len(msg.joint_names) != len(msg.velocities):
self.get_logger().debug("Ignoring malformed /arm/manual/joint_jog message.")
return
# Grab velocities from message
velocities = [
(
msg.velocities[msg.joint_names.index(joint_name)] # type: ignore
if joint_name in msg.joint_names
else 0.0
)
for joint_name in self.all_joint_names
]
self.saved_joint_state.position = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
self.saved_joint_state.velocity = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
# Old
self.send_velocities(velocities, msg.header)
# Create publishers
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
)
self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers
self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 10
)
def run(self):
global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
try:
thread.join()
except KeyboardInterrupt:
pass
# TODO: use msg.duration
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.")
return # command needs either position or velocity for all 7 joints
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper
# TODO: refactor to depend on joint names
# Embedded takes deg*10, ROS2 uses Radians
# Grab velocities from message
velocities = [
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity
(
msg.velocity[msg.name.index(joint_name)] # type: ignore
if joint_name in msg.name
else 0.0
)
for joint_name in self.all_joint_names
]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Axis 0-3
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3])
arm_cmd.header = Header(stamp=self.get_clock().now().to_msg())
# Wrist yaw and roll, gripper included for future use when have adequate hardware
digit_cmd = VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6])
digit_cmd.header = Header(stamp=self.get_clock().now().to_msg())
self.send_velocities(velocities, msg.header)
self.anchor_tovic_pub_.publish(arm_cmd)
self.anchor_tovic_pub_.publish(digit_cmd)
def send_velocities(self, velocities: list[float], header: Header):
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=26, data=[velocities[6]], header=header)
)
@deprecated("Uses an old message type. Will be removed at some point.")
def send_manual(self, msg: ArmManual):
axis0 = msg.axis0
axis1 = -1 * msg.axis1
@@ -166,7 +230,7 @@ class ArmNode(Node):
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,28,{msg.laser}\n"
@@ -176,10 +240,12 @@ class ArmNode(Node):
return
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str):
output = String(data=msg)
self.anchor_pub.publish(output)
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String):
output = msg.data
if output.startswith("can_relay_fromvic,arm,55"):
@@ -213,12 +279,11 @@ class ArmNode(Node):
self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN):
if msg.mcu_name != "arm":
return
assert msg.mcu_name == "arm"
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_socket_msg_len_dict:
expected_len = viccan_socket_msg_len_dict[msg.command_id]
if msg.command_id in self.viccan_socket_msg_len_dict:
expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -257,12 +322,8 @@ class ArmNode(Node):
# Joint state publisher for URDF visualization
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.saved_joint_state.position[2] = math.radians(
-angles[2]
) # Axis 2 (inverted)
self.saved_joint_state.position[3] = math.radians(
-angles[3]
) # Axis 3 (inverted)
self.saved_joint_state.position[2] = math.radians(angles[2]) # Axis 2
self.saved_joint_state.position[3] = math.radians(angles[3]) # Axis 3
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
@@ -294,22 +355,21 @@ class ArmNode(Node):
velocities[1]
) # Axis 1
self.saved_joint_state.velocity[2] = math.radians(
-velocities[2]
) # Axis 2 (-)
velocities[2]
) # Axis 2
self.saved_joint_state.velocity[3] = math.radians(
-velocities[3]
) # Axis 3 (-)
velocities[3]
) # Axis 3
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN):
if msg.mcu_name != "digit":
return
assert msg.mcu_name == "digit"
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_digit_msg_len_dict:
expected_len = viccan_digit_msg_len_dict[msg.command_id]
if msg.command_id in self.viccan_digit_msg_len_dict:
expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -329,10 +389,12 @@ class ArmNode(Node):
msg.data[1]
) # Wrist yaw
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback)
@deprecated("Uses an old message type. Will be removed at some point.")
def updateAngleFeedback(self, msg: str):
# Angle feedbacks,
# split the msg.data by commas
@@ -351,6 +413,7 @@ class ArmNode(Node):
else:
self.get_logger().info("Invalid angle feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks
parts = msg.split(",")
@@ -365,6 +428,7 @@ class ArmNode(Node):
else:
self.get_logger().info("Invalid voltage feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateMotorFeedback(self, msg: str):
parts = str(msg.strip()).split(",")
motorId = round(float(parts[3]))
@@ -389,17 +453,27 @@ class ArmNode(Node):
self.arm_feedback.axis0_current = current
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None):
rclpy.init(args=args)
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
arm_node = ArmNode()
arm_node.run()
rclpy.try_shutdown()
try:
rclpy.spin(arm_node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -1,27 +1,22 @@
from setuptools import find_packages, setup
import os
from glob import glob
from setuptools import setup
package_name = "arm_pkg"
setup(
name=package_name,
version="1.0.0",
packages=find_packages(exclude=["test"]),
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="TODO: Package description",
license="All Rights Reserved",
maintainer="David",
maintainer_email="ds0196@uah.edu",
description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": [
"arm = arm_pkg.arm_node:main",
"headless = arm_pkg.arm_headless:main",
],
"console_scripts": ["arm = arm_pkg.arm_node:main"],
},
)

View File

@@ -33,12 +33,12 @@ CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
# Used to verify the length of an incoming VicCAN feedback message

View File

@@ -262,7 +262,7 @@ class PtzNode(Node):
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
)
self.debug_pub.publish(msg)
self.get_logger().info(message_text)
self.get_logger().debug(message_text)
def run_async_func(self, coro):
"""Run an async function in the event loop."""

View File

@@ -6,23 +6,28 @@ from rclpy.duration import Duration
import signal
import time
import atexit
import os
import sys
import threading
import glob
import pwd
import grp
from math import copysign
from std_srvs.srv import Trigger
from std_msgs.msg import String
from std_msgs.msg import Header
from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState
import warnings
# Literally headless
warnings.filterwarnings(
"ignore",
message="Your system is avx2 capable but pygame was not built with support for it.",
)
import pygame
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
@@ -36,35 +41,44 @@ CORE_STOP_TWIST_MSG = Twist() # "
ARM_STOP_MSG = ArmManual() # "
BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
arm_ik_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
)
CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
ARM_DEADZONE = float(os.getenv("ARM_DEADZONE", "0.2"))
class Headless(Node):
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
def __init__(self):
# Initialize pygame first
pygame.init()
pygame.joystick.init()
super().__init__("headless")
super().__init__("headless_node")
##################################################
# Preamble
# Wait for anchor to start
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
@@ -116,44 +130,88 @@ class Headless(Node):
break
id += 1
self.create_timer(0.1, self.send_controls)
##################################################
# Parameters
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
self.declare_parameter("use_old_topics", True)
self.use_old_topics = (
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
self.declare_parameter("arm_mode", "manual")
self.arm_mode = self.get_parameter("arm_mode").value
self.declare_parameter("use_bio", False)
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
self.declare_parameter("arm_manual_scheme", "old")
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value
self.declare_parameter("use_arm_ik", False)
self.use_arm_ik = (
self.get_parameter("use_arm_ik").get_parameter_value().bool_value
)
# NOTE: only applicable if use_old_topics == True
self.declare_parameter("use_new_arm_manual_scheme", True)
self.use_new_arm_manual_scheme = (
self.get_parameter("use_new_arm_manual_scheme")
.get_parameter_value()
.bool_value
)
# Check parameter validity
if self.arm_mode not in ["manual", "ik"]:
if self.use_arm_ik and self.use_old_topics:
self.get_logger().fatal("Old topics do not support arm IK control.")
sys.exit(1)
if not self.use_new_arm_manual_scheme and not self.use_old_topics:
self.get_logger().warn(
f"Invalid value '{self.arm_mode}' for arm_mode parameter. Defaulting to 'manual' (per-axis control)."
f"New arm manual does not support old control scheme. Defaulting to new scheme."
)
if self.arm_manual_scheme not in ["old", "new"]:
self.get_logger().warn(
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
##################################################
# Old Topics
if self.use_old_topics:
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(
ArmManual, "/arm/control/manual", 2
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
##################################################
# New Topics
if not self.use_old_topics:
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
)
# TODO: add new bio topics
##################################################
# Timers
self.create_timer(0.1, self.send_controls)
##################################################
# Services
# If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None
if self.arm_mode == "ik":
if self.use_arm_ik:
self.get_logger().info("Starting servo node for IK control...")
self.servo_start_client = self.create_client(
Trigger, "/servo_node/start_servo"
@@ -170,13 +228,22 @@ class Headless(Node):
if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request())
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
# Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150)
def stop_all(self):
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
if self.use_arm_ik:
self.arm_ik_twist_publisher.publish(self.arm_ik_twist_stop_msg())
else:
self.arm_manual_pub_.publish(self.arm_manual_stop_msg())
# TODO: add bio here after implementing new topics
def send_controls(self):
"""Read the gamepad state and publish control messages"""
for event in pygame.event.get():
@@ -187,10 +254,8 @@ class Headless(Node):
# Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
# Stop the rover if controller disconnected
self.stop_all()
self.get_logger().info("Final stop commands sent. Shutting down.")
# Clean up
pygame.quit()
@@ -206,23 +271,51 @@ class Headless(Node):
new_ctrl_mode = "core"
if new_ctrl_mode != self.ctrl_mode:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.stop_all()
self.gamepad.rumble(0.6, 0.7, 75)
self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
if self.ctrl_mode == "arm" and self.use_bio:
self.get_logger().warning("NOTE: Using bio instead of arm.")
# CORE
if self.ctrl_mode == "core" and CORE_MODE == "duty":
# Actually send the controls
if self.ctrl_mode == "core":
self.send_core()
if self.use_old_topics:
if self.use_bio:
self.bio_publisher.publish(BIO_STOP_MSG)
else:
self.arm_publisher.publish(ARM_STOP_MSG)
# New topics shouldn't need to constantly send zeroes imo
else:
if self.use_bio:
self.send_bio()
else:
self.send_arm()
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
# Ditto
def send_core(self):
# Collect controller state
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
input = CoreControl()
input.max_speed = 90
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
# Right wheels
input.right_stick = float(round(-1 * right_stick_y, 2))
@@ -238,16 +331,9 @@ class Headless(Node):
self.core_publisher.publish(input)
elif self.ctrl_mode == "core" and CORE_MODE == "twist":
else: # New topics
input = Twist()
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_x = deadzone(self.gamepad.get_axis(3))
button_a = self.gamepad.get_button(0)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
# Forward/back and Turn
input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign(
@@ -280,31 +366,36 @@ class Headless(Node):
state_msg = CoreCtrlState()
state_msg.brake_mode = bool(self.core_brake_mode)
state_msg.max_duty = float(self.core_max_duty)
self.core_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
)
# ARM and BIO
if self.ctrl_mode == "arm" and self.arm_mode == "manual":
def send_arm(self):
# Collect controller state
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# OLD MANUAL
# ==========
if not self.use_arm_ik and self.use_old_topics:
arm_input = ArmManual()
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.arm_manual_scheme == "old":
# OLD ARM MANUAL CONTROL SCHEME
if not self.use_new_arm_manual_scheme:
# EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0
@@ -347,7 +438,8 @@ class Headless(Node):
if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y)
if self.arm_manual_scheme == "new":
# NEW ARM MANUAL CONTROL SCHEME
if self.use_new_arm_manual_scheme:
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
@@ -358,28 +450,16 @@ class Headless(Node):
# X: _
# Y: linear actuator out
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3
arm_input.effector_yaw = (
0 if abs(right_stick_x) < ARM_THRESHOLD else int(copysign(1, right_stick_x))
)
arm_input.axis3 = (
0 if abs(right_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, right_stick_y))
)
arm_input.effector_yaw = stick_to_arm_direction(right_stick_x)
arm_input.axis3 = -1 * stick_to_arm_direction(right_stick_y)
# Left stick: axis 1 and 2
arm_input.axis1 = (
0 if abs(left_stick_x) < ARM_THRESHOLD else int(copysign(1, left_stick_x))
)
arm_input.axis2 = (
0 if abs(left_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, left_stick_y))
)
arm_input.axis1 = stick_to_arm_direction(left_stick_x)
arm_input.axis2 = -1 * stick_to_arm_direction(left_stick_y)
# D-pad: axis 0 and _
arm_input.axis0 = (
0 if dpad_input[0] == 0 else int(copysign(1, dpad_input[0]))
)
arm_input.axis0 = int(dpad_input[0])
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
@@ -409,22 +489,80 @@ class Headless(Node):
else:
arm_input.linear_actuator = 0
# BIO
bio_input = BioControl(
bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100),
)
# Drill motor (FAERIE)
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int(
30 * (right_trigger - left_trigger)
) # Max duty cycle 30%
self.arm_publisher.publish(arm_input)
if self.ctrl_mode == "arm" and self.arm_mode == "ik":
# NEW MANUAL
# ==========
elif not self.use_arm_ik and not self.use_old_topics:
arm_input = JointJog()
arm_input.header.frame_id = "base_link"
arm_input.header.stamp = self.get_clock().now().to_msg()
arm_input.joint_names = self.all_joint_names
arm_input.velocities = [0.0] * len(self.all_joint_names)
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: linear actuator in
# X: _
# Y: linear actuator out
# Right stick: EF yaw and axis 3
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
stick_to_arm_direction(right_stick_x)
)
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
stick_to_arm_direction(right_stick_y)
)
# Left stick: axis 1 and 2
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = float(
stick_to_arm_direction(left_stick_x)
)
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
stick_to_arm_direction(left_stick_y)
)
# D-pad: axis 0 and _
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = float(
dpad_input[0]
)
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.velocities[
self.all_joint_names.index("ef_gripper_left_joint")
] = 0.0
elif left_trigger > 0:
arm_input.velocities[
self.all_joint_names.index("ef_gripper_left_joint")
] = -1.0
elif right_trigger > 0:
arm_input.velocities[
self.all_joint_names.index("ef_gripper_left_joint")
] = 1.0
# Bumpers: EF roll
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = (
right_bumper - left_bumper
)
# A: brake
# TODO: Brake mode
# Y: linear actuator
# TODO: linear actuator
self.arm_manual_pub_.publish(arm_input)
# IK (ONLY NEW)
# =============
elif self.use_arm_ik:
arm_twist = TwistStamped()
arm_twist.header.frame_id = "base_link"
arm_twist.header.stamp = self.get_clock().now().to_msg()
@@ -432,21 +570,6 @@ class Headless(Node):
arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# Right stick: linear y and linear x
# Left stick: angular z and linear z
# D-pad: angular y and _
@@ -473,9 +596,7 @@ class Headless(Node):
# Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append(
"ef_gripper_left_joint"
)
arm_jointjog.joint_names.append("ef_gripper_left_joint") # type: ignore
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x
@@ -489,14 +610,66 @@ class Headless(Node):
self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def send_bio(self):
# Collect controller state
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
if self.use_old_topics:
bio_input = BioControl(
bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100),
)
# Drill motor (FAERIE)
if left_trigger > 0 or right_trigger > 0:
bio_input.drill = int(
30 * (right_trigger - left_trigger)
) # Max duty cycle 30%
self.bio_publisher.publish(bio_input)
else:
pass # TODO: implement new bio control topics
def arm_manual_stop_msg(self):
return JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
joint_names=self.all_joint_names,
velocities=[0.0] * len(self.all_joint_names),
)
def arm_ik_twist_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
if abs(value) < threshold:
return 0
return value
def stick_to_arm_direction(value: float, threshold=ARM_DEADZONE) -> int:
"""Apply a larger deadzone to a stick input and make digital/binary instead of analog"""
if abs(value) < threshold:
return 0
return int(copysign(1, value))
def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/
try:
@@ -515,10 +688,19 @@ def is_user_in_group(group_name: str) -> bool:
return False
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None):
try:
rclpy.init(args=args)
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
node = Headless()
rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException):
@@ -528,7 +710,4 @@ def main(args=None):
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

8
treefmt.nix Normal file
View File

@@ -0,0 +1,8 @@
{ pkgs, ... }:
{
projectRootFile = "flake.nix";
programs = {
nixfmt.enable = true;
black.enable = true;
};
}