13 Commits

Author SHA1 Message Date
Riley M.
b96d75a787 Merge pull request #36 from SHC-ASTRA/serial-buffer 2026-04-20 16:06:03 -05:00
David
5239668e8e refactor(anchor): buffer bytes instead of string
Resolves Riley's comments, makes it so UTF-8 characters over 1 byte aren't destroyed
2026-04-18 17:46:59 -05:00
David
c166668415 feat(anchor): implement serial read buffering for non-blocking reads 2026-04-18 02:52:58 -05:00
ryleu
333249677f revert: fix(anchor): serial reads are now non-blocking
This reverts commit fc2ba5f8d1.

reason: making serail reads non-blocking caused serial data issues
because sometimes it would exit before actually seeing a newline
2026-04-15 11:24:19 -05:00
ryleu
fc2ba5f8d1 fix(anchor): serial reads are now non-blocking 2026-04-13 19:59:07 -05:00
ryleu
77d35949e9 docs(reset-repo): script says it works in any repo now 2026-04-13 19:57:04 -05:00
ryleu
be4fbf124d docs(anchor): fix ViCAN typo 2026-04-12 12:20:47 -05:00
ryleu
bfeae04e64 fix(anchor): messages for digit would not get sent to bio node 2026-04-12 12:20:36 -05:00
ryleu
7d80ad1ab5 fix(anchor): /anchor/to_vic/relay_string crash fixed 2026-04-12 12:20:36 -05:00
ryleu
dfabd6c330 feat(anchor): switch to SingleThreadedExecutor 2026-04-12 12:20:31 -05:00
ryleu
b892bfc631 fix(nix): switch nix-ros-overlay to main
- makes `ros2 bag record -a` work
- adds wentasah's hydra instance as an extra substitutor
2026-04-12 12:01:36 -05:00
ryleu
caf61f61a8 test: add testing script for anchor's connectors 2026-04-12 12:00:30 -05:00
ryleu
a96aa6a409 style: reformat with shfmt 2026-04-12 11:58:54 -05:00
31 changed files with 1462 additions and 954 deletions

View File

@@ -26,4 +26,3 @@ source $SCRIPT_DIR/../install/setup.bash
# Launch the ROS 2 node
ros2 run headless_pkg headless_full

View File

@@ -3,10 +3,10 @@ set -e
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
[[ -z "$ANCHOR_WS" ]] && ANCHOR_WS="$SCRIPT_DIR/.."
[[ -z "$AUTONOMY_WS" ]] && AUTONOMY_WS="$HOME/rover-Autonomy"
[[ -z $ANCHOR_WS ]] && ANCHOR_WS="$SCRIPT_DIR/.."
[[ -z $AUTONOMY_WS ]] && AUTONOMY_WS="$HOME/rover-Autonomy"
BAG_LOCATION="$HOME/bags/autostart"
[[ ! -d "$BAG_LOCATION" ]] && mkdir -p "$BAG_LOCATION"
[[ ! -d $BAG_LOCATION ]] && mkdir -p "$BAG_LOCATION"
# Wait for a network interface to be up (not necessarily online)
while ! ip link show | grep -q "state UP"; do
@@ -16,7 +16,6 @@ done
echo "[INFO] Network interface is up!"
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else

8
flake.lock generated
View File

@@ -24,16 +24,16 @@
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1770108954,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=",
"lastModified": 1775216071,
"narHash": "sha256-PrPW70Fh1uLx3JxNV/NLeXjUhgfrZmi7ac8LJOlS0q4=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "3d05d46451b376e128a1553e78b8870c75d7753a",
"rev": "197a2b55c4ed24f8b885a5b20b65f426fb6d57ca",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "develop",
"ref": "master",
"repo": "nix-ros-overlay",
"type": "github"
}

View File

@@ -2,7 +2,7 @@
description = "Development environment for ASTRA Anchor";
inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/master";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
treefmt-nix = {
@@ -31,6 +31,8 @@
name = "ASTRA Anchor";
packages = with pkgs; [
colcon
socat
can-utils
(python313.withPackages (
p: with p; [
pyserial
@@ -96,7 +98,8 @@
);
nixConfig = {
extra-substituters = [ "https://ros.cachix.org" ];
extra-trusted-public-keys = [ "ros.cachix.org-1:dSyZxI8geDCJrwgvCOHDoAfOm5sV1wCPjBkKL+38Rvo=" ];
# Cache to pull ros packages from
extra-substituters = [ "https://ros.cachix.org" "https://attic.iid.ciirc.cvut.cz/ros" ];
extra-trusted-public-keys = [ "ros.cachix.org-1:dSyZxI8geDCJrwgvCOHDoAfOm5sV1wCPjBkKL+38Rvo=" "ros:JR95vUYsShSqfA1VTYoFt1Nz6uXasm5QrcOsGry9f6Q=" ];
};
}

38
scripts/reset-repo.bash Executable file
View File

@@ -0,0 +1,38 @@
#!/usr/bin/env bash
repo_root="$(git rev-parse --show-toplevel)"
if [[ -z $repo_root ]]; then
echo "script must be run from within a git repo" >&2
exit 1
fi
cd $repo_root
echo "this will nuke all of your current un-commited git changes, including any changes to submodules and any gitignored files. is this okay? (y/N)"
read okay
if [[ $okay != "y" ]]; then
echo "you didn't say exactly 'y'. aborting." >&2
exit 2
fi
echo
echo "ok say goodbye to everything in this repo"
git submodule deinit --all -f && echo "- submodules gone"
git clean -fdx && echo "- gitignored changes gone"
git add -A
git reset HEAD --hard && echo "- everything else gone"
git submodule update --init --recursive && echo "- brought the submodules back"
echo
echo "in theory that should've done it. let's make sure"
status=$(git status --porcelain)
echo $status
if [[ -z $status ]]; then
echo "nice, all clean!"
else
echo "uhh that's not supposed to be there. this is probably a bug in this script. good luck!" >&2
exit 3
fi

463
scripts/test-connectors.bash Executable file
View File

@@ -0,0 +1,463 @@
#!/usr/bin/env bash
# test script for anchor connectors (mock, serial, CAN)
set -o pipefail
repo_root="$(git rev-parse --show-toplevel)"
if [[ -z $repo_root ]]; then
echo "script must be run from within the rover-ros2 repo" >&2
exit 1
fi
cd "$repo_root"
# colors
BOLD='\033[1m'
RED='\033[1;31m'
GREEN='\033[1;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
TESTS_PASSED=0
TESTS_FAILED=0
log() {
echo -e "${BOLD}${YELLOW}info:${NC} ${1}"
}
pass() {
echo -e "${BOLD}${GREEN}pass:${NC} ${1}"
TESTS_PASSED=$((TESTS_PASSED + 1))
}
fail() {
echo -e "${BOLD}${RED}fail:${NC} ${1}"
TESTS_FAILED=$((TESTS_FAILED + 1))
}
cleanup() {
log "cleaning up"
if [[ -n $ANCHOR_PID ]]; then
kill -INT -- -"$ANCHOR_PID" 2>/dev/null || true
wait "$ANCHOR_PID" 2>/dev/null || true
fi
if [[ -n $SOCAT_PID ]]; then
kill -INT "$SOCAT_PID" 2>/dev/null || true
wait "$SOCAT_PID" 2>/dev/null || true
fi
rm -f /tmp/ttyACM9 /tmp/ttyOUT 2>/dev/null || true
}
trap cleanup EXIT
source_ros2() {
source install/setup.bash
}
wait_for_topic() {
local topic="$1"
local timeout="${2:-5}"
local count=0
while ! ros2 topic list 2>/dev/null | grep -q "^${topic}$"; do
sleep 0.5
count=$((count + 1))
if [[ $count -ge $((timeout * 2)) ]]; then
return 1
fi
done
return 0
}
# run a ROS pub/echo test
# usage: ros_pubsub_test <echo_topic> <pub_topic> <msg_type> <msg_data>
# returns the echo output via stdout
ros_pubsub_test() {
local echo_topic="$1"
local pub_topic="$2"
local msg_type="$3"
local msg_data="$4"
timeout 5 bash -c "
ros2 topic echo --once $echo_topic &
ECHO_PID=\$!
sleep 0.5
ros2 topic pub --once $pub_topic $msg_type \"$msg_data\" >/dev/null 2>&1
wait \$ECHO_PID
" 2>/dev/null || true
}
test_mock_connector() {
log "testing mock connector"
log "starting anchor with mock connector"
setsid ros2 run anchor_pkg anchor --ros-args -p connector:=mock &
ANCHOR_PID=$!
sleep 2
if ! kill -0 "$ANCHOR_PID" 2>/dev/null; then
fail "mock connector: anchor failed to start"
return 1
fi
if ! wait_for_topic "/anchor/to_vic/relay" 10; then
fail "mock connector: topics not available"
kill -INT -- -"$ANCHOR_PID" 2>/dev/null || true
return 1
fi
log "anchor started successfully"
# test: relay -> debug
log "testing relay -> debug"
local output
output=$(ros_pubsub_test "/anchor/to_vic/debug" "/anchor/to_vic/relay" \
"astra_msgs/msg/VicCAN" '{mcu_name: \"core\", command_id: 50, data: [1.0, 2.0, 3.0, 4.0]}')
if [[ -n $output ]] && echo "$output" | grep -q "can_relay_tovic,core,50"; then
pass "mock connector: relay -> debug"
else
fail "mock connector: relay -> debug"
fi
# test: mock_mcu -> from_vic/core
log "testing mock_mcu (core) -> from_vic/core"
output=$(ros_pubsub_test "/anchor/from_vic/core" "/anchor/from_vic/mock_mcu" \
"astra_msgs/msg/VicCAN" '{mcu_name: \"core\", command_id: 10, data: [100.0, 200.0]}')
if [[ -n $output ]] && echo "$output" | grep -q "mcu_name: core" && echo "$output" | grep -q "command_id: 10"; then
pass "mock connector: mock_mcu -> from_vic/core"
else
fail "mock connector: mock_mcu -> from_vic/core"
fi
# test: mock_mcu -> from_vic/arm
log "testing mock_mcu (arm) -> from_vic/arm"
output=$(ros_pubsub_test "/anchor/from_vic/arm" "/anchor/from_vic/mock_mcu" \
"astra_msgs/msg/VicCAN" '{mcu_name: \"arm\", command_id: 55, data: [0.0, 450.0, 900.0, 0.0]}')
if [[ -n $output ]] && echo "$output" | grep -q "mcu_name: arm" && echo "$output" | grep -q "command_id: 55"; then
pass "mock connector: mock_mcu -> from_vic/arm"
else
fail "mock connector: mock_mcu -> from_vic/arm"
fi
# test: mock_mcu -> from_vic/bio
log "testing mock_mcu (citadel) -> from_vic/bio"
output=$(ros_pubsub_test "/anchor/from_vic/bio" "/anchor/from_vic/mock_mcu" \
"astra_msgs/msg/VicCAN" '{mcu_name: \"citadel\", command_id: 20, data: [5.0]}')
if echo "$output" | grep -q "mcu_name: citadel" && echo "$output" | grep -q "command_id: 20"; then
pass "mock connector: mock_mcu -> from_vic/bio"
else
fail "mock connector: mock_mcu -> from_vic/bio"
fi
# test: relay_string -> debug
log "testing relay_string -> debug"
output=$(ros_pubsub_test "/anchor/to_vic/debug" "/anchor/to_vic/relay_string" \
"std_msgs/msg/String" '{data: \"test_raw_string_data\"}')
if [[ -n $output ]] && echo "$output" | grep -q "test_raw_string_data"; then
pass "mock connector: relay_string -> debug"
else
fail "mock connector: relay_string -> debug"
fi
kill -INT -- -"$ANCHOR_PID" 2>/dev/null || true
wait "$ANCHOR_PID" 2>/dev/null || true
ANCHOR_PID=""
}
test_serial_connector() {
log "testing serial connector"
log "creating virtual serial ports with socat"
socat pty,raw,echo=0,link=/tmp/ttyACM9 pty,raw,echo=0,link=/tmp/ttyOUT 2>/dev/null &
SOCAT_PID=$!
sleep 2
if ! kill -0 "$SOCAT_PID" 2>/dev/null; then
fail "serial connector: failed to create virtual serial ports"
return 1
fi
log "starting anchor with serial connector (override: /tmp/ttyACM9)"
setsid ros2 run anchor_pkg anchor --ros-args -p connector:=serial -p serial_override:=/tmp/ttyACM9 &
ANCHOR_PID=$!
sleep 2
if ! kill -0 "$ANCHOR_PID" 2>/dev/null; then
fail "serial connector: anchor failed to start"
kill -INT "$SOCAT_PID" 2>/dev/null || true
return 1
fi
if ! wait_for_topic "/anchor/to_vic/relay" 10; then
fail "serial connector: topics not available"
kill -INT -- -"$ANCHOR_PID" 2>/dev/null || true
kill -INT "$SOCAT_PID" 2>/dev/null || true
return 1
fi
pass "serial connector: anchor starts with virtual serial"
# test: relay -> serial output (VicCAN encoding)
log "testing relay -> serial output"
local serial_out_file
serial_out_file=$(mktemp)
# Start head first (blocks waiting for input), then publish
timeout 5 head -n1 /tmp/ttyOUT >"$serial_out_file" &
local head_pid=$!
sleep 0.3
ros2 topic pub --once /anchor/to_vic/relay astra_msgs/msg/VicCAN \
'{mcu_name: "core", command_id: 30, data: [1.0, 2.0, 3.0, 4.0]}' >/dev/null 2>&1
wait $head_pid 2>/dev/null || true
local serial_out
serial_out=$(cat "$serial_out_file")
rm -f "$serial_out_file"
if [[ -n $serial_out ]] && echo "$serial_out" | grep -q "can_relay_tovic,core,30"; then
pass "serial connector: relay -> serial output"
else
fail "serial connector: relay -> serial output (got: $serial_out)"
fi
# test: serial input -> from_vic/core
log "testing serial input -> from_vic/core"
local output
output=$(timeout 5 bash -c '
ros2 topic echo --once /anchor/from_vic/core &
ECHO_PID=$!
sleep 0.5
echo "can_relay_fromvic,core,15,10.0,20.0,30.0,40.0" > /tmp/ttyOUT
sleep 0.5
echo "can_relay_fromvic,core,15,10.0,20.0,30.0,40.0" > /tmp/ttyOUT
wait $ECHO_PID
' 2>/dev/null) || true
if echo "$output" | grep -q "mcu_name: core" && echo "$output" | grep -q "command_id: 15"; then
pass "serial connector: serial input -> from_vic/core"
else
fail "serial connector: serial input -> from_vic/core (got: $output)"
fi
# test: relay_string -> debug
log "testing relay_string -> debug"
output=$(ros_pubsub_test "/anchor/to_vic/debug" "/anchor/to_vic/relay_string" \
"std_msgs/msg/String" '{data: \"serial_test_string\"}')
if [[ -n $output ]] && echo "$output" | grep -q "serial_test_string"; then
pass "serial connector: relay_string -> debug"
else
fail "serial connector: relay_string -> debug"
fi
kill -INT -- -"$ANCHOR_PID" 2>/dev/null || true
wait "$ANCHOR_PID" 2>/dev/null || true
ANCHOR_PID=""
kill -INT "$SOCAT_PID" 2>/dev/null || true
wait "$SOCAT_PID" 2>/dev/null || true
SOCAT_PID=""
}
test_can_connector() {
log "testing CAN connector"
log "starting anchor with CAN connector (override: vcan0)"
setsid ros2 run anchor_pkg anchor --ros-args -p connector:=can -p can_override:=vcan0 &
ANCHOR_PID=$!
sleep 2
if ! kill -0 "$ANCHOR_PID" 2>/dev/null; then
fail "CAN connector: anchor failed to start"
return 1
fi
if ! wait_for_topic "/anchor/to_vic/relay" 10; then
fail "CAN connector: topics not available"
kill -INT -- -"$ANCHOR_PID" 2>/dev/null || true
return 1
fi
log "anchor started successfully"
sleep 1
# test: relay -> CAN bus
# core=1, int16x4=2, cmd=30 -> id = (1<<8)|(2<<6)|30 = 0x19E
log "testing relay -> CAN bus"
local output
output=$(timeout 8 bash -c '
candump -n 1 vcan0 &
DUMP_PID=$!
sleep 1
ros2 topic pub --once /anchor/to_vic/relay astra_msgs/msg/VicCAN "{mcu_name: \"core\", command_id: 30, data: [1, 2, 3, 4]}" >/dev/null 2>&1
sleep 0.5
ros2 topic pub --once /anchor/to_vic/relay astra_msgs/msg/VicCAN "{mcu_name: \"core\", command_id: 30, data: [1, 2, 3, 4]}" >/dev/null 2>&1
wait $DUMP_PID
' 2>/dev/null) || true
if echo "$output" | grep -qi "19E"; then
pass "CAN connector: relay -> CAN bus"
else
fail "CAN connector: relay -> CAN bus (got: $output)"
fi
# test: CAN -> from_vic/core
log "testing CAN bus -> from_vic/core"
output=$(timeout 5 bash -c '
ros2 topic echo --once /anchor/from_vic/core &
ECHO_PID=$!
sleep 1
cansend vcan0 18F#000A0014001E0028
sleep 0.5
cansend vcan0 18F#000A0014001E0028
wait $ECHO_PID
' 2>/dev/null) || true
if echo "$output" | grep -q "mcu_name: core" && echo "$output" | grep -q "command_id: 15"; then
pass "CAN connector: CAN -> from_vic/core"
else
fail "CAN connector: CAN -> from_vic/core"
fi
# test: CAN -> from_vic/arm
log "testing CAN bus -> from_vic/arm"
output=$(timeout 5 bash -c '
ros2 topic echo --once /anchor/from_vic/arm &
ECHO_PID=$!
sleep 1
cansend vcan0 294#00640096012C01F4
sleep 0.5
cansend vcan0 294#00640096012C01F4
wait $ECHO_PID
' 2>/dev/null) || true
if echo "$output" | grep -q "mcu_name: arm" && echo "$output" | grep -q "command_id: 20"; then
pass "CAN connector: CAN -> from_vic/arm"
else
fail "CAN connector: CAN -> from_vic/arm"
fi
# test: CAN double data type (data_type_key=0)
log "testing CAN double data type"
output=$(timeout 8 bash -c '
ros2 topic echo --once /anchor/from_vic/core &
ECHO_PID=$!
sleep 1
cansend vcan0 105#3FF0000000000000
sleep 0.5
cansend vcan0 105#3FF0000000000000
sleep 0.5
cansend vcan0 105#3FF0000000000000
wait $ECHO_PID
' 2>/dev/null) || true
if echo "$output" | grep -q "mcu_name: core" && echo "$output" | grep -q "command_id: 5"; then
pass "CAN connector: double data type"
else
fail "CAN connector: double data type"
fi
# test: CAN float32x2 data type (data_type_key=1)
log "testing CAN float32x2 data type"
output=$(timeout 8 bash -c '
ros2 topic echo --once /anchor/from_vic/core &
ECHO_PID=$!
sleep 1
cansend vcan0 14A#3F80000040000000
sleep 0.5
cansend vcan0 14A#3F80000040000000
sleep 0.5
cansend vcan0 14A#3F80000040000000
wait $ECHO_PID
' 2>/dev/null) || true
if echo "$output" | grep -q "mcu_name: core" && echo "$output" | grep -q "command_id: 10"; then
pass "CAN connector: float32x2 data type"
else
fail "CAN connector: float32x2 data type"
fi
kill -INT -- -"$ANCHOR_PID" 2>/dev/null || true
wait "$ANCHOR_PID" 2>/dev/null || true
ANCHOR_PID=""
}
check_prerequisites() {
log "checking prerequisites"
local missing=0
if [[ ! -f install/setup.bash ]]; then
fail "install/setup.bash not found; run 'colcon build --symlink-install' first"
missing=1
fi
if ! command -v socat &>/dev/null; then
fail "socat not found; install it or use 'nix develop'"
missing=1
fi
if ! command -v cansend &>/dev/null || ! command -v candump &>/dev/null; then
fail "can-utils (cansend/candump) not found; install it or use 'nix develop'"
missing=1
fi
if ! ip link show vcan0 &>/dev/null; then
fail "vcan0 interface not found"
log " create it with:"
log " sudo ip link add dev vcan0 type vcan"
log " sudo ip link set vcan0 up"
missing=1
elif ! ip link show vcan0 | grep -q ",UP"; then
fail "vcan0 exists but is not UP"
log " enable it with: sudo ip link set vcan0 up"
missing=1
fi
if [[ $missing -eq 1 ]]; then
echo ""
log "prerequisites not met"
exit 1
fi
log "all prerequisites met"
}
main() {
echo ""
log "anchor connector test suite"
echo ""
check_prerequisites
log "sourcing ROS2 workspace"
source_ros2
test_mock_connector
test_serial_connector
test_can_connector
echo ""
log "test summary"
echo -e "${BOLD}${GREEN}passed:${NC} $TESTS_PASSED"
echo -e "${BOLD}${RED}failed:${NC} $TESTS_FAILED"
echo ""
if [[ $TESTS_FAILED -gt 0 ]]; then
exit 1
fi
exit 0
}
main "$@"

View File

@@ -1,11 +1,9 @@
from warnings import deprecated
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy.executors import ExternalShutdownException, SingleThreadedExecutor
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import atexit
from .connector import (
Connector,
MockConnector,
@@ -14,8 +12,7 @@ from .connector import (
NoValidDeviceException,
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import threading
from .convert import string_to_viccan, viccan_to_string
from astra_msgs.msg import VicCAN
from std_msgs.msg import String
@@ -37,7 +34,7 @@ class Anchor(Node):
Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish ViCAN messages here as if they came from an MCU
- For testing without an actual MCU, publish VicCAN messages here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
@@ -154,7 +151,7 @@ class Anchor(Node):
)
# Debug publisher
self.tovic_debug_pub_ = self.create_publisher(
VicCAN,
String,
"/anchor/to_vic/debug",
20,
)
@@ -173,7 +170,7 @@ class Anchor(Node):
20,
)
self.mock_mcu_sub_ = self.create_subscription(
String,
VicCAN,
"/anchor/from_vic/mock_mcu",
self.relay_fromvic,
20,
@@ -181,15 +178,17 @@ class Anchor(Node):
self.tovic_string_sub_ = self.create_subscription(
String,
"/anchor/to_vic/relay_string",
self.connector.write_raw,
self.write_connector_raw,
20,
)
# Close devices on exit
atexit.register(self.cleanup)
# poll at 100Hz for incoming data
self.read_timer_ = self.create_timer(0.01, self.read_connector)
def cleanup(self):
def destroy_node(self):
self.get_logger().info("closing connector")
self.connector.cleanup()
super().destroy_node()
def read_connector(self):
"""Check the connector for new data from the MCU, and publish string to appropriate topics"""
@@ -204,6 +203,11 @@ class Anchor(Node):
def write_connector(self, msg: VicCAN):
"""Write to the connector and send a copy to /anchor/to_vic/debug"""
self.connector.write(msg)
self.tovic_debug_pub_.publish(String(data=viccan_to_string(msg)))
def write_connector_raw(self, msg: String):
"""Write raw string to the connector and send a copy to /anchor/to_vic/debug"""
self.connector.write_raw(msg)
self.tovic_debug_pub_.publish(msg)
@deprecated(
@@ -226,25 +230,26 @@ class Anchor(Node):
"""Relay a message from the MCU to the appropriate VicCAN topic"""
if msg.mcu_name == "core":
self.fromvic_core_pub_.publish(msg)
elif msg.mcu_name == "arm" or msg.mcu_name == "digit":
if msg.mcu_name == "arm" or msg.mcu_name == "digit":
self.fromvic_arm_pub_.publish(msg)
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
if msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
def main(args=None):
try:
rclpy.init(args=args)
anchor_node = Anchor()
executor = SingleThreadedExecutor()
executor.add_node(anchor_node)
thread = threading.Thread(target=rclpy.spin, args=(anchor_node,), daemon=True)
thread.start()
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
while rclpy.ok():
anchor_node.read_connector() # Check the connector for updates
rate.sleep()
try:
executor.spin()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
pass
finally:
# don't accept any more jobs
executor.shutdown()
# make the node quit processing things
anchor_node.destroy_node()
# shut down everything else
rclpy.try_shutdown()

View File

@@ -1,5 +1,8 @@
from abc import ABC, abstractmethod
from time import monotonic
from typing import TYPE_CHECKING
from astra_msgs.msg import VicCAN
from std_msgs.msg import String
from rclpy.clock import Clock
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan as _string_to_viccan, viccan_to_string
@@ -19,8 +22,11 @@ KNOWN_USBS = [
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
BAUD_RATE = 115200
SERIAL_READ_TIMEOUT = 0.5 # seconds
MCU_IDS = [
"broadcast",
"core",
@@ -77,7 +83,7 @@ class Connector(ABC):
pass
@abstractmethod
def write_raw(self, msg: str):
def write_raw(self, msg: String):
pass
def cleanup(self):
@@ -95,6 +101,10 @@ class SerialConnector(Connector):
ports = self._find_ports()
mcu_name: str | None = None
# Serial buffering
self._serial_buffer: bytes = b""
self._last_read_time = monotonic()
if serial_override:
logger.warn(
f"using serial_override: `{serial_override}`! this will bypass several checks."
@@ -122,7 +132,7 @@ class SerialConnector(Connector):
self.mcu_name = mcu_name
# if we fail at this point, it should crash because we've already tested the port
self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1)
self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=0)
def _find_ports(self) -> list[str]:
"""
@@ -181,9 +191,66 @@ class SerialConnector(Connector):
return None
def _try_readline(self) -> str | None:
"""Attempts to read a full string from the MCU without blocking.
When pyserial is used with 'timeout=0', reads are performed non-blocking.
When used with interface.readline(), this breaks the assumption that a returned
string will be a completed attempt by the MCU to send a string; it may be
cut off between the start of the string and the newline, removing information
and rendering the string(s) useless; thus, to get around the downside of readline()
not waiting for a newline while still not blocking, this function manually
implements a serial input buffer and newline timeout.
If readline() returns a non-empty string, send it if it ends with a newline
(readline() will not read past any newline); otherwise, save the read string.
This buffered string should be pre-pended to the next readline() result.
If readline() does not receive a non-empty string after the last non-newline-
terminated readline() result within the manual timeout, send the contents of the
buffer as if it ended with a newline, and clear the buffer.
Returns:
str: A hopefully-complete string read from the MCU via the serial interface.
"""
if TYPE_CHECKING:
assert type(self.serial_interface) == serial.Serial
# Warn on buffer timeout, as the only scenarios that would trigger this are
# a microcontroller output that isn't newline-terminated (bad), or the MCU is
# hanging (also bad).
if (
self._serial_buffer
and (monotonic() - self._last_read_time) > SERIAL_READ_TIMEOUT
):
self.logger.warn(
f"Serial buffer timeout, last received '{self._serial_buffer}'."
)
result = self._serial_buffer
self._serial_buffer = b""
self._last_read_time = monotonic()
return str(result, "utf8").strip()
# No try-except here so caller catches it instead.
raw = self.serial_interface.readline()
# Empty or whitespace-only string
if not raw or not raw.strip():
return None
# Add to buffer or send finished buffer
if not (raw.endswith(b"\n") or raw.endswith(b"\r")): # unfinished string
self._serial_buffer += raw
self._last_read_time = monotonic()
return None
else:
result = self._serial_buffer + raw
self._serial_buffer = b""
return str(result, "utf8").strip()
def read(self) -> tuple[VicCAN | None, str | None]:
try:
raw = str(self.serial_interface.readline(), "utf8")
raw = self._try_readline()
if not raw:
return (None, None)
@@ -199,10 +266,10 @@ class SerialConnector(Connector):
return (None, None) # pretty much no other error matters
def write(self, msg: VicCAN):
self.write_raw(viccan_to_string(msg))
self.write_raw(String(data=viccan_to_string(msg)))
def write_raw(self, msg: str):
self.serial_interface.write(bytes(msg, "utf8"))
def write_raw(self, msg: String):
self.serial_interface.write(bytes(msg.data, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
@@ -411,8 +478,10 @@ class CANConnector(Connector):
self.logger.error(f"CAN error while sending: {e}")
raise DeviceClosedException("CAN bus closed unexpectedly")
def write_raw(self, msg: str):
self.logger.warn(f"write_raw is not supported for CANConnector. msg: {msg}")
def write_raw(self, msg: String):
self.logger.warn(
f"write_raw is not supported for CANConnector. msg: {msg.data}"
)
def cleanup(self):
try:
@@ -434,5 +503,5 @@ class MockConnector(Connector):
def write(self, msg: VicCAN):
pass
def write_raw(self, msg: str):
def write_raw(self, msg: String):
pass

View File

@@ -1,10 +1,8 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, Shutdown, IncludeLaunchDescription
from launch.actions import DeclareLaunchArgument, Shutdown
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch.launch_description_sources import PythonLaunchDescriptionSource
def generate_launch_description():
@@ -48,24 +46,50 @@ def generate_launch_description():
)
)
ld.add_action(
DeclareLaunchArgument(
"use_ros2_control",
default_value="false",
description="Whether to use DiffDriveController for driving instead of direct Twist",
)
)
ld.add_action(
DeclareLaunchArgument(
"rover_platform",
default_value="auto",
description="Choose the rover platform (either clucky or testbed). If left on auto, will defer to ROVER_PLATFORM environment variable.",
choices=["clucky", "testbed", "auto"],
)
)
# nodes
ld.add_action(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
condition=IfCondition(use_ptz),
)
)
ld.add_action(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="anchor_pkg",
@@ -84,91 +108,4 @@ def generate_launch_description():
)
)
ld.add_action(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[
{"launch_mode": "anchor"},
{
"use_ros2_control": LaunchConfiguration(
"use_ros2_control", default=False
)
},
{
"rover_platform": LaunchConfiguration(
"rover_platform", default="auto"
)
},
],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
condition=IfCondition(use_ptz),
)
)
ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"robot_state_publisher.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
)
ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"spawn_controllers.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
)
return ld

View File

@@ -3,20 +3,14 @@
<package format="3">
<name>anchor_pkg</name>
<version>0.0.0</version>
<description>ASTRA VicCAN driver package, using python-can and pyserial.</description>
<description>Anchor -- ROS and CAN relay node</description>
<maintainer email="rjm0037@uah.edu">Riley</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-serial</exec_depend>
<exec_depend>python3-can</exec_depend>
<exec_depend>core_pkg</exec_depend>
<exec_depend>arm_pkg</exec_depend>
<exec_depend>bio_pkg</exec_depend>
<exec_depend>core_description</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-serial</depend>
<depend>python3-can</depend>
<build_depend>black</build_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/anchor_pkg
[install]
install_scripts=$base/lib/anchor_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -6,7 +6,7 @@ package_name = "anchor_pkg"
setup(
name=package_name,
version="1.0.0",
version="0.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -17,8 +17,8 @@ setup(
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="ASTRA VicCAN driver package, using python-can and pyserial.",
license="AGPL-3.0-only",
description="Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN",
license="All Rights Reserved",
entry_points={
"console_scripts": ["anchor = anchor_pkg.anchor_node:main"],
},

View File

@@ -1,4 +1,5 @@
import sys
import threading
import signal
import math
from warnings import deprecated
@@ -12,7 +13,7 @@ from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, ArmCtrlState, VicCAN, RevMotorState
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -25,6 +26,8 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5),
)
thread = None
class ArmNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
@@ -78,9 +81,7 @@ class ArmNode(Node):
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(
String, "/anchor/to_vic/relay_string", 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Create publishers
self.socket_pub = self.create_publisher(
@@ -114,10 +115,10 @@ class ArmNode(Node):
# Control
# Manual: /arm/control/joint_jog is published by Basestation or Headless
# Manual: /arm/manual/joint_jog is published by Basestation or Headless
self.man_jointjog_sub_ = self.create_subscription(
JointJog,
"/arm/control/joint_jog",
"/arm/manual/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
)
@@ -128,13 +129,6 @@ class ArmNode(Node):
self.joint_command_callback,
qos_profile=control_qos,
)
# State: /arm/control/state is published by Basestation or Headless
self.man_state_sub_ = self.create_subscription(
ArmCtrlState,
"/arm/control/state",
self.man_state_callback,
qos_profile=control_qos,
)
# Feedback
@@ -154,14 +148,9 @@ class ArmNode(Node):
# Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback()
self.arm_feedback_new.axis0_motor.id = 1
self.arm_feedback_new.axis1_motor.id = 2
self.arm_feedback_new.axis2_motor.id = 3
self.arm_feedback_new.axis3_motor.id = 4
# IK Arm pose
self.saved_joint_state = JointState()
self.saved_joint_state.header.frame_id = "base_link"
self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
@@ -184,54 +173,10 @@ class ArmNode(Node):
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=39,
data=velocities[0:4],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=39,
data=velocities[4:6],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=26,
data=[velocities[6]],
header=msg.header,
)
)
self.send_velocities(velocities, msg.header)
# TODO: use msg.duration
def man_state_callback(self, msg: ArmCtrlState):
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=18,
data=[1.0 if msg.brake_mode else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=34,
data=[1.0 if msg.laser else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.")
@@ -257,12 +202,12 @@ class ArmNode(Node):
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:4], header=header)
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6], header=header)
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
@@ -345,8 +290,6 @@ class ArmNode(Node):
)
return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id:
case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0])
@@ -433,14 +376,11 @@ class ArmNode(Node):
)
return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id:
case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.digit_voltage.header.stamp = msg.header.stamp
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(
msg.data[0]

View File

@@ -3,15 +3,14 @@
<package format="3">
<name>arm_pkg</name>
<version>1.0.0</version>
<description>Relays topics related to Arm between VicCAN (through Anchor) and basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<description>Core arm package which handles ROS2 commnuication.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-numpy</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -12,9 +12,9 @@ setup(
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="David Sharpe",
maintainer="David",
maintainer_email="ds0196@uah.edu",
description="Relays topics related to Arm between VicCAN (through Anchor) and basestation.",
description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": ["arm = arm_pkg.arm_node:main"],

View File

@@ -2,16 +2,14 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>bio_pkg</name>
<version>1.0.0</version>
<description>Biosensor package to handle command interpretation and embedded interfacing.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/bio_pkg
[install]
install_scripts=$base/lib/bio_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,8 +14,8 @@ setup(
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="Relays topics related to Biosensor between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
description="TODO: Package description",
license="TODO: License declaration",
entry_points={
"console_scripts": ["bio = bio_pkg.bio_node:main"],
},

View File

@@ -0,0 +1,112 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
import importlib
from std_msgs.msg import String
from astra_msgs.msg import CoreControl
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
max_speed = 90 # Max speed as a duty cycle percentage (1-100)
class Headless(Node):
def __init__(self):
# Initialize pygame first
pygame.init()
pygame.joystick.init()
# Wait for a gamepad to be connected
self.gamepad = None
print("Waiting for gamepad connection...")
while pygame.joystick.get_count() == 0:
# Process any pygame events to keep it responsive
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
time.sleep(1.0) # Check every second
print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
# Now initialize the ROS2 node
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, "/core/control", 10)
self.lastMsg = (
String()
) # Used to ignore sending controls repeatedly when they do not change
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
input = CoreControl()
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl()
input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f"L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}"
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -1,16 +1,20 @@
import sys
import signal
from typing import Literal, cast
from scipy.spatial.transform import Rotation
from math import copysign, pi
from warnings import deprecated
from os import getenv
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from rclpy.duration import Duration
from std_srvs.srv import Empty
import signal
import time
import atexit
import serial
import os
import sys
import threading
import glob
from scipy.spatial.transform import Rotation
from math import copysign, pi
from std_msgs.msg import String, Header
from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus, JointState
@@ -19,14 +23,12 @@ from astra_msgs.msg import CoreControl, CoreFeedback, RevMotorState
from astra_msgs.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState
serial_pub = None
thread = None
CORE_WHEELBASE = 0.836 # meters
CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1
# TODO: update core_description or add testbed_description
TESTBED_WHEELBASE = 0.368 # meters
TESTBED_WHEEL_RADIUS = 0.108 # meters
TESTBED_GEAR_RATIO = 64 # Testbed: 64:1
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -39,10 +41,6 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5),
)
class CoreNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Core related topics."""
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = {
@@ -56,70 +54,23 @@ class CoreNode(Node):
56: 4, # really 3, but viccan
58: 4, # ditto
}
rover_platform: Literal["core", "testbed"]
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("core_node")
self.get_logger().info(f"core launch_mode is: anchor")
# Launch mode -- anchor vs core
self.declare_parameter("launch_mode", "core")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
##################################################
# Parameters
self.declare_parameter("use_ros2_control", False)
self.use_ros2_control = (
self.get_parameter("use_ros2_control").get_parameter_value().bool_value
)
self.declare_parameter("rover_platform", "auto")
rover_platform = (
self.get_parameter("rover_platform").get_parameter_value().string_value
)
if rover_platform == "auto":
self.get_logger().info(
"rover_platform parameter is unset, falling back to environment variable"
)
rover_platform = getenv("ROVER_PLATFORM", "auto")
if rover_platform not in ("core", "testbed"): # make sure we have a valid value
raise ValueError("rover platform must be either 'core' or 'testbed'.")
else:
self.rover_platform = cast(Literal["core", "testbed"], rover_platform)
if self.rover_platform == "testbed":
global TESTBED_WHEELBASE, TESTBED_WHEEL_RADIUS, TESTBED_GEAR_RATIO
self.wheelbase = TESTBED_WHEELBASE
self.wheel_radius = TESTBED_WHEEL_RADIUS
self.gear_ratio = TESTBED_GEAR_RATIO
else: # default in case of unset or invalid environment variable
global CORE_WHEELBASE, CORE_WHEEL_RADIUS, CORE_GEAR_RATIO
self.wheelbase = CORE_WHEELBASE
self.wheel_radius = CORE_WHEEL_RADIUS
self.gear_ratio = CORE_GEAR_RATIO
##################################################
# Old Topics
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
if not self.use_ros2_control:
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
##################################################
# New Topics
# Topics
# Anchor
if self.launch_mode == "anchor":
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20
)
@@ -127,16 +78,18 @@ class CoreNode(Node):
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
if self.use_ros2_control:
# Joint state control for topic-based controller
self.joint_command_sub_ = self.create_subscription(
JointState, "/core/joint_commands", self.joint_command_callback, 2
# autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2
self.cmd_vel_sub_ = self.create_subscription(
TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1
)
else:
# manual twist -- [-1, 1] rather than real units
# TODO: change topic to '/core/control/twist'
self.twist_man_sub_ = self.create_subscription(
Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos
)
@@ -147,64 +100,148 @@ class CoreNode(Node):
self.control_state_callback,
qos_profile=control_qos,
)
self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
self.twist_max_duty = (
0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
)
# Feedback
# Consolidated and organized main core feedback
# TODO: change topic to something like '/core/feedback/main'
# Consolidated and organized core feedback
self.feedback_new_pub_ = self.create_publisher(
NewCoreFeedback,
"/core/feedback_new",
qos_profile=qos.qos_profile_sensor_data,
)
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(
Barometer, "/core/feedback/baro", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Saved state
# Main Core feedback
self.feedback_new_state = NewCoreFeedback()
self.feedback_new_state.fl_motor.id = 1
self.feedback_new_state.bl_motor.id = 2
self.feedback_new_state.fr_motor.id = 3
self.feedback_new_state.br_motor.id = 4
# IMU
self.telemetry_pub_timer = self.create_timer(
1.0, self.publish_feedback
) # TODO: not sure about this
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/core/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
self.imu_state = Imu()
self.imu_state.header.frame_id = "core_bno055"
# GPS
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
self.gps_state = NavSatFix()
self.gps_state.header.frame_id = "core_gps_antenna"
self.gps_state.status.service = NavSatStatus.SERVICE_GPS
self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX
self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
# Barometer
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(
Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data
)
self.baro_state = Barometer()
self.baro_state.header.frame_id = "core_bmp388"
@deprecated("Uses an old message type. Will be removed at some point.")
# Old
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
# Debug
self.debug_pub = self.create_publisher(String, "/core/debug", 10)
self.ping_service = self.create_service(
Empty, "/astra/core/ping", self.ping_callback
)
##################################################
# Find microcontroller (Non-anchor only)
# Core (non-anchor) specific
if self.launch_mode == "core":
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
self.get_logger().info(f"Enabling Relay Mode")
ser.write(b"can_relay_mode,on\n")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
# end __init__()
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
if self.launch_mode == "core":
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self): # NON-ANCHOR SPECIFIC
try:
output = str(self.ser.readline(), "utf8")
if output:
# All output over debug temporarily
print(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
def scale_duty(self, value: float, max_speed: float):
leftMin = -1
leftMax = 1
@@ -221,7 +258,6 @@ class CoreNode(Node):
# Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan))
@deprecated("Uses an old message type. Will be removed at some point.")
def send_controls(self, msg: CoreControl):
if msg.turn_to_enable:
command = (
@@ -247,42 +283,17 @@ class CoreNode(Node):
# print(f"[Sys] Relaying: {command}")
def joint_command_callback(self, msg: JointState):
# So... topic based control node publishes JointState messages over /joint_commands
# with len(msg.name) == 5 and len(msg.velocity) == 4... all 5 non-fixed joints
# are included in msg.name, but ig it is implied that msg.velocity only
# includes velocities for the commanded joints (ros__parameters.joints).
# So, this will be much more hacky and less adaptable than I would like it to be.
if (
len(msg.name) != (4 if self.rover_platform == "testbed" else 5)
or len(msg.velocity) != 4
or len(msg.position) != 0
):
self.get_logger().warning(
f"Received joint control message with unexpected number of joints. Ignoring."
)
return
if msg.name[-4:] != [ # type: ignore
"bl_wheel_joint",
"br_wheel_joint",
"fl_wheel_joint",
"fr_wheel_joint",
]:
self.get_logger().warning(
f"Received joint control message with unexpected name[]. Ignoring."
)
return
def cmd_vel_callback(self, msg: TwistStamped):
linear = msg.twist.linear.x
angular = -msg.twist.angular.z
(bl_vel, br_vel, fl_vel, fr_vel) = msg.velocity
vel_left_rads = (linear - (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
vel_right_rads = (linear + (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
bl_rpm = radps_to_rpm(bl_vel) * self.gear_ratio
br_rpm = radps_to_rpm(br_vel) * self.gear_ratio
fl_rpm = radps_to_rpm(fl_vel) * self.gear_ratio
fr_rpm = radps_to_rpm(fr_vel) * self.gear_ratio
vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
self.send_viccan(
20, [fl_rpm, bl_rpm, fr_rpm, br_rpm]
) # order expected by embedded
self.send_viccan(20, [vel_left_rpm, vel_right_rpm])
def twist_man_callback(self, msg: Twist):
linear = msg.linear.x # [-1 1] for forward/back from left stick y
@@ -323,9 +334,15 @@ class CoreNode(Node):
# Max duty cycle
self.twist_max_duty = msg.max_duty # twist_man_callback will handle this
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str):
self.anchor_pub.publish(String(data=msg)) # Publish to anchor for relay
if self.launch_mode == "anchor":
# self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String() # Convert to std_msg string
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == "core":
self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def send_viccan(self, cmd_id: int, data: list[float]):
self.anchor_tovic_pub_.publish(
@@ -337,7 +354,6 @@ class CoreNode(Node):
)
)
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String):
output = msg.data
parts = str(output.strip()).split(",")
@@ -407,16 +423,14 @@ class CoreNode(Node):
# skill diff if not
# Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_msg_len_dict:
expected_len = self.viccan_msg_len_dict[msg.command_id]
if msg.command_id in viccan_msg_len_dict:
expected_len = viccan_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
self.feedback_new_state.header.stamp = msg.header.stamp
match msg.command_id:
# GNSS
case 48: # GNSS Latitude
@@ -443,7 +457,6 @@ class CoreNode(Node):
self.imu_state.linear_acceleration.x = float(msg.data[0])
self.imu_state.linear_acceleration.y = float(msg.data[1])
self.imu_state.linear_acceleration.z = float(msg.data[2])
self.feedback_new_state.orientation = float(msg.data[3])
# Deal with quaternion
r = Rotation.from_euler("z", float(msg.data[3]), degrees=True)
q = r.as_quat()
@@ -488,7 +501,6 @@ class CoreNode(Node):
self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0
self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0
self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0
self.feedback_new_state.board_voltage.header.stamp = msg.header.stamp
# Baro
case 56: # BMP temperature, altitude, pressure
self.baro_state.temperature = float(msg.data[0])
@@ -501,12 +513,14 @@ class CoreNode(Node):
motorId = round(float(msg.data[0]))
position = float(msg.data[1])
velocity = float(msg.data[2])
joint_state_msg = JointState()
joint_state_msg = (
JointState()
) # TODO: not sure if all motors should be in each message or not
joint_state_msg.position = [
position * (2 * pi) / self.gear_ratio
position * (2 * pi) / CORE_GEAR_RATIO
] # revolutions to radians
joint_state_msg.velocity = [
velocity * (2 * pi / 60.0) / self.gear_ratio
velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO
] # RPM to rad/s
motor: RevMotorState | None = None
@@ -514,42 +528,52 @@ class CoreNode(Node):
match motorId:
case 1:
motor = self.feedback_new_state.fl_motor
joint_state_msg.name = ["fl_wheel_joint"]
joint_state_msg.name = ["fl_motor_joint"]
case 2:
motor = self.feedback_new_state.bl_motor
joint_state_msg.name = ["bl_wheel_joint"]
joint_state_msg.name = ["bl_motor_joint"]
case 3:
motor = self.feedback_new_state.fr_motor
joint_state_msg.name = ["fr_wheel_joint"]
joint_state_msg.name = ["fr_motor_joint"]
case 4:
motor = self.feedback_new_state.br_motor
joint_state_msg.name = ["br_wheel_joint"]
joint_state_msg.name = ["br_motor_joint"]
case _:
self.get_logger().warning(
f"Ignoring REV motor feedback 58 with invalid motorId {motorId}"
)
return
if motor:
motor.position = position
motor.velocity = velocity
# make the fucking shit work
if self.rover_platform == "clucky":
joint_state_msg.name.append("left_suspension_joint")
joint_state_msg.position.append(0.0)
joint_state_msg.velocity.append(0.0)
joint_state_msg.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(joint_state_msg)
case _:
return
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self):
# self.get_logger().info(f"[Core] {self.core_feedback}")
self.feedback_pub.publish(self.core_feedback)
def ping_callback(self, request, response):
return response
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def map_range(
value: float, in_min: float, in_max: float, out_min: float, out_max: float
@@ -557,31 +581,19 @@ def map_range(
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
def radps_to_rpm(radps: float):
return radps * 60 / (2 * pi)
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
global serial_pub
core_node = CoreNode()
try:
rclpy.spin(core_node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -3,17 +3,15 @@
<package format="3">
<name>core_pkg</name>
<version>1.0.0</version>
<description>Relays topics related to Core between VicCAN (through Anchor) and basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<description>Core rover control package to handle command interpretation and embedded interfacing.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<exec_depend>python-crccheck-pip</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-scipy</depend>
<depend>python-crccheck-pip</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/core_pkg
[install]
install_scripts=$base/lib/core_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -4,7 +4,7 @@ package_name = "core_pkg"
setup(
name=package_name,
version="1.0.0",
version="0.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -12,13 +12,14 @@ setup(
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu",
description="Relays topics related to Core between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="Core rover control package to handle command interpretation and embedded interfacing.",
license="All Rights Reserved",
entry_points={
"console_scripts": [
"core = core_pkg.core_node:main",
"headless = core_pkg.core_headless:main",
"ptz = core_pkg.core_ptz:main",
],
},

View File

@@ -3,15 +3,14 @@
<package format="3">
<name>headless_pkg</name>
<version>1.0.0</version>
<description>Provides headless rover control, similar to Basestation.</description>
<description>Headless rover control package to handle command interpretation and embedded interfacing.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-pygame</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-pygame</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/headless_pkg
[install]
install_scripts=$base/lib/headless_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,9 +14,11 @@ setup(
zip_safe=True,
maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu",
description="Provides headless rover control, similar to Basestation.",
license="AGPL-3.0-only",
description="Headless rover control package to handle command interpretation and embedded interfacing.",
license="All Rights Reserved",
entry_points={
"console_scripts": ["headless_full = src.headless_node:main"],
"console_scripts": [
"headless_full = src.headless_node:main",
],
},
)

View File

@@ -18,7 +18,7 @@ from std_msgs.msg import Header
from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState, ArmCtrlState
from astra_msgs.msg import CoreCtrlState
import warnings
@@ -138,11 +138,6 @@ class Headless(Node):
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
self.declare_parameter("use_cmd_vel", False)
self.use_cmd_vel = (
self.get_parameter("use_cmd_vel").get_parameter_value().bool_value
)
self.declare_parameter("use_bio", False)
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
@@ -150,6 +145,7 @@ class Headless(Node):
self.use_arm_ik = (
self.get_parameter("use_arm_ik").get_parameter_value().bool_value
)
# NOTE: only applicable if use_old_topics == True
self.declare_parameter("use_new_arm_manual_scheme", True)
self.use_new_arm_manual_scheme = (
@@ -159,13 +155,6 @@ class Headless(Node):
)
# Check parameter validity
if self.use_cmd_vel:
self.get_logger().info("Using cmd_vel for core control")
global CORE_MODE
CORE_MODE = "twist"
else:
self.get_logger().info("Using astra_msgs/CoreControl for core control")
if self.use_arm_ik and self.use_old_topics:
self.get_logger().fatal("Old topics do not support arm IK control.")
sys.exit(1)
@@ -177,8 +166,6 @@ class Headless(Node):
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
self.arm_brake_mode = False
self.arm_laser = False
##################################################
# Old Topics
@@ -197,18 +184,12 @@ class Headless(Node):
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_cmd_vel_pub_ = self.create_publisher(
TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/control/joint_jog", qos_profile=control_qos
)
self.arm_state_pub_ = self.create_publisher(
ArmCtrlState, "/arm/control/state", qos_profile=control_qos
JointJog, "/arm/manual_new", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
@@ -250,17 +231,11 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150)
# Added so you can tell when it starts running after changing the constant logging to debug from info
self.get_logger().info("Defaulting to Core mode. Ready.")
def stop_all(self):
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
else:
if self.use_cmd_vel:
self.core_cmd_vel_pub_.publish(self.core_cmd_vel_stop_msg())
else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
if self.use_arm_ik:
@@ -357,28 +332,18 @@ class Headless(Node):
self.core_publisher.publish(input)
else: # New topics
twist = Twist()
input = Twist()
# Forward/back and Turn
twist.linear.x = -1.0 * left_stick_y
twist.angular.z = -1.0 * copysign(
input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign(
right_stick_x**2, right_stick_x
) # Exponent for finer control (curve)
# This kinda looks dumb being seperate from the following block, but this
# maintains the separation between modifying the control message and sending it
if self.use_cmd_vel:
twist.linear.x *= 1.5
twist.angular.z *= 0.5
# Publish
if self.use_cmd_vel:
header = Header(stamp=self.get_clock().now().to_msg())
self.core_cmd_vel_pub_.publish(TwistStamped(header=header, twist=twist))
else:
self.core_twist_pub_.publish(twist)
self.get_logger().debug(
f"[Core Ctrl] Linear: {round(twist.linear.x, 2)}, Angular: {round(twist.angular.z, 2)}"
self.core_twist_pub_.publish(input)
self.get_logger().info(
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
)
# Brake mode
@@ -587,26 +552,13 @@ class Headless(Node):
)
# A: brake
new_brake_mode = button_a
# TODO: Brake mode
# X: laser
new_laser = button_x
# Y: linear actuator
# TODO: linear actuator
self.arm_manual_pub_.publish(arm_input)
# Only publish state if needed
if new_brake_mode != self.arm_brake_mode or new_laser != self.arm_laser:
self.arm_brake_mode = new_brake_mode
self.arm_laser = new_laser
state_msg = ArmCtrlState()
state_msg.brake_mode = bool(self.arm_brake_mode)
state_msg.laser = bool(self.arm_laser)
self.arm_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Arm State] Brake: {self.arm_brake_mode}, Laser: {self.arm_laser}"
)
# IK (ONLY NEW)
# =============
@@ -691,11 +643,6 @@ class Headless(Node):
else:
pass # TODO: implement new bio control topics
def core_cmd_vel_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def arm_manual_stop_msg(self):
return JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),

View File

@@ -1,8 +1,9 @@
{ pkgs, ... }:
{ ... }:
{
projectRootFile = "flake.nix";
programs = {
nixfmt.enable = true;
black.enable = true;
shfmt.enable = true;
};
}