57 Commits

Author SHA1 Message Date
David
90a9519b55 feat(core): change rover_platform parameter to override, default with hostname instead of environment variable 2026-04-12 17:37:39 -05:00
David
ab4f998ac1 feat(core): rate limit motor commands from diff_drive_controller 2026-04-11 21:07:21 -05:00
David
b3f996113c chore: bump astra_descriptions 2026-04-11 19:33:07 -05:00
David
191c9d613d fix(headless): correctly scale cmd_vel values 2026-04-11 19:12:49 -05:00
David
5e0946e8d7 fix(core): align rover_platform parameter with launch file
Launch file uses clucky, testbed, auto. Node was using core, testbed, auto. Replaced core with clucky in the node.
2026-04-11 00:22:51 -05:00
David
3dd80bbd29 fix(arm): change old anchor topic bacc 2026-04-11 00:21:09 -05:00
David
e53c1f32c9 chore: bump astra_msgs submodule 2026-04-08 01:21:20 -05:00
David
d0f6ecf702 Merge remote-tracking branch 'origin/main' into core-ros2-control 2026-04-08 01:15:16 -05:00
Riley M.
8404999369 Merge pull request #31 from SHC-ASTRA/can-refactor
Refactor anchor & add direct CAN connector
2026-04-08 00:18:13 -05:00
ryleu
88574524cf clarify the mock connector usage in the README 2026-04-08 00:15:39 -05:00
ryleu
30bb32a66b remove extraneous slice 2026-04-08 00:09:04 -05:00
David
010d2da0b6 fix: string number 2026-04-07 23:45:40 -05:00
ryleu
0a257abf43 make the pad 3 -> logic consistent 2026-04-07 23:44:38 -05:00
ryleu
b09b55bee0 fix bug because apparently python has arrays 2026-04-07 22:19:55 -05:00
ryleu
ec7f272934 clean up code 2026-04-07 22:16:08 -05:00
ryleu
bc9183d59a make mock mcu use VicCAN messages 2026-04-07 21:52:52 -05:00
ryleu
410d3706ed update README with mock connector instructions 2026-04-02 19:49:07 -05:00
ryleu
89b3194914 update documentation and accept 3-value VicCAN messages 2026-04-02 19:43:10 -05:00
ryleu
4ef226c094 nix fmt 2026-04-01 03:31:21 -05:00
SHC-ASTRA
327539467c fixed can connector 2026-04-01 02:50:12 -05:00
SHC-ASTRA
e570d371c6 fix a plethora of bugs related to the serial connector 2026-04-01 01:48:40 -05:00
ryleu
2213896494 change is_testbed to rover_platform 2026-03-31 23:01:06 -05:00
David
c42cd39fda feat(arm): implement ArmCtrlState topic 2026-03-26 12:40:27 -05:00
David
f1c84c3cc5 fix(arm): correctly control arm 2026-03-26 03:53:38 -05:00
David
cf699da0c6 fix(arm): populate missing feedback fields 2026-03-26 02:40:58 -05:00
David
7669ded344 feat(core): add code support for testbed
Adds parameter `is_testbed`. WIP: still needs support in astra_descriptions; new parameter will not be usable until then. When using an incorrect parameter value, odom will be incorrect and will drive at a scaled speed from requested over cmd_vel.
2026-03-26 02:05:37 -05:00
David
ec12a083f1 feat(core): remove bypass /cmd_vel topic 2026-03-26 01:37:57 -05:00
David
ea36ce6ef4 fix(core): populate missing values in /core/feedback_new 2026-03-26 00:40:11 -05:00
David
3f795bf8ed chore(core): remove core_headless 2026-03-24 15:44:30 -05:00
David
b257dc7556 fix: update ros2 plumbing files 2026-03-24 15:41:00 -05:00
David
45825189a5 fix(core): populate CoreFeedback Header stamp 2026-03-23 21:57:48 -05:00
ryleu
f7efa604d2 finish adding parameters 2026-03-23 20:39:50 -05:00
ryleu
fe46a2ab4d fix wrong order for initialization 2026-03-23 13:25:13 -05:00
ryleu
941e196316 implement review comments 2026-03-21 18:14:44 -05:00
David
a17060ceda refactor(core): cleanup to match arm 2026-03-20 10:42:18 -05:00
David
ff4a58e6ed refactor: (headless) finish integrating Core cmd_vel 2026-03-19 20:02:37 -05:00
David
120891c8e5 chore: update astra_msgs to main 2026-03-19 19:49:25 -05:00
David
178d5001d6 Merge remote-tracking branch 'origin/main' into core-ros2-control 2026-03-19 19:45:59 -05:00
ryleu
7a3c4af1ce remove .envrc sourcing of install 2026-03-18 23:28:49 -05:00
ryleu
5e5a52438d black fmt 2026-03-18 23:22:42 -05:00
David Sharpe
684c2994ec cmd_vel headless 2026-03-18 01:17:00 -05:00
ryleu
c814f34ca6 rewrite the launch file 2026-03-18 00:12:42 -05:00
ryleu
ce39d0aeb9 Merge remote-tracking branch 'origin/main' into can-refactor 2026-03-17 23:57:38 -05:00
ryleu
9b96244a1b add can support 2026-03-17 23:52:37 -05:00
David Sharpe
e588ff0a7b Arm topic refactor (#33)
Headless too
2026-03-17 23:34:35 -05:00
David Sharpe
e83642cfe8 style: (arm) fix comment and variable name 2026-03-17 23:07:06 -05:00
David Sharpe
67b3c5bc8f refactor: (arm) consolidate velocity control VicCAN into single function 2026-03-17 01:44:32 -05:00
David Sharpe
c506a34b37 style: (arm) format 2026-03-17 01:43:22 -05:00
David Sharpe
980c08ba4f refactor: implement Riley's comments
- Add @warnings.deprecated decorators to callbacks that use old topics
- Rename "*new*" topics
- Print debug on malformed message receival
- Un-invert Axes 2 & 3 in the URDF
- Don't silently check mcu_name twice
- Remove threading.Thread (not one of Riley's comments)
- Add a real signal handler (ditto)
- Move stamped stop messages to helper functions
2026-03-17 01:14:18 -05:00
David
bfa50e3a25 feat: (core) make compatible with ros2_control 2026-02-26 17:17:05 -06:00
David
90fbbac813 feat: add ros2 control option to main launch for core 2026-02-26 17:17:02 -06:00
ryleu
b388275bba clean stuff up a bit to prep for CAN 2026-02-15 17:23:18 -06:00
ryleu
5c0194c543 remove KNOWN_USBS from anchor_node.py 2026-02-15 01:05:37 -06:00
ryleu
809ca71208 remove nested for loops 2026-02-15 01:04:43 -06:00
SHC-ASTRA
225700bb86 tested it on testbed and had to change things 2026-02-15 00:47:20 -06:00
ryleu
4459886fc1 add a mock mode and fix a logic error 2026-02-14 23:16:34 -06:00
ryleu
18fce2c19b worth a shot to see if it works 2026-02-14 21:39:32 -06:00
30 changed files with 1856 additions and 1590 deletions

View File

@@ -60,6 +60,33 @@ $ ros2 launch anchor_pkg rover.launch.py # Must be run on a computer connected
$ ros2 run headless_pkg headless_full # Optionally run in a separate shell on the same or different computer. $ ros2 run headless_pkg headless_full # Optionally run in a separate shell on the same or different computer.
``` ```
### Using the Mock Connector
Anchor provides a mock connector meant for testing and scripting purposes. You can select the mock connector by running anchor with this command:
```bash
$ ros2 launch anchor_pkg rover.launch.py connector:="mock"
```
To see all data that would be sent over the CAN network (and thus to the microcontrollers), use this command:
```bash
$ ros2 topic echo /anchor/to_vic/debug
```
To send data to the mock connector (as if you were a ROS2 node), use the normal relay topic:
```bash
$ ros2 topic pub /anchor/to_vic/relay astra_msgs/msg/VicCAN '{mcu_name: "core", command_id: 50, data: [0.0, 2.0, 0.0, 1.0]}'
```
To send data to the mock connector (as if you were a microcontroller), publish to the dedicated topic:
```bash
$ ros2 topic pub /anchor/from_vic/mock_mcu astra_msgs/msg/VicCAN '{mcu_name: "arm", command_id: 55, data: [0.0, 450.0, 900.0, 0.0]}'
```
### Testing Serial ### Testing Serial
You can fake the presence of a Serial device (i.e., MCU) by using the following command: You can fake the presence of a Serial device (i.e., MCU) by using the following command:
@@ -68,10 +95,31 @@ You can fake the presence of a Serial device (i.e., MCU) by using the following
$ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT $ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT
``` ```
When you go to run anchor, use the `PORT_OVERRIDE` environment variable to point it to the fake serial port, like so: When you go to run anchor, use the `serial_override` ROS2 parameter to point it to the fake serial port, like so:
```bash ```bash
$ PORT_OVERRIDE=/tmp/ttyACM9 ros2 launch anchor_pkg rover.launch.py $ ros2 launch anchor_pkg rover.launch.py connector:=serial serial_override:=/tmp/ttyACM9
```
### Testing CAN
You can create a virtual CAN network by using the following commands to create and then enable it:
```bash
sudo ip link add dev vcan0 type vcan
sudo ip link set vcan0 up
```
When you go to run anchor, use the `can_override` ROS2 parameter to point it to the virtual CAN network, like so:
```bash
$ ros2 launch anchor_pkg rover.launch.py connector:=can can_override:=vcan0
```
Once you're done, you should delete the virtual network so that anchor doesn't get confused if you plug in a real CAN adapter:
```bash
$ sudo ip link delete vcan0
``` ```
### Connecting the GuliKit Controller ### Connecting the GuliKit Controller

23
flake.lock generated
View File

@@ -60,7 +60,8 @@
"nixpkgs": [ "nixpkgs": [
"nix-ros-overlay", "nix-ros-overlay",
"nixpkgs" "nixpkgs"
] ],
"treefmt-nix": "treefmt-nix"
} }
}, },
"systems": { "systems": {
@@ -77,6 +78,26 @@
"repo": "default", "repo": "default",
"type": "github" "type": "github"
} }
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1773297127,
"narHash": "sha256-6E/yhXP7Oy/NbXtf1ktzmU8SdVqJQ09HC/48ebEGBpk=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "71b125cd05fbfd78cab3e070b73544abe24c5016",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
} }
}, },
"root": "root", "root": "root",

View File

@@ -4,6 +4,11 @@
inputs = { inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop"; nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
treefmt-nix = {
url = "github:numtide/treefmt-nix";
inputs.nixpkgs.follows = "nixpkgs";
};
}; };
outputs = outputs =
@@ -11,7 +16,8 @@
self, self,
nix-ros-overlay, nix-ros-overlay,
nixpkgs, nixpkgs,
}: ...
}@inputs:
nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem ( nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem (
system: system:
let let
@@ -28,6 +34,7 @@
(python313.withPackages ( (python313.withPackages (
p: with p; [ p: with p; [
pyserial pyserial
python-can
pygame pygame
scipy scipy
crccheck crccheck
@@ -83,6 +90,8 @@
export QT_X11_NO_MITSHM=1 export QT_X11_NO_MITSHM=1
''; '';
}; };
formatter = (inputs.treefmt-nix.lib.evalModule pkgs ./treefmt.nix).config.build.wrapper;
} }
); );

View File

@@ -1,28 +1,24 @@
from warnings import deprecated
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import signal
import time
import atexit import atexit
import serial from .connector import (
import serial.tools.list_ports Connector,
import os MockConnector,
import sys SerialConnector,
CANConnector,
NoValidDeviceException,
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import threading import threading
import glob
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN from astra_msgs.msg import VicCAN
from std_msgs.msg import String
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
class Anchor(Node): class Anchor(Node):
@@ -36,308 +32,204 @@ class Anchor(Node):
- VicCAN messages for Arm node - VicCAN messages for Arm node
* /anchor/from_vic/bio * /anchor/from_vic/bio
- VicCAN messages for Bio node - VicCAN messages for Bio node
* /anchor/to_vic/debug
- A string copy of the messages published to ./relay are published here
Subscribers: Subscribers:
* /anchor/from_vic/mock_mcu * /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU - For testing without an actual MCU, publish ViCAN messages here as if they came from an MCU
* /anchor/to_vic/relay * /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU - Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string * /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging - Send raw strings to connectors. Does not work for connectors that require conversion (like CANConnector)
* /anchor/relay
- Legacy method for talking to connectors. Takes String as input, but does not send the raw strings to connectors.
Instead, it converts them to VicCAN messages first.
""" """
connector: Connector
def __init__(self): def __init__(self):
# Initalize node with name super().__init__("anchor_node")
super().__init__("anchor_node") # previously 'serial_publisher'
self.serial_port: str | None = None # e.g., "/dev/ttyUSB0" logger = self.get_logger()
# Serial port override # ROS2 Parameter Setup
if port_override := os.getenv("PORT_OVERRIDE"):
self.serial_port = port_override
################################################## self.declare_parameter(
# Serial MCU Discovery "connector",
"auto",
# If there was not a port override, look for a MCU over USB for Serial. ParameterDescriptor(
if self.serial_port is None: name="connector",
comports = serial.tools.list_ports.comports() description="Declares which MCU connector should be used. Defaults to 'auto'.",
real_ports = list( type=ParameterType.PARAMETER_STRING,
filter( additional_constraints="Must be 'serial', 'can', 'mock', or 'auto'.",
lambda p: p.vid is not None ),
and p.pid is not None
and p.device is not None,
comports,
) )
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU self.declare_parameter(
found_port = recog_ports[0] "can_override",
self.get_logger().info( "",
f"Selecting MCU '{found_port.description}' at {found_port.device}." ParameterDescriptor(
name="can_override",
description="Overrides which CAN channel will be used. Defaults to ''.",
type=ParameterType.PARAMETER_STRING,
additional_constraints="Must be a valid CAN network that shows up in `ip link show`.",
),
) )
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
# We still don't have a serial port; fall back to legacy discovery (Areeb's code) self.declare_parameter(
# Loop through all serial devices on the computer to check for the MCU "serial_override",
if self.serial_port is None: "",
self.get_logger().warning("Falling back to legacy MCU discovery...") ParameterDescriptor(
ports = Anchor.list_serial_ports() name="serial_override",
for _ in range(4): description="Overrides which serial port will be used. Defaults to ''.",
if self.serial_port is not None: type=ParameterType.PARAMETER_STRING,
break additional_constraints="Must be a valid path to a serial device file that shows up in `ls /dev/tty*`.",
for port in ports: ),
)
# Determine which connector to use. Options are Mock, Serial, and CAN
connector_select = (
self.get_parameter("connector").get_parameter_value().string_value
)
can_override = (
self.get_parameter("can_override").get_parameter_value().string_value
)
serial_override = (
self.get_parameter("serial_override").get_parameter_value().string_value
)
match connector_select:
case "serial":
logger.info("using serial connector")
self.connector = SerialConnector(
logger, self.get_clock(), serial_override
)
case "can":
logger.info("using CAN connector")
self.connector = CANConnector(logger, self.get_clock(), can_override)
case "mock":
logger.info("using mock connector")
self.connector = MockConnector(logger, self.get_clock())
case "auto":
logger.info("automatically determining connector")
try: try:
# connect and send a ping command logger.info("trying CAN connector")
ser = serial.Serial(port, 115200, timeout=1) self.connector = CANConnector(
# (f"Checking port {port}...") logger, self.get_clock(), can_override
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1)
sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
) )
try: except (NoValidDeviceException, NoWorkingDeviceException, TypeError):
self.serial_interface = serial.Serial( logger.info("CAN connector failed, trying serial connector")
self.serial_port, 115200, timeout=1 self.connector = SerialConnector(
logger, self.get_clock(), serial_override
) )
case _:
# Attempt to get name of connected MCU logger.fatal(
self.serial_interface.write( f"invalid value for connector parameter: {connector_select}"
b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
) )
exit(1)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
# Close serial port on exit
atexit.register(self.cleanup)
##################################################
# ROS2 Topic Setup # ROS2 Topic Setup
# New pub/sub with VicCAN # Publishers
self.fromvic_debug_pub_ = self.create_publisher( self.fromvic_debug_pub_ = self.create_publisher( # only used by serial
String, "/anchor/from_vic/debug", 20 String,
"/anchor/from_vic/debug",
20,
) )
self.fromvic_core_pub_ = self.create_publisher( self.fromvic_core_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/core", 20 VicCAN,
"/anchor/from_vic/core",
20,
) )
self.fromvic_arm_pub_ = self.create_publisher( self.fromvic_arm_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/arm", 20 VicCAN,
"/anchor/from_vic/arm",
20,
) )
self.fromvic_bio_pub_ = self.create_publisher( self.fromvic_bio_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/bio", 20 VicCAN,
"/anchor/from_vic/bio",
20,
)
# Debug publisher
self.tovic_debug_pub_ = self.create_publisher(
VicCAN,
"/anchor/to_vic/debug",
20,
) )
self.mock_mcu_sub_ = self.create_subscription( # Subscribers
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
)
self.tovic_sub_ = self.create_subscription( self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20 VicCAN,
"/anchor/to_vic/relay",
self.write_connector,
20,
) )
self.tovic_debug_sub_ = self.create_subscription( self.tovic_sub_legacy_ = self.create_subscription(
String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20 String,
"/anchor/relay",
self.write_connector_legacy,
20,
)
self.mock_mcu_sub_ = self.create_subscription(
String,
"/anchor/from_vic/mock_mcu",
self.relay_fromvic,
20,
)
self.tovic_string_sub_ = self.create_subscription(
String,
"/anchor/to_vic/relay_string",
self.connector.write_raw,
20,
) )
# Create publishers # Close devices on exit
self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10) atexit.register(self.cleanup)
self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10)
self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10)
self.debug_pub = self.create_publisher(String, "/anchor/debug", 10)
# Create a subscriber
self.relay_sub = self.create_subscription(
String, "/anchor/relay", self.on_relay_tovic_string, 10
)
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
try:
output = str(self.serial_interface.readline(), "utf8")
if output:
self.relay_fromvic(output)
# All output over debug temporarily
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
# self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except Exception as e:
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
def on_mock_fromvic(self, msg: String):
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
# self.get_logger().info(f"Got command from mock MCU: {msg}")
self.relay_fromvic(msg.data)
def on_relay_tovic_viccan(self, msg: VicCAN):
"""Relay a VicCAN message to the MCU"""
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
for num in msg.data:
output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.serial_interface.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
self.fromvic_debug_pub_.publish(String(data=msg))
parts = msg.strip().split(",")
if len(parts) > 0 and parts[0] != "can_relay_fromvic":
self.get_logger().debug(f"Ignoring non-VicCAN message: '{msg.strip()}'")
return
# String validation
malformed: bool = False
malformed_reason: str = ""
if len(parts) < 3 or len(parts) > 7:
malformed = True
malformed_reason = (
f"invalid argument count (expected [3,7], got {len(parts)})"
)
elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]:
malformed = True
malformed_reason = f"invalid mcu_name '{parts[1]}'"
elif not (parts[2].isnumeric()) or int(parts[2]) < 0:
malformed = True
malformed_reason = f"command_id '{parts[2]}' is not a non-negative integer"
else:
for x in parts[3:]:
try:
float(x)
except ValueError:
malformed = True
malformed_reason = f"data '{x}' is not a float"
break
if malformed:
self.get_logger().warning(
f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}"
)
return
# Have valid VicCAN message
output = VicCAN()
output.mcu_name = parts[1]
output.command_id = int(parts[2])
if len(parts) > 3:
output.data = [float(x) for x in parts[3:]]
output.header = Header(
stamp=self.get_clock().now().to_msg(), frame_id="from_vic"
)
# self.get_logger().info(f"Relaying from MCU: {output}")
if output.mcu_name == "core":
self.fromvic_core_pub_.publish(output)
elif output.mcu_name == "arm" or output.mcu_name == "digit":
self.fromvic_arm_pub_.publish(output)
elif output.mcu_name == "citadel" or output.mcu_name == "digit":
self.fromvic_bio_pub_.publish(output)
def on_relay_tovic_string(self, msg: String):
"""Relay a raw string message to the MCU for debugging"""
message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}")
self.serial_interface.write(bytes(message, "utf8"))
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self): def cleanup(self):
print("Cleaning up before terminating...") self.connector.cleanup()
if self.serial_interface.is_open:
self.serial_interface.close() def read_connector(self):
"""Check the connector for new data from the MCU, and publish string to appropriate topics"""
viccan, raw = self.connector.read()
if raw:
self.fromvic_debug_pub_.publish(String(data=raw))
if viccan:
self.relay_fromvic(viccan)
def write_connector(self, msg: VicCAN):
"""Write to the connector and send a copy to /anchor/to_vic/debug"""
self.connector.write(msg)
self.tovic_debug_pub_.publish(msg)
@deprecated(
"Use /anchor/to_vic/relay or /anchor/to_vic/relay_string instead of /anchor/relay"
)
def write_connector_legacy(self, msg: String):
"""Write to the connector by first attempting to convert String to VicCAN"""
# please do not reference this code. ~riley
for cmd in msg.data.split("\n"):
viccan = string_to_viccan(
cmd,
"anchor",
self.get_logger(),
self.get_clock().now().to_msg(),
)
if viccan:
self.write_connector(viccan)
def relay_fromvic(self, msg: VicCAN):
"""Relay a message from the MCU to the appropriate VicCAN topic"""
if msg.mcu_name == "core":
self.fromvic_core_pub_.publish(msg)
elif msg.mcu_name == "arm" or msg.mcu_name == "digit":
self.fromvic_arm_pub_.publish(msg)
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
def main(args=None): def main(args=None):
@@ -350,16 +242,9 @@ def main(args=None):
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
while rclpy.ok(): while rclpy.ok():
anchor_node.read_MCU() # Check the MCU for updates anchor_node.read_connector() # Check the connector for updates
rate.sleep() rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException): except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...") print("Caught shutdown signal, shutting down...")
finally: finally:
rclpy.try_shutdown() rclpy.try_shutdown()
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -0,0 +1,438 @@
from abc import ABC, abstractmethod
from astra_msgs.msg import VicCAN
from rclpy.clock import Clock
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan as _string_to_viccan, viccan_to_string
# CAN
import can
import can.interfaces.socketcan
import struct
# Serial
import serial
import serial.tools.list_ports
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
BAUD_RATE = 115200
MCU_IDS = [
"broadcast",
"core",
"arm",
"digit",
"faerie",
"citadel",
"libs",
]
class NoValidDeviceException(Exception):
pass
class NoWorkingDeviceException(Exception):
pass
class MultipleValidDevicesException(Exception):
pass
class DeviceClosedException(Exception):
pass
class Connector(ABC):
logger: RcutilsLogger
clock: Clock
def string_to_viccan(self, msg: str, mcu_name: str):
"""function currying so that we do not need to pass logger and clock every time"""
return _string_to_viccan(
msg,
mcu_name,
self.logger,
self.clock.now().to_msg(),
)
def __init__(self, logger: RcutilsLogger, clock: Clock):
self.logger = logger
self.clock = clock
@abstractmethod
def read(self) -> tuple[VicCAN | None, str | None]:
"""
Must return a tuple of (VicCAN, debug message or string repr of VicCAN)
"""
pass
@abstractmethod
def write(self, msg: VicCAN):
pass
@abstractmethod
def write_raw(self, msg: str):
pass
def cleanup(self):
pass
class SerialConnector(Connector):
port: str
mcu_name: str
serial_interface: serial.Serial
def __init__(self, logger: RcutilsLogger, clock: Clock, serial_override: str = ""):
super().__init__(logger, clock)
ports = self._find_ports()
mcu_name: str | None = None
if serial_override:
logger.warn(
f"using serial_override: `{serial_override}`! this will bypass several checks."
)
ports = [serial_override]
mcu_name = "override"
if len(ports) <= 0:
raise NoValidDeviceException("no valid serial device found")
if (l := len(ports)) > 1:
raise MultipleValidDevicesException(
f"too many ({l}) valid serial devices found"
)
# check each of our ports to make sure one of them is responding
port = ports[0]
# we might already have a name by now if we overrode earlier
mcu_name = mcu_name or self._get_name(port)
if not mcu_name:
raise NoWorkingDeviceException(
f"found {port}, but it did not respond with its name"
)
self.port = port
self.mcu_name = mcu_name
# if we fail at this point, it should crash because we've already tested the port
self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1)
def _find_ports(self) -> list[str]:
"""
Finds all valid ports but does not test them
returns: all valid ports
"""
comports = serial.tools.list_ports.comports()
valid_ports = list(
map( # get just device strings
lambda p: p.device,
filter( # make sure we have a known device
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
comports,
),
)
)
self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]")
return valid_ports
def _get_name(self, port: str) -> str | None:
"""
Get the name of the MCU (if it works)
returns: str name of the MCU, None if it doesn't work
"""
# attempt to open the serial port
serial_interface: serial.Serial
try:
self.logger.info(f"asking {port} for its name")
serial_interface = serial.Serial(port, BAUD_RATE, timeout=1)
serial_interface.write(b"can_relay_mode,on\n")
for i in range(4):
self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name")
response = serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
self.logger.info(f"we are talking to {args[1]}")
return args[1]
break
except UnicodeDecodeError as e:
self.logger.info(
f"ignoring UnicodeDecodeError when asking for MCU name: {e}"
)
if serial_interface.is_open:
# turn relay mode off if it failed to respond with its name
serial_interface.write(b"can_relay_mode,off\n")
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def read(self) -> tuple[VicCAN | None, str | None]:
try:
raw = str(self.serial_interface.readline(), "utf8")
if not raw:
return (None, None)
return (
self.string_to_viccan(raw, self.mcu_name),
raw,
)
except serial.SerialException as e:
self.logger.error(f"SerialException: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except Exception:
return (None, None) # pretty much no other error matters
def write(self, msg: VicCAN):
self.write_raw(viccan_to_string(msg))
def write_raw(self, msg: str):
self.serial_interface.write(bytes(msg, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except Exception as e:
self.logger.error(e)
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger, clock: Clock, can_override: str):
super().__init__(logger, clock)
self.can_channel: str | None = None
self.can_bus: can.BusABC | None = None
avail = can.interfaces.socketcan.SocketcanBus._detect_available_configs()
if len(avail) == 0:
raise NoValidDeviceException("no CAN interfaces found")
# filter to busses whose channel matches the can_override
if can_override:
self.logger.info(f"overrode can interface with {can_override}")
avail = list(
filter(
lambda b: b.get("channel") == can_override,
avail,
)
)
if (l := len(avail)) > 1:
channels = ", ".join(str(b.get("channel")) for b in avail)
raise MultipleValidDevicesException(
f"too many ({l}) CAN interfaces found: [{channels}]"
)
bus = avail[0]
self.can_channel = str(bus.get("channel"))
self.logger.info(f"found CAN interface '{self.can_channel}'")
try:
self.can_bus = can.Bus(
interface="socketcan",
channel=self.can_channel,
bitrate=1_000_000,
)
except can.CanError as e:
raise NoWorkingDeviceException(
f"could not open CAN channel '{self.can_channel}': {e}"
)
if self.can_channel and self.can_channel.startswith("v"):
self.logger.warn("CAN interface is likely virtual")
def read(self) -> tuple[VicCAN | None, str | None]:
if not self.can_bus:
raise DeviceClosedException("CAN bus not initialized")
try:
message = self.can_bus.recv(timeout=0.0)
except can.CanError as e:
self.logger.error(f"CAN error while receiving: {e}")
raise DeviceClosedException("CAN bus closed unexpectedly")
if message is None:
return (None, None)
arbitration_id = message.arbitration_id & 0x7FF
data_bytes = bytes(message.data)
mcu_key = (arbitration_id >> 8) & 0b111
data_type_key = (arbitration_id >> 6) & 0b11
command = arbitration_id & 0x3F
try:
mcu_name = MCU_IDS[mcu_key]
except IndexError:
self.logger.warn(
f"received CAN frame with unknown MCU key {mcu_key}; id=0x{arbitration_id:X}"
)
return (None, None)
data: list[float] = []
try:
if data_type_key == 3:
data = []
elif data_type_key == 0:
if len(data_bytes) < 8:
self.logger.warn(
f"received double payload with insufficient length {len(data_bytes)}; dropping frame"
)
return (None, None)
(value,) = struct.unpack(">d", data_bytes[:8])
data = [float(value)]
elif data_type_key == 1:
if len(data_bytes) < 8:
self.logger.warn(
f"received float32x2 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return (None, None)
v1, v2 = struct.unpack(">ff", data_bytes[:8])
data = [float(v1), float(v2)]
elif data_type_key == 2:
if len(data_bytes) < 8:
self.logger.warn(
f"received int16x4 payload with insufficient length {len(data_bytes)}; dropping frame"
)
return (None, None)
i1, i2, i3, i4 = struct.unpack(">hhhh", data_bytes[:8])
data = [float(i1), float(i2), float(i3), float(i4)]
else:
self.logger.warn(
f"received CAN frame with unknown data_type_key {data_type_key}; id=0x{arbitration_id:X}"
)
return (None, None)
except struct.error as e:
self.logger.error(f"error unpacking CAN payload: {e}")
return (None, None)
viccan = VicCAN(
mcu_name=mcu_name,
command_id=int(command),
data=data,
)
self.logger.debug(
f"received CAN frame id=0x{message.arbitration_id:X}, "
f"decoded as VicCAN(mcu_name={viccan.mcu_name}, command_id={viccan.command_id}, data={viccan.data})"
)
return (
viccan,
f"{viccan.mcu_name},{viccan.command_id},"
+ ",".join(map(str, list(viccan.data))),
)
def write(self, msg: VicCAN):
if not self.can_bus:
raise DeviceClosedException("CAN bus not initialized")
# build 11-bit arbitration ID according to VicCAN spec:
# bits 10..8: targeted MCU key
# bits 7..6: data type key
# bits 5..0: command
# map MCU name to 3-bit key.
try:
mcu_id = MCU_IDS.index((msg.mcu_name or "").lower())
except ValueError:
self.logger.error(
f"unknown VicCAN mcu_name '{msg.mcu_name}' for CAN frame; dropping message"
)
return
# determine data type from length:
# 0: double x1, 1: float32 x2, 2: int16 x4, 3: empty
match data_len := len(msg.data):
case 0:
data_type = 3
data = bytes()
case 1:
data_type = 0
data = struct.pack(">d", *msg.data)
case 2:
data_type = 1
data = struct.pack(">ff", *msg.data)
case 3 | 4: # 3 gets treated as 4
data_type = 2
if data_len == 3:
msg.data.append(0)
data = struct.pack(">hhhh", *[int(x) for x in msg.data])
case _:
self.logger.error(
f"unexpected VicCAN data length: {data_len}; dropping message"
)
return
# command is limited to 6 bits.
command = int(msg.command_id)
if command < 0 or command > 0x3F:
self.logger.error(
f"invalid command_id for CAN frame: {command}; dropping message"
)
return
try:
can_message = can.Message(
arbitration_id=(mcu_id << 8) | (data_type << 6) | command,
data=data,
is_extended_id=False,
)
except Exception as e:
self.logger.error(f"failed to construct CAN message: {e}")
return
try:
self.can_bus.send(can_message)
self.logger.debug(
f"sent CAN frame id=0x{can_message.arbitration_id:X}, "
f"data={list(can_message.data)}"
)
except can.CanError as e:
self.logger.error(f"CAN error while sending: {e}")
raise DeviceClosedException("CAN bus closed unexpectedly")
def write_raw(self, msg: str):
self.logger.warn(f"write_raw is not supported for CANConnector. msg: {msg}")
def cleanup(self):
try:
if self.can_bus is not None:
self.logger.info("shutting down CAN bus")
self.can_bus.shutdown()
except Exception as e:
self.logger.error(e)
class MockConnector(Connector):
def __init__(self, logger: RcutilsLogger, clock: Clock):
super().__init__(logger, clock)
# No hardware interface for MockConnector. Publish to `/anchor/from_vic/mock_mcu` instead.
def read(self) -> tuple[VicCAN | None, str | None]:
return (None, None)
def write(self, msg: VicCAN):
pass
def write_raw(self, msg: str):
pass

View File

@@ -0,0 +1,65 @@
from astra_msgs.msg import VicCAN
from std_msgs.msg import Header
from builtin_interfaces.msg import Time
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(
msg: str, mcu_name: str, logger: RcutilsLogger, time: Time
) -> VicCAN | None:
"""
Converts the serial string VicCAN format to a ROS2 VicCAN message.
Does not fill out the Header of the message.
On a failure, it will log at a debug level why it failed and return None.
"""
parts: list[str] = msg.strip().split(",")
# don't need an extra check because len of .split output is always >= 1
if not parts[0].startswith("can_relay_"):
logger.debug(f"got non-CAN data from {mcu_name}: {msg}")
return None
elif len(parts) < 3:
logger.debug(f"got garbage (not enough parts) CAN data from {mcu_name}: {msg}")
return None
elif len(parts) > 7:
logger.debug(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}")
return None
try:
command_id = int(parts[2])
except ValueError:
logger.debug(
f"got garbage (non-integer command id) CAN data from {mcu_name}: {msg}"
)
return None
if command_id not in range(64):
logger.debug(
f"got garbage (wrong command id {command_id}) CAN data from {mcu_name}: {msg}"
)
return None
try:
return VicCAN(
header=Header(
stamp=time,
frame_id="from_vic",
),
mcu_name=parts[1],
command_id=command_id,
data=[float(x) for x in parts[3:]],
)
except ValueError:
logger.debug(f"got garbage (non-numerical) CAN data from {mcu_name}: {msg}")
return None
def viccan_to_string(viccan: VicCAN) -> str:
"""Converts a ROS2 VicCAN message to the serial string VicCAN format."""
# make sure we accept 3 digits and treat it as 4
if len(viccan.data) == 3:
viccan.data.append(0)
# go from [ w, x, y, z ] -> ",w,x,y,z" & round to 7 digits max
data = "".join([f",{round(val,7)}" for val in viccan.data])
return f"can_relay_tovic,{viccan.mcu_name},{viccan.command_id}{data}\n"

View File

@@ -1,138 +1,174 @@
#!/usr/bin/env python3
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown from launch.actions import DeclareLaunchArgument, Shutdown, IncludeLaunchDescription
from launch.substitutions import (
LaunchConfiguration,
ThisLaunchFileDir,
PathJoinSubstitution,
)
from launch_ros.actions import Node
from launch.conditions import IfCondition from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch.launch_description_sources import PythonLaunchDescriptionSource
# Prevent making __pycache__ directories def generate_launch_description():
from sys import dont_write_bytecode connector = LaunchConfiguration("connector")
serial_override = LaunchConfiguration("serial_override")
can_override = LaunchConfiguration("can_override")
use_ptz = LaunchConfiguration("use_ptz")
dont_write_bytecode = True ld = LaunchDescription()
# arguments
ld.add_action(
DeclareLaunchArgument(
"connector",
default_value="auto",
description="Connector parameter for anchor node (default: 'auto')",
)
)
def launch_setup(context, *args, **kwargs): ld.add_action(
# Retrieve the resolved value of the launch argument 'mode' DeclareLaunchArgument(
mode = LaunchConfiguration("mode").perform(context) "serial_override",
nodes = [] default_value="",
description="Serial port override parameter for anchor node (default: '')",
)
)
if mode == "anchor": ld.add_action(
# Launch every node and pass "anchor" as the parameter DeclareLaunchArgument(
"can_override",
default_value="",
description="CAN network override parameter for anchor node (default: '')",
)
)
nodes.append( ld.add_action(
Node( DeclareLaunchArgument(
package="arm_pkg", "use_ptz",
executable="arm", # change as needed default_value="true", # must be string for launch system
name="arm", description="Whether to launch PTZ node (default: true)",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
) )
) )
nodes.append(
Node( ld.add_action(
package="core_pkg", DeclareLaunchArgument(
executable="core", # change as needed "use_ros2_control",
name="core", default_value="false",
output="both", description="Whether to use DiffDriveController for driving instead of direct Twist",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
) )
) )
nodes.append(
Node( ld.add_action(
package="core_pkg", DeclareLaunchArgument(
executable="ptz", # change as needed "rover_platform_override",
name="ptz", default_value="",
output="both", description="Override the rover platform (either clucky or testbed). If unset, hostname is used; defaults to clucky without hostname.",
condition=IfCondition(LaunchConfiguration("use_ptz", default="true")), choices=["clucky", "testbed", ""],
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
) )
) )
nodes.append(
Node( # nodes
package="bio_pkg", ld.add_action(
executable="bio", # change as needed
name="bio",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
nodes.append(
Node( Node(
package="anchor_pkg", package="anchor_pkg",
executable="anchor", # change as needed executable="anchor",
name="anchor", name="anchor",
output="both", output="both",
parameters=[{"launch_mode": mode}], parameters=[
{
"launch_mode": "anchor",
"connector": connector,
"serial_override": serial_override,
"can_override": can_override,
}
],
on_exit=Shutdown(), on_exit=Shutdown(),
) )
) )
elif mode in ["arm", "core", "bio", "ptz"]:
# Only launch the node corresponding to the provided mode. ld.add_action(
if mode == "arm":
nodes.append(
Node( Node(
package="arm_pkg", package="arm_pkg",
executable="arm", executable="arm",
name="arm", name="arm",
output="both", output="both",
parameters=[{"launch_mode": mode}], parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(), on_exit=Shutdown(),
) )
) )
elif mode == "core":
nodes.append( ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[{"launch_mode": mode}],
on_exit=Shutdown(),
)
)
elif mode == "bio":
nodes.append(
Node( Node(
package="bio_pkg", package="bio_pkg",
executable="bio", executable="bio",
name="bio", name="bio",
output="both", output="both",
parameters=[{"launch_mode": mode}], parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(), on_exit=Shutdown(),
) )
) )
elif mode == "ptz":
nodes.append( ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[
{"launch_mode": "anchor"},
{
"use_ros2_control": LaunchConfiguration(
"use_ros2_control", default=False
)
},
{
"rover_platform_override": LaunchConfiguration(
"rover_platform_override", default="auto"
)
},
],
on_exit=Shutdown(),
)
)
ld.add_action(
Node( Node(
package="core_pkg", package="core_pkg",
executable="ptz", executable="ptz",
name="ptz", name="ptz",
output="both", output="both",
on_exit=Shutdown(), # on fail, shutdown if this was the only node to be launched condition=IfCondition(use_ptz),
) )
) )
else:
# If an invalid mode is provided, print an error.
print("Invalid mode provided. Choose one of: arm, core, bio, anchor, ptz.")
return nodes ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
def generate_launch_description(): PathJoinSubstitution(
declare_arg = DeclareLaunchArgument( [
"mode", FindPackageShare("core_description"),
default_value="anchor", "launch",
description="Launch mode: arm, core, bio, anchor, or ptz", "robot_state_publisher.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
) )
return LaunchDescription([declare_arg, OpaqueFunction(function=launch_setup)]) ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"spawn_controllers.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
)
return ld

View File

@@ -3,13 +3,20 @@
<package format="3"> <package format="3">
<name>anchor_pkg</name> <name>anchor_pkg</name>
<version>0.0.0</version> <version>0.0.0</version>
<description>TODO: Package description</description> <description>ASTRA VicCAN driver package, using python-can and pyserial.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer> <maintainer email="rjm0037@uah.edu">Riley</maintainer>
<license>AGPL-3.0-only</license> <license>AGPL-3.0-only</license>
<depend>rclpy</depend> <depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-serial</depend> <exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-serial</exec_depend>
<exec_depend>python3-can</exec_depend>
<exec_depend>core_pkg</exec_depend>
<exec_depend>arm_pkg</exec_depend>
<exec_depend>bio_pkg</exec_depend>
<exec_depend>core_description</exec_depend>
<build_depend>black</build_depend> <build_depend>black</build_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/anchor_pkg script_dir=$base/lib/anchor_pkg
[install] [install]
install_scripts=$base/lib/anchor_pkg install_scripts=$base/lib/anchor_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -6,7 +6,7 @@ package_name = "anchor_pkg"
setup( setup(
name=package_name, name=package_name,
version="0.0.0", version="1.0.0",
packages=find_packages(exclude=["test"]), packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]), ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -17,8 +17,8 @@ setup(
zip_safe=True, zip_safe=True,
maintainer="tristan", maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com", maintainer_email="tristanmcginnis26@gmail.com",
description="Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN", description="ASTRA VicCAN driver package, using python-can and pyserial.",
license="All Rights Reserved", license="AGPL-3.0-only",
entry_points={ entry_points={
"console_scripts": ["anchor = anchor_pkg.anchor_node:main"], "console_scripts": ["anchor = anchor_pkg.anchor_node:main"],
}, },

View File

@@ -1,279 +0,0 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
from std_msgs.msg import String
from astra_msgs.msg import ControllerState
from astra_msgs.msg import ArmManual
from astra_msgs.msg import ArmIK
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("arm_headless")
# Depricated, kept temporarily for reference
# self.create_timer(0.20, self.send_controls)#read and send controls
self.create_timer(0.1, self.send_manual)
# Create a publisher to publish any output the pico sends
# Depricated, kept temporarily for reference
# self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10)
# Create a subscriber to listen to any commands sent for the pico
# Depricated, kept temporarily for reference
# self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
# self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.laser_status = 0
# Initialize pygame
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
#
#
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
# Check the pico for updates
# self.read_feedback()
if (
pygame.joystick.get_count() == 0
): # if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
# self.send_controls() #depricated, kept for reference temporarily
self.send_manual()
# self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() # re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt:
sys.exit(0)
def send_manual(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = ArmManual()
# Triggers for gripper control
if self.gamepad.get_axis(2) > 0: # left trigger
input.gripper = -1
elif self.gamepad.get_axis(5) > 0: # right trigger
input.gripper = 1
# Toggle Laser
if self.gamepad.get_button(7): # Start
self.laser_status = 1
elif self.gamepad.get_button(6): # Back
self.laser_status = 0
input.laser = self.laser_status
if self.gamepad.get_button(5): # right bumper, control effector
# Left stick X-axis for effector yaw
if self.gamepad.get_axis(0) > 0:
input.effector_yaw = 1
elif self.gamepad.get_axis(0) < 0:
input.effector_yaw = -1
# Right stick X-axis for effector roll
if self.gamepad.get_axis(3) > 0:
input.effector_roll = 1
elif self.gamepad.get_axis(3) < 0:
input.effector_roll = -1
else: # Control arm axis
dpad_input = self.gamepad.get_hat(0)
input.axis0 = 0
if dpad_input[0] == 1:
input.axis0 = 1
elif dpad_input[0] == -1:
input.axis0 = -1
if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15:
input.axis1 = round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1))
if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15:
input.axis3 = -1 * round(self.gamepad.get_axis(4))
# input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
# input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
# input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
# Button Mappings
# axis2 -> LT
# axis5 -> RT
# Buttons0 -> A
# Buttons1 -> B
# Buttons2 -> X
# Buttons3 -> Y
# Buttons4 -> LB
# Buttons5 -> RB
# Buttons6 -> Back
# Buttons7 -> Start
input.linear_actuator = 0
if pygame.joystick.get_count() != 0:
self.get_logger().info(
f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n"
)
self.manual_pub.publish(input)
else:
pass
pass
# Depricated, kept temporarily for reference
# def send_controls(self):
# for event in pygame.event.get():
# if event.type == pygame.QUIT:
# pygame.quit()
# exit()
# input = ControllerState()
# input.lt = self.gamepad.get_axis(2)#left trigger
# input.rt = self.gamepad.get_axis(5)#right trigger
# #input.lb = self.gamepad.get_button(9)#Value must be converted to bool
# if(self.gamepad.get_button(4)):#left bumper
# input.lb = True
# else:
# input.lb = False
# #input.rb = self.gamepad.get_button(10)#Value must be converted to bool
# if(self.gamepad.get_button(5)):#right bumper
# input.rb = True
# else:
# input.rb = False
# #input.plus = self.gamepad.get_button(6)#plus button
# if(self.gamepad.get_button(7)):#plus button
# input.plus = True
# else:
# input.plus = False
# #input.minus = self.gamepad.get_button(4)#minus button
# if(self.gamepad.get_button(6)):#minus button
# input.minus = True
# else:
# input.minus = False
# input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis
# input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis
# input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis
# input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis
# #input.a = self.gamepad.get_button(1)#A button
# if(self.gamepad.get_button(0)):#A button
# input.a = True
# else:
# input.a = False
# #input.b = self.gamepad.get_button(0)#B button
# if(self.gamepad.get_button(1)):#B button
# input.b = True
# else:
# input.b = False
# #input.x = self.gamepad.get_button(3)#X button
# if(self.gamepad.get_button(2)):#X button
# input.x = True
# else:
# input.x = False
# #input.y = self.gamepad.get_button(2)#Y button
# if(self.gamepad.get_button(3)):#Y button
# input.y = True
# else:
# input.y = False
# dpad_input = self.gamepad.get_hat(0)#D-pad input
# #not using up/down on DPad
# input.d_up = False
# input.d_down = False
# if(dpad_input[0] == 1):#D-pad right
# input.d_right = True
# else:
# input.d_right = False
# if(dpad_input[0] == -1):#D-pad left
# input.d_left = True
# else:
# input.d_left = False
# if pygame.joystick.get_count() != 0:
# self.get_logger().info(f"[Ctrl] Updated Controller State\n")
# self.publisher.publish(input)
# else:
# pass
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
# tb_bs = BaseStation()
# node.run()
if __name__ == "__main__":
main()

View File

@@ -1,7 +1,7 @@
import sys import sys
import threading
import signal import signal
import math import math
from warnings import deprecated
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
@@ -11,8 +11,8 @@ from rclpy import qos
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState from astra_msgs.msg import ArmFeedback, ArmCtrlState, VicCAN, RevMotorState
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -25,8 +25,6 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
) )
thread = None
class ArmNode(Node): class ArmNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics.""" """Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
@@ -68,7 +66,9 @@ class ArmNode(Node):
# Parameters # Parameters
self.declare_parameter("use_old_topics", True) self.declare_parameter("use_old_topics", True)
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value self.use_old_topics = (
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
################################################## ##################################################
# Old topics # Old topics
@@ -85,7 +85,9 @@ class ArmNode(Node):
SocketFeedback, "/arm/feedback/socket", 10 SocketFeedback, "/arm/feedback/socket", 10
) )
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10) self.digit_pub = self.create_publisher(
DigitFeedback, "/arm/feedback/digit", 10
)
self.digit_feedback = DigitFeedback() self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback) self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
@@ -110,13 +112,26 @@ class ArmNode(Node):
# Control # Control
# Manual: /arm/manual_new is published by Servo or Basestation # Manual: /arm/control/joint_jog is published by Basestation or Headless
self.jointjog_pub_ = self.create_subscription( self.man_jointjog_sub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos JointJog,
"/arm/control/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
) )
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control # IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription( self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos JointState,
"/joint_commands",
self.joint_command_callback,
qos_profile=control_qos,
)
# State: /arm/control/state is published by Basestation or Headless
self.man_state_sub_ = self.create_subscription(
ArmCtrlState,
"/arm/control/state",
self.man_state_callback,
qos_profile=control_qos,
) )
# Feedback # Feedback
@@ -124,12 +139,12 @@ class ArmNode(Node):
# Combined Socket and Digit feedback # Combined Socket and Digit feedback
self.arm_feedback_pub_ = self.create_publisher( self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback, ArmFeedback,
"/arm/feedback/new_feedback", "/arm/feedback",
qos_profile=qos.qos_profile_sensor_data, qos_profile=qos.qos_profile_sensor_data,
) )
# IK arm pose: /joint_states is published from here to topic_based_control # IK arm pose: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher( self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
) )
################################################### ###################################################
@@ -137,21 +152,23 @@ class ArmNode(Node):
# Combined Socket and Digit feedback # Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback() self.arm_feedback_new = ArmFeedback()
self.arm_feedback_new.axis0_motor.id = 1
self.arm_feedback_new.axis1_motor.id = 2
self.arm_feedback_new.axis2_motor.id = 3
self.arm_feedback_new.axis3_motor.id = 4
# IK Arm pose # IK Arm pose
self.saved_joint_state = JointState() self.saved_joint_state = JointState()
self.saved_joint_state.header.frame_id = "base_link"
self.saved_joint_state.name = self.all_joint_names self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros # ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name) self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name) self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
def jointjog_callback(self, msg: JointJog): def jointjog_callback(self, msg: JointJog):
if ( if len(msg.joint_names) != len(msg.velocities):
len(msg.joint_names) == 0 self.get_logger().debug("Ignoring malformed /arm/manual/joint_jog message.")
or len(msg.velocities) == 0 return
or len(msg.joint_names) != len(msg.velocities)
):
return # Malformed message
# Grab velocities from message # Grab velocities from message
velocities = [ velocities = [
@@ -164,62 +181,95 @@ class ArmNode(Node):
] ]
# Deadzone # Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities] velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=39,
data=velocities[0:4],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=39,
data=velocities[4:6],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=26,
data=[velocities[6]],
header=msg.header,
)
)
# TODO: use msg.duration
def man_state_callback(self, msg: ArmCtrlState):
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=18,
data=[1.0 if msg.brake_mode else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=34,
data=[1.0 if msg.laser else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.")
return # command needs either position or velocity for all 7 joints
# Grab velocities from message
velocities = [
(
msg.velocity[msg.name.index(joint_name)] # type: ignore
if joint_name in msg.name
else 0.0
)
for joint_name in self.all_joint_names
]
self.send_velocities(velocities, msg.header)
def send_velocities(self, velocities: list[float], header: Header):
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s # ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [ velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities) math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
] ]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Send Axis 0-3 # Send Axis 0-3
self.anchor_tovic_pub_.publish( self.anchor_tovic_pub_.publish(
VicCAN( VicCAN(mcu_name="arm", command_id=43, data=velocities[0:4], header=header)
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
)
) )
# Send Wrist yaw and roll # Send Wrist yaw and roll
# TODO: Verify embedded # TODO: Verify embedded
self.anchor_tovic_pub_.publish( self.anchor_tovic_pub_.publish(
VicCAN( VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6], header=header)
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
)
) )
# Send End Effector Gripper # Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded # TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish( self.anchor_tovic_pub_.publish(
VicCAN( VicCAN(mcu_name="digit", command_id=26, data=[velocities[6]], header=header)
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
) )
)
# TODO: use msg.duration
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
return # command needs either position or velocity for all 7 joints
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper
# TODO: refactor to depend on joint names
# Embedded takes deg*10, ROS2 uses Radians
velocities = [
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity
]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Axis 0-3
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3])
arm_cmd.header = Header(stamp=self.get_clock().now().to_msg())
# Wrist yaw and roll, gripper included for future use when have adequate hardware
digit_cmd = VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6])
digit_cmd.header = Header(stamp=self.get_clock().now().to_msg())
self.anchor_tovic_pub_.publish(arm_cmd)
self.anchor_tovic_pub_.publish(digit_cmd)
@deprecated("Uses an old message type. Will be removed at some point.")
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
"""TODO: Old"""
axis0 = msg.axis0 axis0 = msg.axis0
axis1 = -1 * msg.axis1 axis1 = -1 * msg.axis1
axis2 = msg.axis2 axis2 = msg.axis2
@@ -243,13 +293,13 @@ class ArmNode(Node):
return return
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
"""TODO: Old"""
output = String(data=msg) output = String(data=msg)
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
"""TODO: Old"""
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
@@ -282,8 +332,7 @@ class ArmNode(Node):
self.process_fromvic_digit(msg) self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN): def process_fromvic_arm(self, msg: VicCAN):
if msg.mcu_name != "arm": assert msg.mcu_name == "arm"
return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_socket_msg_len_dict: if msg.command_id in self.viccan_socket_msg_len_dict:
@@ -294,6 +343,8 @@ class ArmNode(Node):
) )
return return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id: match msg.command_id:
case 53: # REV SPARK MAX feedback case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0]) motorId = round(msg.data[0])
@@ -326,12 +377,8 @@ class ArmNode(Node):
# Joint state publisher for URDF visualization # Joint state publisher for URDF visualization
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0 self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1 self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.saved_joint_state.position[2] = math.radians( self.saved_joint_state.position[2] = math.radians(angles[2]) # Axis 2
-angles[2] self.saved_joint_state.position[3] = math.radians(angles[3]) # Axis 3
) # Axis 2 (inverted)
self.saved_joint_state.position[3] = math.radians(
-angles[3]
) # Axis 3 (inverted)
# Wrist is handled by digit feedback # Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state) self.joint_state_pub_.publish(self.saved_joint_state)
@@ -363,18 +410,17 @@ class ArmNode(Node):
velocities[1] velocities[1]
) # Axis 1 ) # Axis 1
self.saved_joint_state.velocity[2] = math.radians( self.saved_joint_state.velocity[2] = math.radians(
-velocities[2] velocities[2]
) # Axis 2 (-) ) # Axis 2
self.saved_joint_state.velocity[3] = math.radians( self.saved_joint_state.velocity[3] = math.radians(
-velocities[3] velocities[3]
) # Axis 3 (-) ) # Axis 3
# Wrist is handled by digit feedback # Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state) self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN): def process_fromvic_digit(self, msg: VicCAN):
if msg.mcu_name != "digit": assert msg.mcu_name == "digit"
return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_digit_msg_len_dict: if msg.command_id in self.viccan_digit_msg_len_dict:
@@ -385,11 +431,14 @@ class ArmNode(Node):
) )
return return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id: match msg.command_id:
case 54: # Board voltages case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0 self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0 self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0 self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.digit_voltage.header.stamp = msg.header.stamp
case 55: # Arm joint positions case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians( self.saved_joint_state.position[4] = math.radians(
msg.data[0] msg.data[0]
@@ -398,13 +447,13 @@ class ArmNode(Node):
msg.data[1] msg.data[1]
) # Wrist yaw ) # Wrist yaw
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self): def publish_feedback(self):
"""TODO: Old"""
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
@deprecated("Uses an old message type. Will be removed at some point.")
def updateAngleFeedback(self, msg: str): def updateAngleFeedback(self, msg: str):
"""TODO: Old"""
# Angle feedbacks, # Angle feedbacks,
# split the msg.data by commas # split the msg.data by commas
parts = msg.split(",") parts = msg.split(",")
@@ -422,8 +471,8 @@ class ArmNode(Node):
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateBusVoltage(self, msg: str): def updateBusVoltage(self, msg: str):
"""TODO: Old"""
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.split(",")
if len(parts) >= 7: if len(parts) >= 7:
@@ -437,8 +486,8 @@ class ArmNode(Node):
else: else:
self.get_logger().info("Invalid voltage feedback input format") self.get_logger().info("Invalid voltage feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateMotorFeedback(self, msg: str): def updateMotorFeedback(self, msg: str):
"""TODO: Old"""
parts = str(msg.strip()).split(",") parts = str(msg.strip()).split(",")
motorId = round(float(parts[3])) motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0 temp = float(parts[4]) / 10.0
@@ -462,15 +511,22 @@ class ArmNode(Node):
self.arm_feedback.axis0_current = current self.arm_feedback.axis0_current = current
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
arm_node = ArmNode() arm_node = ArmNode()
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
thread.start()
try: try:
thread.join() rclpy.spin(arm_node)
except (KeyboardInterrupt, ExternalShutdownException): except (KeyboardInterrupt, ExternalShutdownException):
pass pass
finally: finally:
@@ -478,8 +534,4 @@ def main(args=None):
if __name__ == "__main__": if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -3,14 +3,15 @@
<package format="3"> <package format="3">
<name>arm_pkg</name> <name>arm_pkg</name>
<version>1.0.0</version> <version>1.0.0</version>
<description>Core arm package which handles ROS2 commnuication.</description> <description>Relays topics related to Arm between VicCAN (through Anchor) and basestation.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer> <maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license> <license>AGPL-3.0-only</license>
<depend>rclpy</depend> <depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-numpy</depend> <exec_depend>common_interfaces</exec_depend>
<depend>astra_msgs</depend> <exec_depend>python3-numpy</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend> <test_depend>ament_flake8</test_depend>

View File

@@ -1,27 +1,22 @@
from setuptools import find_packages, setup from setuptools import setup
import os
from glob import glob
package_name = "arm_pkg" package_name = "arm_pkg"
setup( setup(
name=package_name, name=package_name,
version="1.0.0", version="1.0.0",
packages=find_packages(exclude=["test"]), packages=[package_name],
data_files=[ data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]), ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]), ("share/" + package_name, ["package.xml"]),
], ],
install_requires=["setuptools"], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer="tristan", maintainer="David Sharpe",
maintainer_email="tristanmcginnis26@gmail.com", maintainer_email="ds0196@uah.edu",
description="TODO: Package description", description="Relays topics related to Arm between VicCAN (through Anchor) and basestation.",
license="All Rights Reserved", license="AGPL-3.0-only",
entry_points={ entry_points={
"console_scripts": [ "console_scripts": ["arm = arm_pkg.arm_node:main"],
"arm = arm_pkg.arm_node:main",
"headless = arm_pkg.arm_headless:main",
],
}, },
) )

View File

@@ -2,14 +2,16 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3"> <package format="3">
<name>bio_pkg</name> <name>bio_pkg</name>
<version>0.0.0</version> <version>1.0.0</version>
<description>TODO: Package description</description> <description>Biosensor package to handle command interpretation and embedded interfacing.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer> <maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license> <license>AGPL-3.0-only</license>
<depend>rclpy</depend> <depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>astra_msgs</depend> <exec_depend>common_interfaces</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend> <test_depend>ament_flake8</test_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/bio_pkg script_dir=$base/lib/bio_pkg
[install] [install]
install_scripts=$base/lib/bio_pkg install_scripts=$base/lib/bio_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,8 +14,8 @@ setup(
zip_safe=True, zip_safe=True,
maintainer="tristan", maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com", maintainer_email="tristanmcginnis26@gmail.com",
description="TODO: Package description", description="Relays topics related to Biosensor between VicCAN (through Anchor) and basestation.",
license="TODO: License declaration", license="AGPL-3.0-only",
entry_points={ entry_points={
"console_scripts": ["bio = bio_pkg.bio_node:main"], "console_scripts": ["bio = bio_pkg.bio_node:main"],
}, },

View File

@@ -1,112 +0,0 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
import importlib
from std_msgs.msg import String
from astra_msgs.msg import CoreControl
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
max_speed = 90 # Max speed as a duty cycle percentage (1-100)
class Headless(Node):
def __init__(self):
# Initialize pygame first
pygame.init()
pygame.joystick.init()
# Wait for a gamepad to be connected
self.gamepad = None
print("Waiting for gamepad connection...")
while pygame.joystick.get_count() == 0:
# Process any pygame events to keep it responsive
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
time.sleep(1.0) # Check every second
print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
# Now initialize the ROS2 node
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, "/core/control", 10)
self.lastMsg = (
String()
) # Used to ignore sending controls repeatedly when they do not change
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
input = CoreControl()
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl()
input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f"L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}"
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -1,20 +1,17 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
from rclpy.duration import Duration
from std_srvs.srv import Empty
import signal
import time
import atexit
import serial
import os
import sys import sys
import threading import signal
import glob from typing import Literal, cast
from scipy.spatial.transform import Rotation from scipy.spatial.transform import Rotation
from math import copysign, pi from math import copysign, pi
from warnings import deprecated
from os import getenv
from socket import gethostname
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from rclpy.duration import Duration
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus, JointState from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus, JointState
@@ -23,12 +20,14 @@ from astra_msgs.msg import CoreControl, CoreFeedback, RevMotorState
from astra_msgs.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState from astra_msgs.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState
serial_pub = None
thread = None
CORE_WHEELBASE = 0.836 # meters CORE_WHEELBASE = 0.836 # meters
CORE_WHEEL_RADIUS = 0.171 # meters CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1 CORE_GEAR_RATIO = 100.0 # Clucky: 100:1
# TODO: update core_description or add testbed_description
TESTBED_WHEELBASE = 0.368 # meters
TESTBED_WHEEL_RADIUS = 0.108 # meters
TESTBED_GEAR_RATIO = 64 # Testbed: 64:1
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -41,6 +40,10 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
) )
class CoreNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Core related topics."""
# Used to verify the length of an incoming VicCAN feedback message # Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list # Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = { viccan_msg_len_dict = {
@@ -54,23 +57,79 @@ viccan_msg_len_dict = {
56: 4, # really 3, but viccan 56: 4, # really 3, but viccan
58: 4, # ditto 58: 4, # ditto
} }
rover_platform: Literal["clucky", "testbed"]
class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initalize node with name
super().__init__("core_node") super().__init__("core_node")
# Launch mode -- anchor vs core self.get_logger().info(f"core launch_mode is: anchor")
self.declare_parameter("launch_mode", "core")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
################################################## ##################################################
# Topics # Parameters
self.declare_parameter("use_ros2_control", False)
self.use_ros2_control = (
self.get_parameter("use_ros2_control").get_parameter_value().bool_value
)
self.declare_parameter("rover_platform_override", "")
rover_platform = (
self.get_parameter("rover_platform_override")
.get_parameter_value()
.string_value
)
# Verify rover_platform_override value is valid
if rover_platform not in ("clucky", "testbed", ""):
# Keeping this here, because I want a value error only if the user manually
# overrides using a bad value. If we can't determine it automatically
# from hostname, then default to clucky.
self.get_logger().fatal(
"Invalid rover_platform_override parameter value. If set, must be 'clucky' or 'testbed'."
)
# Attempt to determine platform from hostname, default to clucky on failure
rover_platform = rover_platform or gethostname().lower()
if rover_platform not in ("clucky", "testbed"):
self.get_logger().info(
"rover_platform defaulting to clucky, not overridden and could not determine from hostname."
)
rover_platform = "clucky"
self.rover_platform = cast(Literal["clucky", "testbed"], rover_platform)
if self.rover_platform == "testbed":
global TESTBED_WHEELBASE, TESTBED_WHEEL_RADIUS, TESTBED_GEAR_RATIO
self.wheelbase = TESTBED_WHEELBASE
self.wheel_radius = TESTBED_WHEEL_RADIUS
self.gear_ratio = TESTBED_GEAR_RATIO
else: # default in case of unset or invalid environment variable
global CORE_WHEELBASE, CORE_WHEEL_RADIUS, CORE_GEAR_RATIO
self.wheelbase = CORE_WHEELBASE
self.wheel_radius = CORE_WHEEL_RADIUS
self.gear_ratio = CORE_GEAR_RATIO
##################################################
# Old Topics
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
if not self.use_ros2_control:
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
##################################################
# New Topics
# Anchor # Anchor
if self.launch_mode == "anchor":
self.anchor_fromvic_sub_ = self.create_subscription( self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20 VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20
) )
@@ -78,18 +137,16 @@ class SerialRelay(Node):
VicCAN, "/anchor/to_vic/relay", 20 VicCAN, "/anchor/to_vic/relay", 20
) )
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control # Control
# autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2 if self.use_ros2_control:
self.cmd_vel_sub_ = self.create_subscription( # Joint state control for topic-based controller
TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1 self.joint_command_sub_ = self.create_subscription(
JointState, "/core/joint_commands", self.joint_command_callback, 2
) )
else:
# manual twist -- [-1, 1] rather than real units # manual twist -- [-1, 1] rather than real units
# TODO: change topic to '/core/control/twist'
self.twist_man_sub_ = self.create_subscription( self.twist_man_sub_ = self.create_subscription(
Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos
) )
@@ -100,148 +157,74 @@ class SerialRelay(Node):
self.control_state_callback, self.control_state_callback,
qos_profile=control_qos, qos_profile=control_qos,
) )
self.twist_max_duty = ( self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
)
# Feedback # Feedback
# Consolidated and organized core feedback # Consolidated and organized main core feedback
# TODO: change topic to something like '/core/feedback/main'
self.feedback_new_pub_ = self.create_publisher( self.feedback_new_pub_ = self.create_publisher(
NewCoreFeedback, NewCoreFeedback,
"/core/feedback_new", "/core/feedback_new",
qos_profile=qos.qos_profile_sensor_data, qos_profile=qos.qos_profile_sensor_data,
) )
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(
Barometer, "/core/feedback/baro", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Timers
if self.use_ros2_control:
self.vel_cmd_timer_ = self.create_timer(0.1, self.vel_cmd_timer_callback)
###################################################
# Saved state
# Controls
self._last_joint_command_time = self.get_clock().now()
self._last_joint_command_msg = JointState()
# Main Core feedback
self.feedback_new_state = NewCoreFeedback() self.feedback_new_state = NewCoreFeedback()
self.feedback_new_state.fl_motor.id = 1 self.feedback_new_state.fl_motor.id = 1
self.feedback_new_state.bl_motor.id = 2 self.feedback_new_state.bl_motor.id = 2
self.feedback_new_state.fr_motor.id = 3 self.feedback_new_state.fr_motor.id = 3
self.feedback_new_state.br_motor.id = 4 self.feedback_new_state.br_motor.id = 4
self.telemetry_pub_timer = self.create_timer(
1.0, self.publish_feedback # IMU
) # TODO: not sure about this
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/core/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
self.imu_state = Imu() self.imu_state = Imu()
self.imu_state.header.frame_id = "core_bno055" self.imu_state.header.frame_id = "core_bno055"
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher( # GPS
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
self.gps_state = NavSatFix() self.gps_state = NavSatFix()
self.gps_state.header.frame_id = "core_gps_antenna" self.gps_state.header.frame_id = "core_gps_antenna"
self.gps_state.status.service = NavSatStatus.SERVICE_GPS self.gps_state.status.service = NavSatStatus.SERVICE_GPS
self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX
self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher( # Barometer
Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data
)
self.baro_state = Barometer() self.baro_state = Barometer()
self.baro_state.header.frame_id = "core_bmp388" self.baro_state.header.frame_id = "core_bmp388"
# Old @deprecated("Uses an old message type. Will be removed at some point.")
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
# Debug
self.debug_pub = self.create_publisher(String, "/core/debug", 10)
self.ping_service = self.create_service(
Empty, "/astra/core/ping", self.ping_callback
)
##################################################
# Find microcontroller (Non-anchor only)
# Core (non-anchor) specific
if self.launch_mode == "core":
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
self.get_logger().info(f"Enabling Relay Mode")
ser.write(b"can_relay_mode,on\n")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
# end __init__()
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
if self.launch_mode == "core":
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self): # NON-ANCHOR SPECIFIC
try:
output = str(self.ser.readline(), "utf8")
if output:
# All output over debug temporarily
print(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
def scale_duty(self, value: float, max_speed: float): def scale_duty(self, value: float, max_speed: float):
leftMin = -1 leftMin = -1
leftMax = 1 leftMax = 1
@@ -258,6 +241,7 @@ class SerialRelay(Node):
# Convert the 0-1 range into a value in the right range. # Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan)) return str(rightMin + (valueScaled * rightSpan))
@deprecated("Uses an old message type. Will be removed at some point.")
def send_controls(self, msg: CoreControl): def send_controls(self, msg: CoreControl):
if msg.turn_to_enable: if msg.turn_to_enable:
command = ( command = (
@@ -283,17 +267,57 @@ class SerialRelay(Node):
# print(f"[Sys] Relaying: {command}") # print(f"[Sys] Relaying: {command}")
def cmd_vel_callback(self, msg: TwistStamped): def joint_command_callback(self, msg: JointState):
linear = msg.twist.linear.x # So... topic based control node publishes JointState messages over /joint_commands
angular = -msg.twist.angular.z # with len(msg.name) == 5 and len(msg.velocity) == 4... all 5 non-fixed joints
# are included in msg.name, but ig it is implied that msg.velocity only
# includes velocities for the commanded joints (ros__parameters.joints).
# So, this will be much more hacky and less adaptable than I would like it to be.
if (
len(msg.name) != (4 if self.rover_platform == "testbed" else 5)
or len(msg.velocity) != 4
or len(msg.position) != 0
):
self.get_logger().warning(
f"Received joint control message with unexpected number of joints. Ignoring."
)
return
if msg.name[-4:] != [ # type: ignore
"bl_wheel_joint",
"br_wheel_joint",
"fl_wheel_joint",
"fr_wheel_joint",
]:
self.get_logger().warning(
f"Received joint control message with unexpected name[]. Ignoring."
)
return
vel_left_rads = (linear - (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS # A 10 Hz timer callback actually sends these commands to Core for rate limiting
vel_right_rads = (linear + (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS # These come from ros2_control at 50 Hz
self._last_joint_command_time = self.get_clock().now()
self._last_joint_command_msg = msg
vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO def vel_cmd_timer_callback(self):
vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO # Safety timeout for diff_drive_controller commands via topic_based_ros2_control.
# It is safe to send stop command here because if self.use_ros2_control,
# then this is the only callback that is controlling Core's motors.
if self.get_clock().now() - self._last_joint_command_time > Duration(
nanoseconds=int(1e8) # 100ms
):
self.send_viccan(20, [0.0, 0.0, 0.0, 0.0])
return
self.send_viccan(20, [vel_left_rpm, vel_right_rpm]) # This order is verified by the subscription callback
(bl_vel, br_vel, fl_vel, fr_vel) = self._last_joint_command_msg.velocity
# Convert wheel rad/s to motor RPM
bl_rpm = radps_to_rpm(bl_vel) * self.gear_ratio
br_rpm = radps_to_rpm(br_vel) * self.gear_ratio
fl_rpm = radps_to_rpm(fl_vel) * self.gear_ratio
fr_rpm = radps_to_rpm(fr_vel) * self.gear_ratio
self.send_viccan(20, [fl_rpm, bl_rpm, fr_rpm, br_rpm]) # REV Velocity
def twist_man_callback(self, msg: Twist): def twist_man_callback(self, msg: Twist):
linear = msg.linear.x # [-1 1] for forward/back from left stick y linear = msg.linear.x # [-1 1] for forward/back from left stick y
@@ -334,15 +358,9 @@ class SerialRelay(Node):
# Max duty cycle # Max duty cycle
self.twist_max_duty = msg.max_duty # twist_man_callback will handle this self.twist_max_duty = msg.max_duty # twist_man_callback will handle this
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
if self.launch_mode == "anchor": self.anchor_pub.publish(String(data=msg)) # Publish to anchor for relay
# self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String() # Convert to std_msg string
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == "core":
self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def send_viccan(self, cmd_id: int, data: list[float]): def send_viccan(self, cmd_id: int, data: list[float]):
self.anchor_tovic_pub_.publish( self.anchor_tovic_pub_.publish(
@@ -354,6 +372,7 @@ class SerialRelay(Node):
) )
) )
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
@@ -423,14 +442,16 @@ class SerialRelay(Node):
# skill diff if not # skill diff if not
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in viccan_msg_len_dict: if msg.command_id in self.viccan_msg_len_dict:
expected_len = viccan_msg_len_dict[msg.command_id] expected_len = self.viccan_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len: if len(msg.data) != expected_len:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
) )
return return
self.feedback_new_state.header.stamp = msg.header.stamp
match msg.command_id: match msg.command_id:
# GNSS # GNSS
case 48: # GNSS Latitude case 48: # GNSS Latitude
@@ -457,6 +478,7 @@ class SerialRelay(Node):
self.imu_state.linear_acceleration.x = float(msg.data[0]) self.imu_state.linear_acceleration.x = float(msg.data[0])
self.imu_state.linear_acceleration.y = float(msg.data[1]) self.imu_state.linear_acceleration.y = float(msg.data[1])
self.imu_state.linear_acceleration.z = float(msg.data[2]) self.imu_state.linear_acceleration.z = float(msg.data[2])
self.feedback_new_state.orientation = float(msg.data[3])
# Deal with quaternion # Deal with quaternion
r = Rotation.from_euler("z", float(msg.data[3]), degrees=True) r = Rotation.from_euler("z", float(msg.data[3]), degrees=True)
q = r.as_quat() q = r.as_quat()
@@ -501,6 +523,7 @@ class SerialRelay(Node):
self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0 self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0
self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0 self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0
self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0 self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0
self.feedback_new_state.board_voltage.header.stamp = msg.header.stamp
# Baro # Baro
case 56: # BMP temperature, altitude, pressure case 56: # BMP temperature, altitude, pressure
self.baro_state.temperature = float(msg.data[0]) self.baro_state.temperature = float(msg.data[0])
@@ -513,14 +536,12 @@ class SerialRelay(Node):
motorId = round(float(msg.data[0])) motorId = round(float(msg.data[0]))
position = float(msg.data[1]) position = float(msg.data[1])
velocity = float(msg.data[2]) velocity = float(msg.data[2])
joint_state_msg = ( joint_state_msg = JointState()
JointState()
) # TODO: not sure if all motors should be in each message or not
joint_state_msg.position = [ joint_state_msg.position = [
position * (2 * pi) / CORE_GEAR_RATIO position * (2 * pi) / self.gear_ratio
] # revolutions to radians ] # revolutions to radians
joint_state_msg.velocity = [ joint_state_msg.velocity = [
velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO velocity * (2 * pi / 60.0) / self.gear_ratio
] # RPM to rad/s ] # RPM to rad/s
motor: RevMotorState | None = None motor: RevMotorState | None = None
@@ -528,52 +549,42 @@ class SerialRelay(Node):
match motorId: match motorId:
case 1: case 1:
motor = self.feedback_new_state.fl_motor motor = self.feedback_new_state.fl_motor
joint_state_msg.name = ["fl_motor_joint"] joint_state_msg.name = ["fl_wheel_joint"]
case 2: case 2:
motor = self.feedback_new_state.bl_motor motor = self.feedback_new_state.bl_motor
joint_state_msg.name = ["bl_motor_joint"] joint_state_msg.name = ["bl_wheel_joint"]
case 3: case 3:
motor = self.feedback_new_state.fr_motor motor = self.feedback_new_state.fr_motor
joint_state_msg.name = ["fr_motor_joint"] joint_state_msg.name = ["fr_wheel_joint"]
case 4: case 4:
motor = self.feedback_new_state.br_motor motor = self.feedback_new_state.br_motor
joint_state_msg.name = ["br_motor_joint"] joint_state_msg.name = ["br_wheel_joint"]
case _: case _:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring REV motor feedback 58 with invalid motorId {motorId}" f"Ignoring REV motor feedback 58 with invalid motorId {motorId}"
) )
return return
if motor:
motor.position = position
motor.velocity = velocity
# make the fucking shit work
if self.rover_platform == "clucky":
joint_state_msg.name.append("left_suspension_joint")
joint_state_msg.position.append(0.0)
joint_state_msg.velocity.append(0.0)
joint_state_msg.header.stamp = msg.header.stamp joint_state_msg.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(joint_state_msg) self.joint_state_pub_.publish(joint_state_msg)
case _: case _:
return return
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self): def publish_feedback(self):
# self.get_logger().info(f"[Core] {self.core_feedback}") # self.get_logger().info(f"[Core] {self.core_feedback}")
self.feedback_pub.publish(self.core_feedback) self.feedback_pub.publish(self.core_feedback)
def ping_callback(self, request, response):
return response
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def map_range( def map_range(
value: float, in_min: float, in_max: float, out_min: float, out_max: float value: float, in_min: float, in_max: float, out_min: float, out_max: float
@@ -581,19 +592,31 @@ def map_range(
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
def radps_to_rpm(radps: float):
return radps * 60 / (2 * pi)
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub # Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
serial_pub = SerialRelay() core_node = CoreNode()
serial_pub.run()
try:
rclpy.spin(core_node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -262,7 +262,7 @@ class PtzNode(Node):
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}" f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
) )
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
self.get_logger().info(message_text) self.get_logger().debug(message_text)
def run_async_func(self, coro): def run_async_func(self, coro):
"""Run an async function in the event loop.""" """Run an async function in the event loop."""

View File

@@ -3,15 +3,17 @@
<package format="3"> <package format="3">
<name>core_pkg</name> <name>core_pkg</name>
<version>1.0.0</version> <version>1.0.0</version>
<description>Core rover control package to handle command interpretation and embedded interfacing.</description> <description>Relays topics related to Core between VicCAN (through Anchor) and basestation.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer> <maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license> <license>AGPL-3.0-only</license>
<depend>rclpy</depend> <depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-scipy</depend> <exec_depend>common_interfaces</exec_depend>
<depend>python-crccheck-pip</depend> <exec_depend>python3-scipy</exec_depend>
<depend>astra_msgs</depend> <exec_depend>python-crccheck-pip</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend> <test_depend>ament_flake8</test_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/core_pkg script_dir=$base/lib/core_pkg
[install] [install]
install_scripts=$base/lib/core_pkg install_scripts=$base/lib/core_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -4,7 +4,7 @@ package_name = "core_pkg"
setup( setup(
name=package_name, name=package_name,
version="0.0.0", version="1.0.0",
packages=find_packages(exclude=["test"]), packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]), ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -12,14 +12,13 @@ setup(
], ],
install_requires=["setuptools"], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer="tristan", maintainer="David Sharpe",
maintainer_email="tristanmcginnis26@gmail.com", maintainer_email="ds0196@uah.edu",
description="Core rover control package to handle command interpretation and embedded interfacing.", description="Relays topics related to Core between VicCAN (through Anchor) and basestation.",
license="All Rights Reserved", license="AGPL-3.0-only",
entry_points={ entry_points={
"console_scripts": [ "console_scripts": [
"core = core_pkg.core_node:main", "core = core_pkg.core_node:main",
"headless = core_pkg.core_headless:main",
"ptz = core_pkg.core_ptz:main", "ptz = core_pkg.core_ptz:main",
], ],
}, },

View File

@@ -3,14 +3,15 @@
<package format="3"> <package format="3">
<name>headless_pkg</name> <name>headless_pkg</name>
<version>1.0.0</version> <version>1.0.0</version>
<description>Headless rover control package to handle command interpretation and embedded interfacing.</description> <description>Provides headless rover control, similar to Basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer> <maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license> <license>AGPL-3.0-only</license>
<depend>rclpy</depend> <depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-pygame</depend> <exec_depend>common_interfaces</exec_depend>
<depend>astra_msgs</depend> <exec_depend>python3-pygame</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend> <test_depend>ament_flake8</test_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/headless_pkg script_dir=$base/lib/headless_pkg
[install] [install]
install_scripts=$base/lib/headless_pkg install_scripts=$base/lib/headless_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,11 +14,9 @@ setup(
zip_safe=True, zip_safe=True,
maintainer="David Sharpe", maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu", maintainer_email="ds0196@uah.edu",
description="Headless rover control package to handle command interpretation and embedded interfacing.", description="Provides headless rover control, similar to Basestation.",
license="All Rights Reserved", license="AGPL-3.0-only",
entry_points={ entry_points={
"console_scripts": [ "console_scripts": ["headless_full = src.headless_node:main"],
"headless_full = src.headless_node:main",
],
}, },
) )

View File

@@ -18,7 +18,7 @@ from std_msgs.msg import Header
from geometry_msgs.msg import Twist, TwistStamped from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState from astra_msgs.msg import CoreCtrlState, ArmCtrlState
import warnings import warnings
@@ -39,9 +39,9 @@ os.environ["SDL_AUDIODRIVER"] = (
CORE_STOP_MSG = CoreControl() # All zeros by default CORE_STOP_MSG = CoreControl() # All zeros by default
CORE_STOP_TWIST_MSG = Twist() # " CORE_STOP_TWIST_MSG = Twist() # "
ARM_STOP_MSG = ArmManual() # " ARM_STOP_MSG = ArmManual() # "
ARM_IK_STOP_MSG = TwistStamped() # "
BIO_STOP_MSG = BioControl() # " BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
@@ -75,15 +75,7 @@ class Headless(Node):
# Initialize pygame first # Initialize pygame first
pygame.init() pygame.init()
pygame.joystick.init() pygame.joystick.init()
super().__init__("headless") super().__init__("headless_node")
# TODO: move the STOP_MSGs somewhere better
global ARM_STOP_JOG_MSG
ARM_STOP_JOG_MSG = JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
joint_names=self.all_joint_names,
velocities=[0.0] * len(self.all_joint_names),
)
################################################## ##################################################
# Preamble # Preamble
@@ -146,6 +138,11 @@ class Headless(Node):
self.get_parameter("use_old_topics").get_parameter_value().bool_value self.get_parameter("use_old_topics").get_parameter_value().bool_value
) )
self.declare_parameter("use_cmd_vel", False)
self.use_cmd_vel = (
self.get_parameter("use_cmd_vel").get_parameter_value().bool_value
)
self.declare_parameter("use_bio", False) self.declare_parameter("use_bio", False)
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
@@ -153,7 +150,6 @@ class Headless(Node):
self.use_arm_ik = ( self.use_arm_ik = (
self.get_parameter("use_arm_ik").get_parameter_value().bool_value self.get_parameter("use_arm_ik").get_parameter_value().bool_value
) )
# NOTE: only applicable if use_old_topics == True # NOTE: only applicable if use_old_topics == True
self.declare_parameter("use_new_arm_manual_scheme", True) self.declare_parameter("use_new_arm_manual_scheme", True)
self.use_new_arm_manual_scheme = ( self.use_new_arm_manual_scheme = (
@@ -163,6 +159,13 @@ class Headless(Node):
) )
# Check parameter validity # Check parameter validity
if self.use_cmd_vel:
self.get_logger().info("Using cmd_vel for core control")
global CORE_MODE
CORE_MODE = "twist"
else:
self.get_logger().info("Using astra_msgs/CoreControl for core control")
if self.use_arm_ik and self.use_old_topics: if self.use_arm_ik and self.use_old_topics:
self.get_logger().fatal("Old topics do not support arm IK control.") self.get_logger().fatal("Old topics do not support arm IK control.")
sys.exit(1) sys.exit(1)
@@ -174,6 +177,8 @@ class Headless(Node):
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed) self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
self.arm_brake_mode = False
self.arm_laser = False
################################################## ##################################################
# Old Topics # Old Topics
@@ -192,12 +197,18 @@ class Headless(Node):
self.core_twist_pub_ = self.create_publisher( self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos Twist, "/core/twist", qos_profile=control_qos
) )
self.core_cmd_vel_pub_ = self.create_publisher(
TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher( self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos CoreCtrlState, "/core/control/state", qos_profile=control_qos
) )
self.arm_manual_pub_ = self.create_publisher( self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos JointJog, "/arm/control/joint_jog", qos_profile=control_qos
)
self.arm_state_pub_ = self.create_publisher(
ArmCtrlState, "/arm/control/state", qos_profile=control_qos
) )
self.arm_ik_twist_publisher = self.create_publisher( self.arm_ik_twist_publisher = self.create_publisher(
@@ -239,17 +250,23 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
# Added so you can tell when it starts running after changing the constant logging to debug from info
self.get_logger().info("Defaulting to Core mode. Ready.")
def stop_all(self): def stop_all(self):
if self.use_old_topics: if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG) self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG) self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG) self.bio_publisher.publish(BIO_STOP_MSG)
else:
if self.use_cmd_vel:
self.core_cmd_vel_pub_.publish(self.core_cmd_vel_stop_msg())
else: else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG) self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
if self.use_arm_ik: if self.use_arm_ik:
self.arm_ik_twist_publisher.publish(ARM_IK_STOP_MSG) self.arm_ik_twist_publisher.publish(self.arm_ik_twist_stop_msg())
else: else:
self.arm_manual_pub_.publish(ARM_STOP_JOG_MSG) self.arm_manual_pub_.publish(self.arm_manual_stop_msg())
# TODO: add bio here after implementing new topics # TODO: add bio here after implementing new topics
def send_controls(self): def send_controls(self):
@@ -340,18 +357,32 @@ class Headless(Node):
self.core_publisher.publish(input) self.core_publisher.publish(input)
else: # New topics else: # New topics
input = Twist() twist = Twist()
# Forward/back and Turn # Forward/back and Turn
input.linear.x = -1.0 * left_stick_y twist.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign( twist.angular.z = -1.0 * copysign(
right_stick_x**2, right_stick_x right_stick_x**2, right_stick_x
) # Exponent for finer control (curve) ) # Exponent for finer control (curve)
# This kinda looks dumb being seperate from the following block, but this
# maintains the separation between modifying the control message and sending it.
if self.use_cmd_vel:
# These scaling factors convert raw stick inputs to absolute m/s and rad/s
# values that DiffDriveController will convert to motor RPM, rather than
# the plain Twist, which just sends the stick values as duty cycle and
# sends that scaled to the motors.
twist.linear.x *= 1.0
twist.angular.z *= 1.5
# Publish # Publish
self.core_twist_pub_.publish(input) if self.use_cmd_vel:
self.get_logger().info( header = Header(stamp=self.get_clock().now().to_msg())
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}" self.core_cmd_vel_pub_.publish(TwistStamped(header=header, twist=twist))
else:
self.core_twist_pub_.publish(twist)
self.get_logger().debug(
f"[Core Ctrl] Linear: {round(twist.linear.x, 2)}, Angular: {round(twist.angular.z, 2)}"
) )
# Brake mode # Brake mode
@@ -519,14 +550,12 @@ class Headless(Node):
# X: _ # X: _
# Y: linear actuator out # Y: linear actuator out
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3 # Right stick: EF yaw and axis 3
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float( arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
stick_to_arm_direction(right_stick_x) stick_to_arm_direction(right_stick_x)
) )
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float( arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
-1 * stick_to_arm_direction(right_stick_y) stick_to_arm_direction(right_stick_y)
) )
# Left stick: axis 1 and 2 # Left stick: axis 1 and 2
@@ -534,7 +563,7 @@ class Headless(Node):
stick_to_arm_direction(left_stick_x) stick_to_arm_direction(left_stick_x)
) )
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float( arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
-1 * stick_to_arm_direction(left_stick_y) stick_to_arm_direction(left_stick_y)
) )
# D-pad: axis 0 and _ # D-pad: axis 0 and _
@@ -562,13 +591,26 @@ class Headless(Node):
) )
# A: brake # A: brake
# TODO: Brake mode new_brake_mode = button_a
# Y: linear actuator # X: laser
# TODO: linear actuator new_laser = button_x
self.arm_manual_pub_.publish(arm_input) self.arm_manual_pub_.publish(arm_input)
# Only publish state if needed
if new_brake_mode != self.arm_brake_mode or new_laser != self.arm_laser:
self.arm_brake_mode = new_brake_mode
self.arm_laser = new_laser
state_msg = ArmCtrlState()
state_msg.brake_mode = bool(self.arm_brake_mode)
state_msg.laser = bool(self.arm_laser)
self.arm_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Arm State] Brake: {self.arm_brake_mode}, Laser: {self.arm_laser}"
)
# IK (ONLY NEW) # IK (ONLY NEW)
# ============= # =============
@@ -653,6 +695,23 @@ class Headless(Node):
else: else:
pass # TODO: implement new bio control topics pass # TODO: implement new bio control topics
def core_cmd_vel_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def arm_manual_stop_msg(self):
return JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
joint_names=self.all_joint_names,
velocities=[0.0] * len(self.all_joint_names),
)
def arm_ik_twist_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float: def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""

8
treefmt.nix Normal file
View File

@@ -0,0 +1,8 @@
{ pkgs, ... }:
{
projectRootFile = "flake.nix";
programs = {
nixfmt.enable = true;
black.enable = true;
};
}