11 Commits

Author SHA1 Message Date
SHC-ASTRA
7e6a53b7f1 fixed astra_msgs nixpkg being used incorrectly 2026-02-09 21:31:35 -06:00
SHC-ASTRA
9510270272 Merge branch 'main' into bio-topic-refactor-rebase 2026-02-09 21:17:38 -06:00
Riley M.
3f68052144 Merge pull request #30 from SHC-ASTRA/update-python
-> python 3.13
2026-02-09 21:49:08 -05:00
ryleu
c72f72dc32 -> python 3.13 2026-02-08 17:23:53 -06:00
iggy
d4042f8744 clean up 2026-02-07 19:50:51 -06:00
Riley M.
61f5f1fc3e Merge pull request #29 from SHC-ASTRA/qos-disable
Qos disable
2026-02-07 18:20:04 -06:00
SHC-ASTRA
d46bb81ed1 Merge branch 'main' into bio-topic-refactor-rebase 2026-02-05 21:34:51 -06:00
SHC-ASTRA
87a764e616 change astra_msgs to latest main hash 2026-02-05 21:07:56 -06:00
iggy
f99b9e6f96 created bio node, added rqt-graph to flake 2026-02-05 02:12:21 -06:00
David
d9355f16e9 fix: EF gripper works again ._. 2026-01-31 18:32:39 -06:00
David
9fc120b09e fix: make QoS compatible with basestation-game 2026-01-31 17:21:52 -06:00
23 changed files with 523 additions and 740 deletions

3
.gitmodules vendored
View File

@@ -1,6 +1,3 @@
[submodule "src/astra_description"] [submodule "src/astra_description"]
path = src/astra_descriptions path = src/astra_descriptions
url = ../astra_descriptions url = ../astra_descriptions
[submodule "src/astra_msgs"]
path = src/astra_msgs
url = git@github.com:SHC-ASTRA/astra_msgs.git

View File

@@ -16,9 +16,6 @@ You will use these packages to launch all rover-side ROS2 nodes.
- [Connecting the GuliKit Controller](#connecting-the-gulikit-controller) - [Connecting the GuliKit Controller](#connecting-the-gulikit-controller)
- [Common Problems/Toubleshooting](#common-problemstroubleshooting) - [Common Problems/Toubleshooting](#common-problemstroubleshooting)
- [Packages](#packages) - [Packages](#packages)
- [Graphs](#graphs)
- [Full System](#full-system)
- [Individual Nodes](#individual-nodes)
- [Maintainers](#maintainers) - [Maintainers](#maintainers)
## Software Prerequisites ## Software Prerequisites
@@ -143,40 +140,6 @@ A: To find a microcontroller to talk to, Anchor sends a ping to every Serial por
- [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`). - [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`).
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK. - [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
## Graphs
### Full System
> **Anchor stand-alone** (`ros2 launch anchor_pkg rover.launch.py`)
>
> ![rqt_graph of Anchor by itself, ran with command: ros2 launch anchor_pkg rover.launch.py](./docs-resources/graph-anchor-standalone.png)
> **Anchor with [basestation-classic](https://github.com/SHC-ASTRA/basestation-classic)**
>
> ![rqt_graph of Anchor ran with the same command as above, talking to basestation-classic](./docs-resources/graph-anchor-w-basestation-classic.png)
> **Anchor with Headless** (`ros2 run headless_pkg headless_full`)
>
> ![rqt_graph of Anchor ran with Headless](./docs-resources/graph-anchor-w-headless.png)
### Individual Nodes
> **Anchor** (`ros2 run anchor_pkg anchor`)
>
> ![rqt_graph of Anchor node running by itself](./docs-resources/graph-anchor-anchor-standalone.png)
> **Core** (`ros2 run core_pkg core --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Core node running by itself](./docs-resources/graph-anchor-core-standalone.png)
> **Arm** (`ros2 run arm_pkg arm --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Arm node running by itself](./docs-resources/graph-anchor-arm-standalone.png)
> **Bio** (`ros2 run bio_pkg bio --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Bio node running by itself](./docs-resources/graph-anchor-bio-standalone.png)
## Maintainers ## Maintainers
| Name | Email | Discord | | Name | Email | Discord |

View File

@@ -14,12 +14,8 @@ echo "[INFO] Network interface is up!"
# Your actual ROS node start command goes here # Your actual ROS node start command goes here
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script (if we aren't using nix)
if command -v nixos-rebuild; then command -v ros2 || source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
if command -v nixos-rebuild; then command -v ros2 || source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,11 +17,7 @@ done
echo "[INFO] Network interface is up!" echo "[INFO] Network interface is up!"
if command -v nixos-rebuild; then command -v ros2 || source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source $ANCHOR_WS/install/setup.bash source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash [[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

Binary file not shown.

Before

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 395 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 543 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 439 KiB

96
flake.lock generated
View File

@@ -1,5 +1,29 @@
{ {
"nodes": { "nodes": {
"astra-msgs": {
"inputs": {
"nix-ros-overlay": "nix-ros-overlay",
"nixpkgs": [
"astra-msgs",
"nix-ros-overlay",
"nixpkgs"
]
},
"locked": {
"lastModified": 1770693861,
"narHash": "sha256-8NBtSsKVYtd+NYt51tJ1EteC0nOTemDUBoRXbDoQAuA=",
"owner": "SHC-ASTRA",
"repo": "astra_msgs",
"rev": "60bbb53085b09fbdb7e848b1dd168d526d9af281",
"type": "github"
},
"original": {
"owner": "SHC-ASTRA",
"ref": "60bbb53085b09fbdb7e848b1dd168d526d9af281",
"repo": "astra_msgs",
"type": "github"
}
},
"flake-utils": { "flake-utils": {
"inputs": { "inputs": {
"systems": "systems" "systems": "systems"
@@ -18,11 +42,49 @@
"type": "github" "type": "github"
} }
}, },
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nix-ros-overlay": { "nix-ros-overlay": {
"inputs": { "inputs": {
"flake-utils": "flake-utils", "flake-utils": "flake-utils",
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
}, },
"locked": {
"lastModified": 1770408708,
"narHash": "sha256-E7ZQRgGrsiuswgXnG7337ZR5s4SdZlheZjxKOQdVoH8=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "e78ba91032c7f8bdd823fbf43937cbf0f4f09747",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "develop",
"repo": "nix-ros-overlay",
"type": "github"
}
},
"nix-ros-overlay_2": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": "nixpkgs_2"
},
"locked": { "locked": {
"lastModified": 1770108954, "lastModified": 1770108954,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=", "narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=",
@@ -54,9 +116,26 @@
"type": "github" "type": "github"
} }
}, },
"nixpkgs_2": {
"locked": {
"lastModified": 1759381078,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "nix-ros",
"repo": "nixpkgs",
"type": "github"
}
},
"root": { "root": {
"inputs": { "inputs": {
"nix-ros-overlay": "nix-ros-overlay", "astra-msgs": "astra-msgs",
"nix-ros-overlay": "nix-ros-overlay_2",
"nixpkgs": [ "nixpkgs": [
"nix-ros-overlay", "nix-ros-overlay",
"nixpkgs" "nixpkgs"
@@ -77,6 +156,21 @@
"repo": "default", "repo": "default",
"type": "github" "type": "github"
} }
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
} }
}, },
"root": "root", "root": "root",

View File

@@ -4,6 +4,8 @@
inputs = { inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop"; nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
# specify astra_msgs commit hash to the one we support
astra-msgs.url = "github:SHC-ASTRA/astra_msgs?ref=60bbb53085b09fbdb7e848b1dd168d526d9af281";
}; };
outputs = outputs =
@@ -11,6 +13,7 @@
self, self,
nix-ros-overlay, nix-ros-overlay,
nixpkgs, nixpkgs,
astra-msgs,
}: }:
nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem ( nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem (
system: system:
@@ -24,8 +27,9 @@
devShells.default = pkgs.mkShell { devShells.default = pkgs.mkShell {
name = "ASTRA Anchor"; name = "ASTRA Anchor";
packages = with pkgs; [ packages = with pkgs; [
astra-msgs.packages.${system}.astra-msgs
colcon colcon
(python312.withPackages ( (python313.withPackages (
p: with p; [ p: with p; [
pyserial pyserial
pygame pygame
@@ -39,6 +43,7 @@
buildEnv { buildEnv {
paths = [ paths = [
ros-core ros-core
rqt-graph
ros2cli ros2cli
ros2run ros2run
ros2bag ros2bag

View File

@@ -8,7 +8,6 @@ from launch.substitutions import (
PathJoinSubstitution, PathJoinSubstitution,
) )
from launch_ros.actions import Node from launch_ros.actions import Node
from launch.conditions import IfCondition
# Prevent making __pycache__ directories # Prevent making __pycache__ directories
@@ -51,7 +50,6 @@ def launch_setup(context, *args, **kwargs):
executable="ptz", # change as needed executable="ptz", # change as needed
name="ptz", name="ptz",
output="both", output="both",
condition=IfCondition(LaunchConfiguration("use_ptz", default="true")),
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
) )

View File

@@ -1,6 +1,5 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos
import serial import serial
import sys import sys
import threading import threading
@@ -8,10 +7,11 @@ import glob
import time import time
import atexit import atexit
import signal import signal
from std_msgs.msg import String, Header from std_msgs.msg import String
from astra_msgs.msg import ArmManual
from astra_msgs.msg import SocketFeedback
from astra_msgs.msg import DigitFeedback
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
import math import math
# control_qos = qos.QoSProfile( # control_qos = qos.QoSProfile(
@@ -25,68 +25,37 @@ import math
# liveliness_lease_duration=5000 # liveliness_lease_duration=5000
# ) # )
serial_pub = None
thread = None thread = None
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
class SerialRelay(Node):
class ArmNode(Node):
def __init__(self): def __init__(self):
# Initialize node
super().__init__("arm_node") super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output # Get launch mode parameter
self.declare_parameter("launch_mode", "arm")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
################################################## # Create publishers
# Topics self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10)
self.socket_pub = self.create_publisher(
# Anchor topics SocketFeedback, "/arm/feedback/socket", 10
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
) )
self.anchor_tovic_pub_ = self.create_publisher( self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
VicCAN, "/anchor/to_vic/relay", 20 self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers
self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 2
) )
self.anchor_sub = self.create_subscription( # New messages
String, "/anchor/arm/feedback", self.anchor_feedback, 10 self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10)
) self.joint_state = JointState()
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.joint_state.name = [
# Control
# Manual: who tf knows. Maybe JointJog?
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
)
# Feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
self.arm_feedback_new = ArmFeedback()
# IK: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
self.saved_joint_state = JointState()
self.saved_joint_state.name = [
"Axis_0_Joint", "Axis_0_Joint",
"Axis_1_Joint", "Axis_1_Joint",
"Axis_2_Joint", "Axis_2_Joint",
@@ -95,63 +64,117 @@ class ArmNode(Node):
"Wrist-EF_Roll_Joint", "Wrist-EF_Roll_Joint",
"Gripper_Slider_Left", "Gripper_Slider_Left",
] ]
self.saved_joint_state.position = [0.0] * len( self.joint_state.position = [0.0] * len(
self.saved_joint_state.name self.joint_state.name
) # Initialize with zeros
self.saved_joint_state.velocity = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros ) # Initialize with zeros
# Old self.joint_command_sub = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 10
# Create publishers
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
) )
# Topics used in anchor mode
if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.digit_feedback = DigitFeedback() self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
self.man_sub = self.create_subscription( if self.launch_mode == "arm":
ArmManual, "/arm/control/manual", self.send_manual, 10 # Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for _ in range(4):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
) )
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
# if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
pass if self.launch_mode == "arm":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
return # command needs either position or velocity for all 7 joints
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper
# TODO: formalize joint names in URDF, refactor here to depend on joint names
# Embedded takes deg*10, ROS2 uses Radians # Embedded takes deg*10, ROS2 uses Radians
velocities = [ positions = [math.degrees(pos) * 10 for pos in msg.position]
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity
]
# Axis 2 & 3 URDF direction is inverted # Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2] positions[2] = -positions[2]
velocities[3] = -velocities[3] positions[3] = -positions[3]
# Axis 0-3 # Set target angles for each arm axis for embedded IK PID to handle
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3]) command = f"can_relay_tovic,arm,32,{positions[0]},{positions[1]},{positions[2]},{positions[3]}\n"
arm_cmd.header = Header(stamp=self.get_clock().now().to_msg()) # Wrist yaw and roll
# Wrist yaw and roll, gripper included for future use when have adequate hardware command += f"can_relay_tovic,digit,32,{positions[4]},{positions[5]}\n"
digit_cmd = VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6]) # Gripper IK does not have adequate hardware yet
digit_cmd.header = Header(stamp=self.get_clock().now().to_msg()) self.send_cmd(command)
self.anchor_tovic_pub_.publish(arm_cmd)
self.anchor_tovic_pub_.publish(digit_cmd)
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
axis0 = msg.axis0 axis0 = msg.axis0
@@ -167,7 +190,7 @@ class ArmNode(Node):
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n" command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,28,{msg.laser}\n" command += f"can_relay_tovic,digit,28,{msg.laser}\n"
@@ -178,14 +201,23 @@ class ArmNode(Node):
return return
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
output = String(data=msg) if (
self.launch_mode == "anchor"
): # if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
# pass
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
elif output.startswith("can_relay_fromvic,arm,54"): elif output.startswith("can_relay_fromvic,arm,54"):
# pass
self.updateBusVoltage(output) self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
self.updateMotorFeedback(output) self.updateMotorFeedback(output)
@@ -203,132 +235,15 @@ class ArmNode(Node):
if len(parts) >= 4: if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3]) self.digit_feedback.wrist_angle = float(parts[3])
# self.digit_feedback.wrist_roll = float(parts[4]) # self.digit_feedback.wrist_roll = float(parts[4])
self.joint_state.position[4] = math.radians(
float(parts[4])
) # Wrist roll
self.joint_state.position[5] = math.radians(
float(parts[3])
) # Wrist yaw
else: else:
return return
def relay_fromvic(self, msg: VicCAN):
# Code for socket and digit are broken out for cleaner code
if msg.mcu_name == "arm":
self.process_fromvic_arm(msg)
elif msg.mcu_name == "digit":
self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN):
if msg.mcu_name != "arm":
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_socket_msg_len_dict:
expected_len = viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.temperature = float(msg.data[1]) / 10.0
motor.voltage = float(msg.data[2]) / 10.0
motor.current = float(msg.data[3]) / 10.0
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 54: # Board voltages
self.arm_feedback_new.socket_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.socket_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.socket_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.socket_voltage.v3 = float(msg.data[3]) / 100.0
case 55: # Arm joint positions
angles = [angle / 10.0 for angle in msg.data] # VicCAN sends deg*10
# Joint state publisher for URDF visualization
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.saved_joint_state.position[2] = math.radians(
-angles[2]
) # Axis 2 (inverted)
self.saved_joint_state.position[3] = math.radians(
-angles[3]
) # Axis 3 (inverted)
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
case 58: # REV SPARK MAX position and velocity feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.position = float(msg.data[1])
motor.velocity = float(msg.data[2])
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 59: # Arm joint velocities
velocities = [vel / 100.0 for vel in msg.data] # VicCAN sends deg/s*100
self.saved_joint_state.velocity[0] = math.radians(
velocities[0]
) # Axis 0
self.saved_joint_state.velocity[1] = math.radians(
velocities[1]
) # Axis 1
self.saved_joint_state.velocity[2] = math.radians(
-velocities[2]
) # Axis 2 (-)
self.saved_joint_state.velocity[3] = math.radians(
-velocities[3]
) # Axis 3 (-)
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN):
if msg.mcu_name != "digit":
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_digit_msg_len_dict:
expected_len = viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(
msg.data[0]
) # Wrist roll
self.saved_joint_state.position[5] = math.radians(
msg.data[1]
) # Wrist yaw
def publish_feedback(self): def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
@@ -348,6 +263,16 @@ class ArmNode(Node):
self.arm_feedback.axis1_angle = angles[1] self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2] self.arm_feedback.axis2_angle = angles[2]
self.arm_feedback.axis3_angle = angles[3] self.arm_feedback.axis3_angle = angles[3]
# Joint state publisher for URDF visualization
self.joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted)
self.joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted)
# Wrist is handled by digit feedback
self.joint_state.header.stamp = self.get_clock().now().to_msg()
self.joint_state_pub.publish(self.joint_state)
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
@@ -388,13 +313,33 @@ class ArmNode(Node):
self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current self.arm_feedback.axis0_current = current
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
# return glob.glob("/dev/tty[A-Za-z]*")
def cleanup(self):
print("Cleaning up...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook
arm_node = ArmNode() global serial_pub
arm_node.run() serial_pub = SerialRelay()
rclpy.try_shutdown() serial_pub.run()
if __name__ == "__main__": if __name__ == "__main__":

Submodule src/astra_msgs deleted from 2264a2cb67

View File

@@ -1,241 +1,203 @@
import rclpy import signal
from rclpy.node import Node
import serial
import sys import sys
import threading import threading
import os
import glob
import time import time
import atexit
import signal import rclpy
from std_msgs.msg import String from astra_msgs.action import BioVacuum
from astra_msgs.msg import BioControl from astra_msgs.msg import BioControl, BioDistributor, VicCAN
from astra_msgs.msg import BioFeedback from astra_msgs.srv import BioTestTube
from rclpy.action import ActionServer
from rclpy.node import Node
from std_msgs.msg import Header, String
serial_pub = None serial_pub = None
thread = None thread = None
# used to verify the length of an incoming VicCAN feedback message
# key is VicCAN command_id, value is expected length of data list
viccan_citadel_msg_len_dict = {
# empty because not expecting any VicCAN from citadel atm
}
viccan_lance_msg_len_dict = {
# empty because not sure what VicCAN commands LANCE sends
}
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node # Initialize node
super().__init__("bio_node") super().__init__("bio_node")
# Get launch mode parameter # Anchor Topics
self.declare_parameter("launch_mode", "bio") self.anchor_fromvic_sub_ = self.create_subscription(
self.launch_mode = self.get_parameter("launch_mode").value VicCAN, "/anchor/from_vic/bio", self.relay_fromvic, 20
self.get_logger().info(f"bio launch_mode is: {self.launch_mode}") )
self.anchor_tovic_pub_ = self.create_publisher(
# Create publishers VicCAN, "/anchor/to_vic/relay", 20
self.debug_pub = self.create_publisher(String, "/bio/feedback/debug", 10)
self.feedback_pub = self.create_publisher(BioFeedback, "/bio/feedback", 10)
# Create subscribers
self.control_sub = self.create_subscription(
BioControl, "/bio/control", self.send_control, 10
) )
# Create a publisher for telemetry
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
# Topics used in anchor mode
if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription( self.anchor_sub = self.create_subscription(
String, "/anchor/bio/feedback", self.anchor_feedback, 10 String, "/anchor/bio/feedback", self.anchor_feedback, 10
) )
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.bio_feedback = BioFeedback() self.distributor_sub = self.create_subscription(
BioDistributor,
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode "/bio/control/distributor",
if self.launch_mode == "bio": self.distributor_callback,
# Loop through all serial devices on the computer to check for the MCU 10,
self.port = None
for i in range(2):
try:
# connect and send a ping command
set_port = (
"/dev/ttyACM0" # MCU is controlled through GPIO pins on the PI
) )
ser = serial.Serial(set_port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU # Services
if b"pong" in response: self.test_tube_service = self.create_service(
self.port = set_port BioTestTube, "/bio/control/test_tube", self.test_tube_callback
self.get_logger().info(f"Found MCU at {set_port}!")
break
except:
pass
if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
) )
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200) # Actions
atexit.register(self.cleanup) self._action_server = ActionServer(
self, BioVacuum, "/bio/actions/vacuum", self.execute_vacuum
)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
# if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
if self.launch_mode == "bio":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1) time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def send_ik(self, msg):
pass
def send_control(self, msg: BioControl): def send_control(self, msg: BioControl):
# CITADEL Control Commands print("todo")
################
# Chem Pumps, only send if not zero
if msg.pump_id != 0:
command = (
"can_relay_tovic,citadel,27,"
+ str(msg.pump_id)
+ ","
+ str(msg.pump_amount)
+ "\n"
)
self.send_cmd(command)
# Fans, only send if not zero
if msg.fan_id != 0:
command = (
"can_relay_tovic,citadel,40,"
+ str(msg.fan_id)
+ ","
+ str(msg.fan_duration)
+ "\n"
)
self.send_cmd(command)
# Servos, only send if not zero
if msg.servo_id != 0:
command = (
"can_relay_tovic,citadel,25,"
+ str(msg.servo_id)
+ ","
+ str(int(msg.servo_state))
+ "\n"
)
self.send_cmd(command)
# LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
# self.send_cmd(command)
# Vibration Motor
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
# self.send_cmd(command)
# FAERIE Control Commands
################
# To be reviewed before use#
# Laser
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
# self.send_cmd(command)
# Drill (SCABBARD)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
# self.send_cmd(command)
# Bio linear actuator
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command)
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
if ( # send to anchor node to relay
self.launch_mode == "anchor"
): # if in anchor mode, send to anchor node to relay
output = String() output = String()
output.data = msg output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == "bio": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Bio to MCU] {msg}") def relay_fromvic(self, msg: VicCAN):
self.ser.write(bytes(msg, "utf8")) # self.get_logger().info(msg)
if msg.mcu_name == "citadel":
self.process_fromvic_citadel(msg)
def process_fromvic_citadel(self, msg: VicCAN):
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_citadel_msg_len_dict:
expected_len = viccan_citadel_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
# self.get_logger().info(f"[Bio Anchor] {msg.data}") self.get_logger().info(f"[Bio Anchor] {msg.data}")
# no data planned to be received from citadel, not sure about lance
if output.startswith( def distributor_callback(self, msg: BioDistributor):
"can_relay_fromvic,citadel,54" distributor_arr = msg.distributor_id
): # bat, 12, 5, Voltage readings * 100 vic_cmd = VicCAN(
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0 header=Header(stamp=self.get_clock().now().to_msg()),
self.bio_feedback.voltage_12 = float(parts[4]) / 100.0 mcu_name="citadel",
self.bio_feedback.voltage_5 = float(parts[5]) / 100.0 command_id=40,
elif output.startswith("can_relay_fromvic,digit,57"): data=[
self.bio_feedback.drill_temp = float(parts[3]) clamp_short(distributor_arr[0]),
self.bio_feedback.drill_humidity = float(parts[4]) clamp_short(distributor_arr[1]),
clamp_short(distributor_arr[2]),
0,
],
)
self.anchor_tovic_pub_.publish(vic_cmd)
def publish_feedback(self): def test_tube_callback(self, request, response):
self.feedback_pub.publish(self.bio_feedback) tubes_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[
float(int.from_bytes(request.tube_id)),
float(request.milliliters),
],
)
self.anchor_tovic_pub_.publish(tubes_cmd)
return response
@staticmethod def execute_vacuum(self, goal_handle):
def list_serial_ports(): valve_id = int.from_bytes(goal_handle.request.valve_id)
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") duty = int.from_bytes(goal_handle.request.fan_duty_cycle_percent)
# return glob.glob("/dev/tty[A-Za-z]*") total = goal_handle.request.fan_time_ms
def cleanup(self): # open valve
print("Cleaning up...") self.anchor_tovic_pub_.publish(
try: VicCAN(
if self.ser.is_open: header=Header(stamp=self.get_clock().now().to_msg()),
self.ser.close() mcu_name="citadel",
except Exception as e: command_id=40,
exit(0) data=[float(valve_id)],
)
)
# set fan duty cycle
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[float(duty)],
)
)
feedback = BioVacuum.Feedback()
start = time.time()
while True:
elapsed = int((time.time() - start) * 1000)
remaining = max(0, total - elapsed)
feedback.fan_time_remaining_ms = remaining
goal_handle.publish_feedback(feedback)
if remaining == 0:
break
time.sleep(0.1)
# stop fan
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[0.0],
)
)
# close valve
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[-1.0],
)
)
goal_handle.succeed()
return BioVacuum.Result()
def clamp_short(x: int) -> int:
return max(-32768, min(32767, x))
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):

View File

@@ -31,14 +31,14 @@ CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1 CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, # durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1), # deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
) )
# Used to verify the length of an incoming VicCAN feedback message # Used to verify the length of an incoming VicCAN feedback message

View File

@@ -16,10 +16,8 @@ import pwd
import grp import grp
from math import copysign from math import copysign
from std_srvs.srv import Trigger
from std_msgs.msg import String from std_msgs.msg import String
from geometry_msgs.msg import Twist, TwistStamped from geometry_msgs.msg import Twist
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState from astra_msgs.msg import CoreCtrlState
@@ -37,27 +35,18 @@ ARM_STOP_MSG = ArmManual() # "
BIO_STOP_MSG = BioControl() # " BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, # durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1), # deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
)
arm_ik_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
) )
CORE_MODE = "twist" # "twist" or "duty" CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
class Headless(Node): class Headless(Node):
def __init__(self): def __init__(self):
@@ -116,16 +105,10 @@ class Headless(Node):
break break
id += 1 id += 1
self.create_timer(0.1, self.send_controls) self.create_timer(0.15, self.send_controls)
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2) self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2) self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher( self.core_twist_pub_ = self.create_publisher(
@@ -135,41 +118,6 @@ class Headless(Node):
CoreCtrlState, "/core/control/state", qos_profile=control_qos CoreCtrlState, "/core/control/state", qos_profile=control_qos
) )
self.declare_parameter("arm_mode", "manual")
self.arm_mode = self.get_parameter("arm_mode").value
self.declare_parameter("arm_manual_scheme", "old")
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value
# Check parameter validity
if self.arm_mode not in ["manual", "ik"]:
self.get_logger().warn(
f"Invalid value '{self.arm_mode}' for arm_mode parameter. Defaulting to 'manual' (per-axis control)."
)
if self.arm_manual_scheme not in ["old", "new"]:
self.get_logger().warn(
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
)
# If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None
if self.arm_mode == "ik":
self.get_logger().info("Starting servo node for IK control...")
self.servo_start_client = self.create_client(
Trigger, "/servo_node/start_servo"
)
timeout_counter = 0
while not self.servo_start_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Waiting for servo_node/start_servo service...")
timeout_counter += 1
if timeout_counter >= 10:
self.get_logger().error(
"Servo's start service not available. IK control will not work."
)
break
if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request())
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed) self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
@@ -177,6 +125,18 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self): def send_controls(self):
"""Read the gamepad state and publish control messages""" """Read the gamepad state and publish control messages"""
for event in pygame.event.get(): for event in pygame.event.get():
@@ -206,9 +166,6 @@ class Headless(Node):
new_ctrl_mode = "core" new_ctrl_mode = "core"
if new_ctrl_mode != self.ctrl_mode: if new_ctrl_mode != self.ctrl_mode:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.gamepad.rumble(0.6, 0.7, 75) self.gamepad.rumble(0.6, 0.7, 75)
self.ctrl_mode = new_ctrl_mode self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
@@ -237,6 +194,8 @@ class Headless(Node):
self.get_logger().info(f"[Ctrl] {output}") self.get_logger().info(f"[Ctrl] {output}")
self.core_publisher.publish(input) self.core_publisher.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG)
elif self.ctrl_mode == "core" and CORE_MODE == "twist": elif self.ctrl_mode == "core" and CORE_MODE == "twist":
input = Twist() input = Twist()
@@ -256,6 +215,8 @@ class Headless(Node):
# Publish # Publish
self.core_twist_pub_.publish(input) self.core_twist_pub_.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG)
self.get_logger().info( self.get_logger().info(
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}" f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
) )
@@ -286,7 +247,7 @@ class Headless(Node):
) )
# ARM and BIO # ARM and BIO
if self.ctrl_mode == "arm" and self.arm_mode == "manual": if self.ctrl_mode == "arm":
arm_input = ArmManual() arm_input = ArmManual()
# Collect controller state # Collect controller state
@@ -296,15 +257,9 @@ class Headless(Node):
right_stick_x = deadzone(self.gamepad.get_axis(3)) right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4)) right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5)) right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5) right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0) dpad_input = self.gamepad.get_hat(0)
if self.arm_manual_scheme == "old":
# EF Grippers # EF Grippers
if left_trigger > 0 and right_trigger > 0: if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0 arm_input.gripper = 0
@@ -347,64 +302,6 @@ class Headless(Node):
if abs(right_stick_y) > 0.15: if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y) arm_input.axis3 = -1 * round(right_stick_y)
if self.arm_manual_scheme == "new":
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: _
# X: _
# Y: linear actuator
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3
arm_input.effector_yaw = (
0 if abs(right_stick_x) < ARM_THRESHOLD else int(copysign(1, right_stick_x))
)
arm_input.axis3 = (
0 if abs(right_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, right_stick_y))
)
# Left stick: axis 1 and 2
arm_input.axis1 = (
0 if abs(left_stick_x) < ARM_THRESHOLD else int(copysign(1, left_stick_x))
)
arm_input.axis2 = (
0 if abs(left_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, left_stick_y))
)
# D-pad: axis 0 and _
arm_input.axis0 = (
0 if dpad_input[0] == 0 else int(copysign(1, dpad_input[0]))
)
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0
elif left_trigger > 0:
arm_input.gripper = -1
elif right_trigger > 0:
arm_input.gripper = 1
# Bumpers: EF roll
if left_bumper > 0 and right_bumper > 0:
arm_input.effector_roll = 0
elif left_bumper > 0:
arm_input.effector_roll = -1
elif right_bumper > 0:
arm_input.effector_roll = 1
# A: brake
if button_a:
arm_input.brake = True
# Y: linear actuator
if button_y:
arm_input.linear_actuator = 1
# BIO # BIO
bio_input = BioControl( bio_input = BioControl(
bio_arm=int(left_stick_y * -100), bio_arm=int(left_stick_y * -100),
@@ -417,75 +314,12 @@ class Headless(Node):
30 * (right_trigger - left_trigger) 30 * (right_trigger - left_trigger)
) # Max duty cycle 30% ) # Max duty cycle 30%
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(arm_input) self.arm_publisher.publish(arm_input)
# self.bio_publisher.publish(bio_input)
if self.ctrl_mode == "arm" and self.arm_mode == "ik":
arm_twist = TwistStamped()
arm_twist.header.frame_id = "base_link"
arm_twist.header.stamp = self.get_clock().now().to_msg()
arm_jointjog = JointJog()
arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# Right stick: linear y and linear x
# Left stick: angular z and linear z
# D-pad: angular y and _
# Triggers: EF grippers
# Bumpers: angular x
# A: brake
# B: IK mode
# X: manual mode
# Y: linear actuator
# Right stick: linear y and linear x
arm_twist.twist.linear.y = float(right_stick_x)
arm_twist.twist.linear.x = float(right_stick_y)
# Left stick: angular z and linear z
arm_twist.twist.angular.z = float(-1 * left_stick_x)
arm_twist.twist.linear.z = float(-1 * left_stick_y)
# D-pad: angular y and _
arm_twist.twist.angular.y = (
float(0)
if dpad_input[0] == 0
else float(-1 * copysign(0.75, dpad_input[0]))
)
# Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append(
"Gripper_Slider_Left" # TODO: Update joint name
)
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x
if left_bumper > 0 and right_bumper > 0:
arm_twist.twist.angular.x = float(0)
elif left_bumper > 0:
arm_twist.twist.angular.x = float(1)
elif right_bumper > 0:
arm_twist.twist.angular.x = float(-1)
self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def deadzone(value: float, threshold=STICK_DEADZONE) -> float: def deadzone(value: float, threshold=0.05) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""
if abs(value) < threshold: if abs(value) < threshold:
return 0 return 0
@@ -519,7 +353,7 @@ def main(args=None):
except (KeyboardInterrupt, ExternalShutdownException): except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal. Exiting...") print("Caught shutdown signal. Exiting...")
finally: finally:
rclpy.try_shutdown() rclpy.shutdown()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -26,7 +26,7 @@ class LatencyTester : public rclcpp::Node
{ {
public: public:
LatencyTester() LatencyTester()
: Node("latency_tester"), count_(0) : Node("latency_tester"), count_(0), target_mcu_("core")
{ {
publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10); publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10);
timer_ = this->create_wall_timer( timer_ = this->create_wall_timer(
@@ -35,8 +35,6 @@ public:
"/anchor/debug", "/anchor/debug",
10, 10,
std::bind(&LatencyTester::response_callback, this, std::placeholders::_1)); std::bind(&LatencyTester::response_callback, this, std::placeholders::_1));
target_mcu_ = this->declare_parameter<std::string>("target_mcu", "core");
} }
private: private:

View File

@@ -107,23 +107,23 @@ bool convertJoyToCmd(const std::vector<float>& axes, const std::vector<int>& but
std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist, std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist,
std::unique_ptr<control_msgs::msg::JointJog>& joint) std::unique_ptr<control_msgs::msg::JointJog>& joint)
{ {
// Give joint jogging priority because it is only buttons // // Give joint jogging priority because it is only buttons
// If any joint jog command is requested, we are only publishing joint commands // // If any joint jog command is requested, we are only publishing joint commands
if (0) // if (buttons[A] || buttons[B] || buttons[X] || buttons[Y] || axes[D_PAD_X] || axes[D_PAD_Y])
{ // {
// Map the D_PAD to the proximal joints // // Map the D_PAD to the proximal joints
joint->joint_names.push_back("panda_joint1"); // joint->joint_names.push_back("panda_joint1");
joint->velocities.push_back(axes[D_PAD_X]); // joint->velocities.push_back(axes[D_PAD_X]);
joint->joint_names.push_back("panda_joint2"); // joint->joint_names.push_back("panda_joint2");
joint->velocities.push_back(axes[D_PAD_Y]); // joint->velocities.push_back(axes[D_PAD_Y]);
// Map the diamond to the distal joints // // Map the diamond to the distal joints
joint->joint_names.push_back("panda_joint7"); // joint->joint_names.push_back("panda_joint7");
joint->velocities.push_back(buttons[B] - buttons[X]); // joint->velocities.push_back(buttons[B] - buttons[X]);
joint->joint_names.push_back("panda_joint6"); // joint->joint_names.push_back("panda_joint6");
joint->velocities.push_back(buttons[Y] - buttons[A]); // joint->velocities.push_back(buttons[Y] - buttons[A]);
return false; // return false;
} // }
// The bread and butter: map buttons to twist commands // The bread and butter: map buttons to twist commands
twist->twist.linear.z = axes[RIGHT_STICK_Y]; twist->twist.linear.z = axes[RIGHT_STICK_Y];
@@ -243,13 +243,13 @@ public:
twist_msg->header.stamp = this->now(); twist_msg->header.stamp = this->now();
twist_pub_->publish(std::move(twist_msg)); twist_pub_->publish(std::move(twist_msg));
} }
else // else
{ // {
// publish the JointJog // // publish the JointJog
joint_msg->header.stamp = this->now(); // joint_msg->header.stamp = this->now();
joint_msg->header.frame_id = "panda_link3"; // joint_msg->header.frame_id = "panda_link3";
joint_pub_->publish(std::move(joint_msg)); // joint_pub_->publish(std::move(joint_msg));
} // }
} }
private: private: