14 Commits

26 changed files with 1051 additions and 1068 deletions

3
.gitmodules vendored
View File

@@ -1,6 +1,3 @@
[submodule "src/astra_description"] [submodule "src/astra_description"]
path = src/astra_descriptions path = src/astra_descriptions
url = ../astra_descriptions url = ../astra_descriptions
[submodule "src/astra_msgs"]
path = src/astra_msgs
url = git@github.com:SHC-ASTRA/astra_msgs.git

View File

@@ -16,9 +16,6 @@ You will use these packages to launch all rover-side ROS2 nodes.
- [Connecting the GuliKit Controller](#connecting-the-gulikit-controller) - [Connecting the GuliKit Controller](#connecting-the-gulikit-controller)
- [Common Problems/Toubleshooting](#common-problemstroubleshooting) - [Common Problems/Toubleshooting](#common-problemstroubleshooting)
- [Packages](#packages) - [Packages](#packages)
- [Graphs](#graphs)
- [Full System](#full-system)
- [Individual Nodes](#individual-nodes)
- [Maintainers](#maintainers) - [Maintainers](#maintainers)
## Software Prerequisites ## Software Prerequisites
@@ -143,40 +140,6 @@ A: To find a microcontroller to talk to, Anchor sends a ping to every Serial por
- [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`). - [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`).
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK. - [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
## Graphs
### Full System
> **Anchor stand-alone** (`ros2 launch anchor_pkg rover.launch.py`)
>
> ![rqt_graph of Anchor by itself, ran with command: ros2 launch anchor_pkg rover.launch.py](./docs-resources/graph-anchor-standalone.png)
> **Anchor with [basestation-classic](https://github.com/SHC-ASTRA/basestation-classic)**
>
> ![rqt_graph of Anchor ran with the same command as above, talking to basestation-classic](./docs-resources/graph-anchor-w-basestation-classic.png)
> **Anchor with Headless** (`ros2 run headless_pkg headless_full`)
>
> ![rqt_graph of Anchor ran with Headless](./docs-resources/graph-anchor-w-headless.png)
### Individual Nodes
> **Anchor** (`ros2 run anchor_pkg anchor`)
>
> ![rqt_graph of Anchor node running by itself](./docs-resources/graph-anchor-anchor-standalone.png)
> **Core** (`ros2 run core_pkg core --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Core node running by itself](./docs-resources/graph-anchor-core-standalone.png)
> **Arm** (`ros2 run arm_pkg arm --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Arm node running by itself](./docs-resources/graph-anchor-arm-standalone.png)
> **Bio** (`ros2 run bio_pkg bio --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Bio node running by itself](./docs-resources/graph-anchor-bio-standalone.png)
## Maintainers ## Maintainers
| Name | Email | Discord | | Name | Email | Discord |

View File

@@ -14,12 +14,8 @@ echo "[INFO] Network interface is up!"
# Your actual ROS node start command goes here # Your actual ROS node start command goes here
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script (if we aren't using nix)
if command -v nixos-rebuild; then command -v ros2 || source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
if command -v nixos-rebuild; then command -v ros2 || source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,11 +17,7 @@ done
echo "[INFO] Network interface is up!" echo "[INFO] Network interface is up!"
if command -v nixos-rebuild; then command -v ros2 || source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source $ANCHOR_WS/install/setup.bash source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash [[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

Binary file not shown.

Before

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 395 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 543 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 439 KiB

102
flake.lock generated
View File

@@ -1,5 +1,29 @@
{ {
"nodes": { "nodes": {
"astra-msgs": {
"inputs": {
"nix-ros-overlay": "nix-ros-overlay",
"nixpkgs": [
"astra-msgs",
"nix-ros-overlay",
"nixpkgs"
]
},
"locked": {
"lastModified": 1771845042,
"narHash": "sha256-pGsb93ZlMhP3biy8S2eJc1sW+35vmaxlWHTbuwZDlQI=",
"owner": "SHC-ASTRA",
"repo": "astra_msgs",
"rev": "acabfd117d9711afc420612375b4e02f4ce4982d",
"type": "github"
},
"original": {
"owner": "SHC-ASTRA",
"repo": "astra_msgs",
"rev": "acabfd117d9711afc420612375b4e02f4ce4982d",
"type": "github"
}
},
"flake-utils": { "flake-utils": {
"inputs": { "inputs": {
"systems": "systems" "systems": "systems"
@@ -18,17 +42,55 @@
"type": "github" "type": "github"
} }
}, },
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nix-ros-overlay": { "nix-ros-overlay": {
"inputs": { "inputs": {
"flake-utils": "flake-utils", "flake-utils": "flake-utils",
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
}, },
"locked": { "locked": {
"lastModified": 1770108954, "lastModified": 1770622967,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=", "narHash": "sha256-1LYjTugPSCa/5NkP6/dcZLH5TQYj3R8mAZ/9dgd7jDM=",
"owner": "lopsided98", "owner": "lopsided98",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"rev": "3d05d46451b376e128a1553e78b8870c75d7753a", "rev": "d1b9f17eba909116356436d46b5192d299c6b49a",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "develop",
"repo": "nix-ros-overlay",
"type": "github"
}
},
"nix-ros-overlay_2": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1771404745,
"narHash": "sha256-UVP3TsQJ4PezyQG3B1SsgsTxz32XBVzplJ/cgq7v/uk=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "7cdf7b44ff186869baedbb3b82e5b409bb3e8dd9",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -54,9 +116,26 @@
"type": "github" "type": "github"
} }
}, },
"nixpkgs_2": {
"locked": {
"lastModified": 1759381078,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "nix-ros",
"repo": "nixpkgs",
"type": "github"
}
},
"root": { "root": {
"inputs": { "inputs": {
"nix-ros-overlay": "nix-ros-overlay", "astra-msgs": "astra-msgs",
"nix-ros-overlay": "nix-ros-overlay_2",
"nixpkgs": [ "nixpkgs": [
"nix-ros-overlay", "nix-ros-overlay",
"nixpkgs" "nixpkgs"
@@ -77,6 +156,21 @@
"repo": "default", "repo": "default",
"type": "github" "type": "github"
} }
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
} }
}, },
"root": "root", "root": "root",

View File

@@ -3,15 +3,13 @@
inputs = { inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop"; nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! nixpkgs.follows = "nix-ros-overlay/nixpkgs";
astra-msgs.url =
"github:SHC-ASTRA/astra_msgs/acabfd117d9711afc420612375b4e02f4ce4982d";
}; };
outputs = outputs =
{ { self, nix-ros-overlay, nixpkgs, astra-msgs }:
self,
nix-ros-overlay,
nixpkgs,
}:
nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem ( nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem (
system: system:
let let
@@ -19,26 +17,31 @@
inherit system; inherit system;
overlays = [ nix-ros-overlay.overlays.default ]; overlays = [ nix-ros-overlay.overlays.default ];
}; };
astra_msgs_pkgs = astra-msgs.packages.${system};
rosDistro = "humble";
in in
{ {
devShells.default = pkgs.mkShell { devShells.default = pkgs.mkShell {
name = "ASTRA Anchor"; name = "ASTRA Anchor";
packages = with pkgs; [ packages = with pkgs; [
colcon colcon
(python313.withPackages ( astra_msgs_pkgs.astra-msgs
p: with p; [
pyserial (pkgs.rosPackages.${rosDistro}.python3.withPackages (p: with p; [
pygame pyserial
scipy pygame
crccheck scipy
black crccheck
] black
)) ]))
(
with rosPackages.humble; (with rosPackages.${rosDistro};
buildEnv { buildEnv {
paths = [ paths = [
ros-core ros-core
rqt-graph
ros2cli ros2cli
ros2run ros2run
ros2bag ros2bag
@@ -61,7 +64,7 @@
moveit-msgs moveit-msgs
moveit-ros-planning moveit-ros-planning
moveit-ros-planning-interface moveit-ros-planning-interface
moveit-ros-visualization moveit-ros-visualization
moveit-configs-utils moveit-configs-utils
moveit-ros-move-group moveit-ros-move-group
moveit-servo moveit-servo
@@ -72,13 +75,16 @@
ompl ompl
joy joy
ros2-controllers ros2-controllers
chomp-motion-planner chomp-motion-planner
]; ];
} })
)
]; ];
env = {
ASTRAMSGS = "${astra-msgs.outPath}";
};
shellHook = '' shellHook = ''
# Display stuff
export DISPLAY=''${DISPLAY:-:0} export DISPLAY=''${DISPLAY:-:0}
export QT_X11_NO_MITSHM=1 export QT_X11_NO_MITSHM=1
''; '';
@@ -88,6 +94,7 @@
nixConfig = { nixConfig = {
extra-substituters = [ "https://ros.cachix.org" ]; extra-substituters = [ "https://ros.cachix.org" ];
extra-trusted-public-keys = [ "ros.cachix.org-1:dSyZxI8geDCJrwgvCOHDoAfOm5sV1wCPjBkKL+38Rvo=" ]; extra-trusted-public-keys =
[ "ros.cachix.org-1:dSyZxI8geDCJrwgvCOHDoAfOm5sV1wCPjBkKL+38Rvo=" ];
}; };
} }

View File

@@ -8,7 +8,6 @@ from launch.substitutions import (
PathJoinSubstitution, PathJoinSubstitution,
) )
from launch_ros.actions import Node from launch_ros.actions import Node
from launch.conditions import IfCondition
# Prevent making __pycache__ directories # Prevent making __pycache__ directories
@@ -51,7 +50,6 @@ def launch_setup(context, *args, **kwargs):
executable="ptz", # change as needed executable="ptz", # change as needed
name="ptz", name="ptz",
output="both", output="both",
condition=IfCondition(LaunchConfiguration("use_ptz", default="true")),
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
) )

View File

@@ -1,225 +1,182 @@
import sys
import threading
import signal
import math
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException import serial
from rclpy import qos import sys
import threading
from std_msgs.msg import String, Header import glob
import time
import atexit
import signal
from std_msgs.msg import String
from astra_msgs.msg import ArmManual
from astra_msgs.msg import SocketFeedback
from astra_msgs.msg import DigitFeedback
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog import math
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
control_qos = qos.QoSProfile( # control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, # depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, # durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1), # deadline=1000,
# lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=5000
) # )
serial_pub = None
thread = None thread = None
class ArmNode(Node): class SerialRelay(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
def __init__(self): def __init__(self):
# Initialize node
super().__init__("arm_node") super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output # Get launch mode parameter
self.declare_parameter("launch_mode", "arm")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
################################################## # Create publishers
# Parameters self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10)
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
)
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
self.declare_parameter("use_old_topics", True) # Create subscribers
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 2
)
################################################## # New messages
# Old topics self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10)
self.joint_state = JointState()
self.joint_state.name = [
"Axis_0_Joint",
"Axis_1_Joint",
"Axis_2_Joint",
"Axis_3_Joint",
"Wrist_Differential_Joint",
"Wrist-EF_Roll_Joint",
"Gripper_Slider_Left",
]
self.joint_state.position = [0.0] * len(
self.joint_state.name
) # Initialize with zeros
if self.use_old_topics: self.joint_command_sub = self.create_subscription(
# Anchor topics JointState, "/joint_commands", self.joint_command_callback, 10
)
# Topics used in anchor mode
if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription( self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10 String, "/anchor/arm/feedback", self.anchor_feedback, 10
) )
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Create publishers self.arm_feedback = SocketFeedback()
self.socket_pub = self.create_publisher( self.digit_feedback = DigitFeedback()
SocketFeedback, "/arm/feedback/socket", 10
)
self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
self.man_sub = self.create_subscription( if self.launch_mode == "arm":
ArmManual, "/arm/control/manual", self.send_manual, 10 # Loop through all serial devices on the computer to check for the MCU
) self.port = None
ports = SerialRelay.list_serial_ports()
for _ in range(4):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
################################################### # if pong is in response, then we are talking with the MCU
# New topics if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
# Anchor topics if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
)
time.sleep(1)
sys.exit(1)
# from_vic self.ser = serial.Serial(self.port, 115200)
self.anchor_fromvic_sub_ = self.create_subscription( atexit.register(self.cleanup)
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
# to_vic
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
# Control def run(self):
global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
# Manual: /arm/manual_new is published by Servo or Basestation # if in arm mode, will need to read from the MCU
self.jointjog_pub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos
)
# Feedback try:
while rclpy.ok():
if self.launch_mode == "arm":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
self.cleanup()
# Combined Socket and Digit feedback # Currently will just spit out all values over the /arm/feedback/debug topic as strings
self.arm_feedback_pub_ = self.create_publisher( def read_mcu(self):
ArmFeedback, try:
"/arm/feedback/new_feedback", output = str(self.ser.readline(), "utf8")
qos_profile=qos.qos_profile_sensor_data, if output:
) # self.get_logger().info(f"[MCU] {output}")
# IK arm pose: /joint_states is published from here to topic_based_control msg = String()
self.joint_state_pub_ = self.create_publisher( msg.data = output
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data self.debug_pub.publish(msg)
) except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
################################################### if self.ser.is_open:
# Saved state self.ser.close()
pass
# Combined Socket and Digit feedback except TypeError as e:
self.arm_feedback_new = ArmFeedback() self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
# IK Arm pose if self.ser.is_open:
self.saved_joint_state = JointState() self.ser.close()
self.saved_joint_state.name = self.all_joint_names pass
# ... initialize with zeros except Exception as e:
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name) print(f"Exception: {e}")
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name) print("Closing serial port.")
if self.ser.is_open:
def jointjog_callback(self, msg: JointJog): self.ser.close()
if ( pass
len(msg.joint_names) == 0
or len(msg.velocities) == 0
or len(msg.joint_names) != len(msg.velocities)
):
return # Malformed message
# Grab velocities from message
velocities = [
(
msg.velocities[msg.joint_names.index(joint_name)] # type: ignore
if joint_name in msg.joint_names
else 0.0
)
for joint_name in self.all_joint_names
]
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
)
)
# TODO: use msg.duration
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
return # command needs either position or velocity for all 7 joints
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper
# TODO: refactor to depend on joint names
# Embedded takes deg*10, ROS2 uses Radians # Embedded takes deg*10, ROS2 uses Radians
velocities = [ positions = [math.degrees(pos) * 10 for pos in msg.position]
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity
]
# Axis 2 & 3 URDF direction is inverted # Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2] positions[2] = -positions[2]
velocities[3] = -velocities[3] positions[3] = -positions[3]
# Axis 0-3 # Set target angles for each arm axis for embedded IK PID to handle
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3]) command = f"can_relay_tovic,arm,32,{positions[0]},{positions[1]},{positions[2]},{positions[3]}\n"
arm_cmd.header = Header(stamp=self.get_clock().now().to_msg()) # Wrist yaw and roll
# Wrist yaw and roll, gripper included for future use when have adequate hardware command += f"can_relay_tovic,digit,32,{positions[4]},{positions[5]}\n"
digit_cmd = VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6]) # Gripper IK does not have adequate hardware yet
digit_cmd.header = Header(stamp=self.get_clock().now().to_msg()) self.send_cmd(command)
self.anchor_tovic_pub_.publish(arm_cmd)
self.anchor_tovic_pub_.publish(digit_cmd)
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
"""TODO: Old"""
axis0 = msg.axis0 axis0 = msg.axis0
axis1 = -1 * msg.axis1 axis1 = -1 * msg.axis1
axis2 = msg.axis2 axis2 = msg.axis2
@@ -244,16 +201,23 @@ class ArmNode(Node):
return return
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
"""TODO: Old""" if (
output = String(data=msg) self.launch_mode == "anchor"
self.anchor_pub.publish(output) ): # if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
"""TODO: Old"""
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
# pass
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
elif output.startswith("can_relay_fromvic,arm,54"): elif output.startswith("can_relay_fromvic,arm,54"):
# pass
self.updateBusVoltage(output) self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
self.updateMotorFeedback(output) self.updateMotorFeedback(output)
@@ -271,140 +235,20 @@ class ArmNode(Node):
if len(parts) >= 4: if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3]) self.digit_feedback.wrist_angle = float(parts[3])
# self.digit_feedback.wrist_roll = float(parts[4]) # self.digit_feedback.wrist_roll = float(parts[4])
self.joint_state.position[4] = math.radians(
float(parts[4])
) # Wrist roll
self.joint_state.position[5] = math.radians(
float(parts[3])
) # Wrist yaw
else: else:
return return
def relay_fromvic(self, msg: VicCAN):
# Code for socket and digit are broken out for cleaner code
if msg.mcu_name == "arm":
self.process_fromvic_arm(msg)
elif msg.mcu_name == "digit":
self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN):
if msg.mcu_name != "arm":
return
# Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_socket_msg_len_dict:
expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.temperature = float(msg.data[1]) / 10.0
motor.voltage = float(msg.data[2]) / 10.0
motor.current = float(msg.data[3]) / 10.0
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 54: # Board voltages
self.arm_feedback_new.socket_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.socket_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.socket_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.socket_voltage.v3 = float(msg.data[3]) / 100.0
self.arm_feedback_new.socket_voltage.header.stamp = msg.header.stamp
case 55: # Arm joint positions
angles = [angle / 10.0 for angle in msg.data] # VicCAN sends deg*10
# Joint state publisher for URDF visualization
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.saved_joint_state.position[2] = math.radians(
-angles[2]
) # Axis 2 (inverted)
self.saved_joint_state.position[3] = math.radians(
-angles[3]
) # Axis 3 (inverted)
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
case 58: # REV SPARK MAX position and velocity feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.position = float(msg.data[1])
motor.velocity = float(msg.data[2])
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 59: # Arm joint velocities
velocities = [vel / 100.0 for vel in msg.data] # VicCAN sends deg/s*100
self.saved_joint_state.velocity[0] = math.radians(
velocities[0]
) # Axis 0
self.saved_joint_state.velocity[1] = math.radians(
velocities[1]
) # Axis 1
self.saved_joint_state.velocity[2] = math.radians(
-velocities[2]
) # Axis 2 (-)
self.saved_joint_state.velocity[3] = math.radians(
-velocities[3]
) # Axis 3 (-)
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN):
if msg.mcu_name != "digit":
return
# Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_digit_msg_len_dict:
expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(
msg.data[0]
) # Wrist roll
self.saved_joint_state.position[5] = math.radians(
msg.data[1]
) # Wrist yaw
def publish_feedback(self): def publish_feedback(self):
"""TODO: Old"""
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: str): def updateAngleFeedback(self, msg: str):
"""TODO: Old"""
# Angle feedbacks, # Angle feedbacks,
# split the msg.data by commas # split the msg.data by commas
parts = msg.split(",") parts = msg.split(",")
@@ -419,11 +263,20 @@ class ArmNode(Node):
self.arm_feedback.axis1_angle = angles[1] self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2] self.arm_feedback.axis2_angle = angles[2]
self.arm_feedback.axis3_angle = angles[3] self.arm_feedback.axis3_angle = angles[3]
# Joint state publisher for URDF visualization
self.joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted)
self.joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted)
# Wrist is handled by digit feedback
self.joint_state.header.stamp = self.get_clock().now().to_msg()
self.joint_state_pub.publish(self.joint_state)
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: str): def updateBusVoltage(self, msg: str):
"""TODO: Old"""
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.split(",")
if len(parts) >= 7: if len(parts) >= 7:
@@ -438,7 +291,6 @@ class ArmNode(Node):
self.get_logger().info("Invalid voltage feedback input format") self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg: str): def updateMotorFeedback(self, msg: str):
"""TODO: Old"""
parts = str(msg.strip()).split(",") parts = str(msg.strip()).split(",")
motorId = round(float(parts[3])) motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0 temp = float(parts[4]) / 10.0
@@ -461,20 +313,33 @@ class ArmNode(Node):
self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current self.arm_feedback.axis0_current = current
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
# return glob.glob("/dev/tty[A-Za-z]*")
def cleanup(self):
print("Cleaning up...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook
arm_node = ArmNode() global serial_pub
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True) serial_pub = SerialRelay()
thread.start() serial_pub.run()
try:
thread.join()
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":

Submodule src/astra_msgs deleted from 2264a2cb67

View File

@@ -1,241 +1,251 @@
import rclpy import signal
from rclpy.node import Node
import serial
import sys import sys
import threading import threading
import os
import glob
import time import time
import atexit
import signal import rclpy
from std_msgs.msg import String from astra_msgs.action import BioVacuum
from astra_msgs.msg import BioControl from astra_msgs.msg import CitadelControl, FaerieControl, VicCAN
from astra_msgs.msg import BioFeedback from astra_msgs.srv import BioTestTube, LibsSystem
from rclpy.action import ActionServer
from rclpy.node import Node
from std_msgs.msg import Header, String
serial_pub = None serial_pub = None
thread = None thread = None
# used to verify the length of an incoming VicCAN feedback message
# key is VicCAN command_id, value is expected length of data list
viccan_citadel_msg_len_dict = {
# empty because not expecting any VicCAN from citadel atm
}
viccan_lance_msg_len_dict = {
# empty because not sure what VicCAN commands LANCE sends
}
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node # Initialize node
super().__init__("bio_node") super().__init__("bio_node")
# Get launch mode parameter # Anchor Topics
self.declare_parameter("launch_mode", "bio") self.anchor_fromvic_sub_ = self.create_subscription(
self.launch_mode = self.get_parameter("launch_mode").value VicCAN, "/anchor/from_vic/bio", self.relay_fromvic, 20
self.get_logger().info(f"bio launch_mode is: {self.launch_mode}") )
self.anchor_tovic_pub_ = self.create_publisher(
# Create publishers VicCAN, "/anchor/to_vic/relay", 20
self.debug_pub = self.create_publisher(String, "/bio/feedback/debug", 10)
self.feedback_pub = self.create_publisher(BioFeedback, "/bio/feedback", 10)
# Create subscribers
self.control_sub = self.create_subscription(
BioControl, "/bio/control", self.send_control, 10
) )
# Create a publisher for telemetry self.anchor_sub = self.create_subscription(
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) String, "/anchor/bio/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Topics used in anchor mode # Messages
if self.launch_mode == "anchor": self.citadel_sub = self.create_subscription(
self.anchor_sub = self.create_subscription( CitadelControl,
String, "/anchor/bio/feedback", self.anchor_feedback, 10 "/bio/control/citadel",
) self.citadel_callback,
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) 10,
)
self.bio_feedback = BioFeedback() self.faerie_sub = self.create_subscription(
FaerieControl,
"/bio/control/faerie",
self.faerie_callback,
10,
)
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode # Services
if self.launch_mode == "bio": self.test_tube_service = self.create_service(
# Loop through all serial devices on the computer to check for the MCU BioTestTube, "/bio/control/test_tube", self.test_tube_callback
self.port = None )
for i in range(2): self.libs_service = self.create_service(
try: LibsSystem, "bio/control/libs_system", self.libs_callback
# connect and send a ping command )
set_port = (
"/dev/ttyACM0" # MCU is controlled through GPIO pins on the PI
)
ser = serial.Serial(set_port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU # Actions
if b"pong" in response: self._action_server = ActionServer(
self.port = set_port self, BioVacuum, "/bio/actions/vacuum", self.execute_vacuum
self.get_logger().info(f"Found MCU at {set_port}!") )
break
except:
pass
if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
)
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
# if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
if self.launch_mode == "bio": time.sleep(0.1)
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def send_ik(self, msg):
pass
def send_control(self, msg: BioControl):
# CITADEL Control Commands
################
# Chem Pumps, only send if not zero
if msg.pump_id != 0:
command = (
"can_relay_tovic,citadel,27,"
+ str(msg.pump_id)
+ ","
+ str(msg.pump_amount)
+ "\n"
)
self.send_cmd(command)
# Fans, only send if not zero
if msg.fan_id != 0:
command = (
"can_relay_tovic,citadel,40,"
+ str(msg.fan_id)
+ ","
+ str(msg.fan_duration)
+ "\n"
)
self.send_cmd(command)
# Servos, only send if not zero
if msg.servo_id != 0:
command = (
"can_relay_tovic,citadel,25,"
+ str(msg.servo_id)
+ ","
+ str(int(msg.servo_state))
+ "\n"
)
self.send_cmd(command)
# LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
# self.send_cmd(command)
# Vibration Motor
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
# self.send_cmd(command)
# FAERIE Control Commands
################
# To be reviewed before use#
# Laser
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
# self.send_cmd(command)
# Drill (SCABBARD)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
# self.send_cmd(command)
# Bio linear actuator
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command)
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
if ( # send to anchor node to relay
self.launch_mode == "anchor" output = String()
): # if in anchor mode, send to anchor node to relay output.data = msg
output = String() self.anchor_pub.publish(output)
output.data = msg
self.anchor_pub.publish(output) def relay_fromvic(self, msg: VicCAN):
elif self.launch_mode == "bio": # if in standalone mode, send to MCU directly # self.get_logger().info(msg)
self.get_logger().info(f"[Bio to MCU] {msg}") if msg.mcu_name == "citadel":
self.ser.write(bytes(msg, "utf8")) self.process_fromvic_citadel(msg)
def process_fromvic_citadel(self, msg: VicCAN):
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_citadel_msg_len_dict:
expected_len = viccan_citadel_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
# self.get_logger().info(f"[Bio Anchor] {msg.data}") self.get_logger().info(f"[Bio Anchor] {msg.data}")
# no data planned to be received from citadel, not sure about lance
if output.startswith( def citadel_callback(self, msg: CitadelControl):
"can_relay_fromvic,citadel,54" distributor_arr = msg.distributor_id
): # bat, 12, 5, Voltage readings * 100 # Distributor Control
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0 vic_cmd = VicCAN(
self.bio_feedback.voltage_12 = float(parts[4]) / 100.0 header=Header(stamp=self.get_clock().now().to_msg()),
self.bio_feedback.voltage_5 = float(parts[5]) / 100.0 mcu_name="citadel",
elif output.startswith("can_relay_fromvic,digit,57"): command_id=40,
self.bio_feedback.drill_temp = float(parts[3]) data=[
self.bio_feedback.drill_humidity = float(parts[4]) clamp_short(distributor_arr[0]),
clamp_short(distributor_arr[1]),
clamp_short(distributor_arr[2]),
0,
],
)
self.anchor_tovic_pub_.publish(vic_cmd)
# Move Scythe
vic_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="digit",
command_id=42,
data=[float(msg.move_scythe)],
)
self.anchor_tovic_pub_.publish(vic_cmd)
def publish_feedback(self): def faerie_callback(self, msg: FaerieControl):
self.feedback_pub.publish(self.bio_feedback) # Move Faerie
vic_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="digit",
command_id=42,
data=[float(msg.move_faerie)],
)
self.anchor_tovic_pub_.publish(vic_cmd)
# Drill Speed
vic_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="digit",
command_id=19,
data=[
float(msg.drill_speed * 100)
], # change on embedded so we can go (-1,1)
)
self.anchor_tovic_pub_.publish(vic_cmd)
# Vanity Laser Control
vic_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="digit",
command_id=28,
data=[float(msg.vanity_laser)],
)
self.anchor_tovic_pub_.publish(vic_cmd)
@staticmethod def test_tube_callback(self, request, response):
def list_serial_ports(): vic_cmd = VicCAN(
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") header=Header(stamp=self.get_clock().now().to_msg()),
# return glob.glob("/dev/tty[A-Za-z]*") mcu_name="citadel",
command_id=40,
data=[
float(int(request.tube_id)),
float(request.milliliters),
],
)
self.anchor_tovic_pub_.publish(vic_cmd)
return response
def cleanup(self): def libs_callback(self, request, response):
print("Cleaning up...") print("todo")
try:
if self.ser.is_open: def execute_vacuum(self, goal_handle):
self.ser.close() valve_id = int(goal_handle.request.valve_id)
except Exception as e: duty = int(goal_handle.request.fan_duty_cycle_percent)
exit(0) total = goal_handle.request.fan_time_ms
# open valve
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[float(valve_id)],
)
)
feedback = BioVacuum.Feedback()
start = time.time()
while True:
# set fan duty cycle
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[float(duty)],
)
)
elapsed = int((time.time() - start) * 1000)
remaining = max(0, total - elapsed)
feedback.fan_time_remaining_ms = remaining
goal_handle.publish_feedback(feedback)
if remaining == 0:
break
time.sleep(0.1)
# stop fan
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[0.0],
)
)
# close valve
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[-1.0],
)
)
goal_handle.succeed()
return BioVacuum.Result()
def clamp_short(x: int) -> int:
return max(-32768, min(32767, x))
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):

View File

@@ -31,10 +31,10 @@ CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1 CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, # durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1), # deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,

View File

@@ -6,28 +6,21 @@ from rclpy.duration import Duration
import signal import signal
import time import time
import atexit
import os import os
import sys import sys
import threading
import glob
import pwd import pwd
import grp import grp
from math import copysign from math import copysign
from std_srvs.srv import Trigger from std_msgs.msg import String
from std_msgs.msg import Header from geometry_msgs.msg import Twist
from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState from astra_msgs.msg import CoreCtrlState
import warnings
# Literally headless
warnings.filterwarnings(
"ignore",
message="Your system is avx2 capable but pygame was not built with support for it.",
)
import pygame import pygame
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
@@ -39,55 +32,29 @@ os.environ["SDL_AUDIODRIVER"] = (
CORE_STOP_MSG = CoreControl() # All zeros by default CORE_STOP_MSG = CoreControl() # All zeros by default
CORE_STOP_TWIST_MSG = Twist() # " CORE_STOP_TWIST_MSG = Twist() # "
ARM_STOP_MSG = ArmManual() # " ARM_STOP_MSG = ArmManual() # "
ARM_IK_STOP_MSG = TwistStamped() # "
BIO_STOP_MSG = BioControl() # " BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, # durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1), # deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
) )
CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
ARM_DEADZONE = float(os.getenv("ARM_DEADZONE", "0.2"))
class Headless(Node): class Headless(Node):
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
def __init__(self): def __init__(self):
# Initialize pygame first # Initialize pygame first
pygame.init() pygame.init()
pygame.joystick.init() pygame.joystick.init()
super().__init__("headless") super().__init__("headless")
# TODO: move the STOP_MSGs somewhere better
global ARM_STOP_JOG_MSG
ARM_STOP_JOG_MSG = JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
joint_names=self.all_joint_names,
velocities=[0.0] * len(self.all_joint_names),
)
##################################################
# Preamble
# Wait for anchor to start # Wait for anchor to start
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug") pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
while len(pub_info) == 0: while len(pub_info) == 0:
@@ -138,119 +105,37 @@ class Headless(Node):
break break
id += 1 id += 1
################################################## self.create_timer(0.15, self.send_controls)
# Parameters
self.declare_parameter("use_old_topics", True) self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.use_old_topics = ( self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.get_parameter("use_old_topics").get_parameter_value().bool_value self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
) )
self.core_state_pub_ = self.create_publisher(
self.declare_parameter("use_bio", False) CoreCtrlState, "/core/control/state", qos_profile=control_qos
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
self.declare_parameter("use_arm_ik", False)
self.use_arm_ik = (
self.get_parameter("use_arm_ik").get_parameter_value().bool_value
) )
# NOTE: only applicable if use_old_topics == True
self.declare_parameter("use_new_arm_manual_scheme", True)
self.use_new_arm_manual_scheme = (
self.get_parameter("use_new_arm_manual_scheme")
.get_parameter_value()
.bool_value
)
# Check parameter validity
if self.use_arm_ik and self.use_old_topics:
self.get_logger().fatal("Old topics do not support arm IK control.")
sys.exit(1)
if not self.use_new_arm_manual_scheme and not self.use_old_topics:
self.get_logger().warn(
f"New arm manual does not support old control scheme. Defaulting to new scheme."
)
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed) self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
##################################################
# Old Topics
if self.use_old_topics:
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(
ArmManual, "/arm/control/manual", 2
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
##################################################
# New Topics
if not self.use_old_topics:
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
)
# TODO: add new bio topics
##################################################
# Timers
self.create_timer(0.1, self.send_controls)
##################################################
# Services
# If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None
if self.use_arm_ik:
self.get_logger().info("Starting servo node for IK control...")
self.servo_start_client = self.create_client(
Trigger, "/servo_node/start_servo"
)
timeout_counter = 0
while not self.servo_start_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Waiting for servo_node/start_servo service...")
timeout_counter += 1
if timeout_counter >= 10:
self.get_logger().error(
"Servo's start service not available. IK control will not work."
)
break
if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request())
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
def stop_all(self): def run(self):
if self.use_old_topics: # This thread makes all the update processes run in the background
self.core_publisher.publish(CORE_STOP_MSG) thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
self.arm_publisher.publish(ARM_STOP_MSG) thread.start()
self.bio_publisher.publish(BIO_STOP_MSG)
else: try:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG) while rclpy.ok():
if self.use_arm_ik: self.send_controls()
self.arm_ik_twist_publisher.publish(ARM_IK_STOP_MSG) time.sleep(0.1) # Small delay to avoid CPU hogging
else: except KeyboardInterrupt:
self.arm_manual_pub_.publish(ARM_STOP_JOG_MSG) sys.exit(0)
# TODO: add bio here after implementing new topics
def send_controls(self): def send_controls(self):
"""Read the gamepad state and publish control messages""" """Read the gamepad state and publish control messages"""
@@ -262,8 +147,10 @@ class Headless(Node):
# Check if controller is still connected # Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads: if pygame.joystick.get_count() != self.num_gamepads:
print("Gamepad disconnected. Exiting...") print("Gamepad disconnected. Exiting...")
# Stop the rover if controller disconnected # Send one last zero control message
self.stop_all() self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.get_logger().info("Final stop commands sent. Shutting down.") self.get_logger().info("Final stop commands sent. Shutting down.")
# Clean up # Clean up
pygame.quit() pygame.quit()
@@ -279,51 +166,20 @@ class Headless(Node):
new_ctrl_mode = "core" new_ctrl_mode = "core"
if new_ctrl_mode != self.ctrl_mode: if new_ctrl_mode != self.ctrl_mode:
self.stop_all()
self.gamepad.rumble(0.6, 0.7, 75) self.gamepad.rumble(0.6, 0.7, 75)
self.ctrl_mode = new_ctrl_mode self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
if self.ctrl_mode == "arm" and self.use_bio:
self.get_logger().warning("NOTE: Using bio instead of arm.")
# Actually send the controls # CORE
if self.ctrl_mode == "core": if self.ctrl_mode == "core" and CORE_MODE == "duty":
self.send_core()
if self.use_old_topics:
if self.use_bio:
self.bio_publisher.publish(BIO_STOP_MSG)
else:
self.arm_publisher.publish(ARM_STOP_MSG)
# New topics shouldn't need to constantly send zeroes imo
else:
if self.use_bio:
self.send_bio()
else:
self.send_arm()
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
# Ditto
def send_core(self):
# Collect controller state
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
input = CoreControl() input = CoreControl()
input.max_speed = 90 input.max_speed = 90
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
# Right wheels # Right wheels
input.right_stick = float(round(-1 * right_stick_y, 2)) input.right_stick = float(round(-1 * right_stick_y, 2))
@@ -338,10 +194,19 @@ class Headless(Node):
self.get_logger().info(f"[Ctrl] {output}") self.get_logger().info(f"[Ctrl] {output}")
self.core_publisher.publish(input) self.core_publisher.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG)
else: # New topics elif self.ctrl_mode == "core" and CORE_MODE == "twist":
input = Twist() input = Twist()
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_x = deadzone(self.gamepad.get_axis(3))
button_a = self.gamepad.get_button(0)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
# Forward/back and Turn # Forward/back and Turn
input.linear.x = -1.0 * left_stick_y input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign( input.angular.z = -1.0 * copysign(
@@ -350,6 +215,8 @@ class Headless(Node):
# Publish # Publish
self.core_twist_pub_.publish(input) self.core_twist_pub_.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG)
self.get_logger().info( self.get_logger().info(
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}" f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
) )
@@ -374,300 +241,91 @@ class Headless(Node):
state_msg = CoreCtrlState() state_msg = CoreCtrlState()
state_msg.brake_mode = bool(self.core_brake_mode) state_msg.brake_mode = bool(self.core_brake_mode)
state_msg.max_duty = float(self.core_max_duty) state_msg.max_duty = float(self.core_max_duty)
self.core_state_pub_.publish(state_msg) self.core_state_pub_.publish(state_msg)
self.get_logger().info( self.get_logger().info(
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}" f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
) )
def send_arm(self): # ARM and BIO
# Collect controller state if self.ctrl_mode == "arm":
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# OLD MANUAL
# ==========
if not self.use_arm_ik and self.use_old_topics:
arm_input = ArmManual() arm_input = ArmManual()
# OLD ARM MANUAL CONTROL SCHEME # Collect controller state
if not self.use_new_arm_manual_scheme: left_stick_x = deadzone(self.gamepad.get_axis(0))
# EF Grippers left_stick_y = deadzone(self.gamepad.get_axis(1))
if left_trigger > 0 and right_trigger > 0: left_trigger = deadzone(self.gamepad.get_axis(2))
arm_input.gripper = 0 right_stick_x = deadzone(self.gamepad.get_axis(3))
elif left_trigger > 0: right_stick_y = deadzone(self.gamepad.get_axis(4))
arm_input.gripper = -1 right_trigger = deadzone(self.gamepad.get_axis(5))
elif right_trigger > 0: right_bumper = self.gamepad.get_button(5)
arm_input.gripper = 1 dpad_input = self.gamepad.get_hat(0)
# Axis 0 # EF Grippers
if dpad_input[0] == 1:
arm_input.axis0 = 1
elif dpad_input[0] == -1:
arm_input.axis0 = -1
if right_bumper: # Control end effector
# Effector yaw
if left_stick_x > 0:
arm_input.effector_yaw = 1
elif left_stick_x < 0:
arm_input.effector_yaw = -1
# Effector roll
if right_stick_x > 0:
arm_input.effector_roll = 1
elif right_stick_x < 0:
arm_input.effector_roll = -1
else: # Control arm axis
# Axis 1
if abs(left_stick_x) > 0.15:
arm_input.axis1 = round(left_stick_x)
# Axis 2
if abs(left_stick_y) > 0.15:
arm_input.axis2 = -1 * round(left_stick_y)
# Axis 3
if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y)
# NEW ARM MANUAL CONTROL SCHEME
if self.use_new_arm_manual_scheme:
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: linear actuator in
# X: _
# Y: linear actuator out
# Right stick: EF yaw and axis 3
arm_input.effector_yaw = stick_to_arm_direction(right_stick_x)
arm_input.axis3 = -1 * stick_to_arm_direction(right_stick_y)
# Left stick: axis 1 and 2
arm_input.axis1 = stick_to_arm_direction(left_stick_x)
arm_input.axis2 = -1 * stick_to_arm_direction(left_stick_y)
# D-pad: axis 0 and _
arm_input.axis0 = int(dpad_input[0])
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0
elif left_trigger > 0:
arm_input.gripper = -1
elif right_trigger > 0:
arm_input.gripper = 1
# Bumpers: EF roll
if left_bumper > 0 and right_bumper > 0:
arm_input.effector_roll = 0
elif left_bumper > 0:
arm_input.effector_roll = -1
elif right_bumper > 0:
arm_input.effector_roll = 1
# A: brake
if button_a:
arm_input.brake = True
# Y: linear actuator
if button_y and not button_b:
arm_input.linear_actuator = 1
elif button_b and not button_y:
arm_input.linear_actuator = -1
else:
arm_input.linear_actuator = 0
self.arm_publisher.publish(arm_input)
# NEW MANUAL
# ==========
elif not self.use_arm_ik and not self.use_old_topics:
arm_input = JointJog()
arm_input.header.frame_id = "base_link"
arm_input.header.stamp = self.get_clock().now().to_msg()
arm_input.joint_names = self.all_joint_names
arm_input.velocities = [0.0] * len(self.all_joint_names)
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: linear actuator in
# X: _
# Y: linear actuator out
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
stick_to_arm_direction(right_stick_x)
)
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
-1 * stick_to_arm_direction(right_stick_y)
)
# Left stick: axis 1 and 2
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = float(
stick_to_arm_direction(left_stick_x)
)
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
-1 * stick_to_arm_direction(left_stick_y)
)
# D-pad: axis 0 and _
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = float(
dpad_input[0]
)
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0: if left_trigger > 0 and right_trigger > 0:
arm_input.velocities[ arm_input.gripper = 0
self.all_joint_names.index("ef_gripper_left_joint")
] = 0.0
elif left_trigger > 0: elif left_trigger > 0:
arm_input.velocities[ arm_input.gripper = -1
self.all_joint_names.index("ef_gripper_left_joint")
] = -1.0
elif right_trigger > 0: elif right_trigger > 0:
arm_input.velocities[ arm_input.gripper = 1
self.all_joint_names.index("ef_gripper_left_joint")
] = 1.0
# Bumpers: EF roll # Axis 0
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = ( if dpad_input[0] == 1:
right_bumper - left_bumper arm_input.axis0 = 1
) elif dpad_input[0] == -1:
arm_input.axis0 = -1
# A: brake if right_bumper: # Control end effector
# TODO: Brake mode
# Y: linear actuator # Effector yaw
# TODO: linear actuator if left_stick_x > 0:
arm_input.effector_yaw = 1
elif left_stick_x < 0:
arm_input.effector_yaw = -1
self.arm_manual_pub_.publish(arm_input) # Effector roll
if right_stick_x > 0:
arm_input.effector_roll = 1
elif right_stick_x < 0:
arm_input.effector_roll = -1
# IK (ONLY NEW) else: # Control arm axis
# =============
elif self.use_arm_ik: # Axis 1
arm_twist = TwistStamped() if abs(left_stick_x) > 0.15:
arm_twist.header.frame_id = "base_link" arm_input.axis1 = round(left_stick_x)
arm_twist.header.stamp = self.get_clock().now().to_msg()
arm_jointjog = JointJog()
arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Right stick: linear y and linear x # Axis 2
# Left stick: angular z and linear z if abs(left_stick_y) > 0.15:
# D-pad: angular y and _ arm_input.axis2 = -1 * round(left_stick_y)
# Triggers: EF grippers
# Bumpers: angular x
# A: brake
# B: IK mode
# X: manual mode
# Y: linear actuator
# Right stick: linear y and linear x # Axis 3
arm_twist.twist.linear.y = float(right_stick_x) if abs(right_stick_y) > 0.15:
arm_twist.twist.linear.x = float(right_stick_y) arm_input.axis3 = -1 * round(right_stick_y)
# Left stick: angular z and linear z # BIO
arm_twist.twist.angular.z = float(-1 * left_stick_x)
arm_twist.twist.linear.z = float(-1 * left_stick_y)
# D-pad: angular y and _
arm_twist.twist.angular.y = (
float(0)
if dpad_input[0] == 0
else float(-1 * copysign(0.75, dpad_input[0]))
)
# Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append("ef_gripper_left_joint") # type: ignore
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x
if left_bumper > 0 and right_bumper > 0:
arm_twist.twist.angular.x = float(0)
elif left_bumper > 0:
arm_twist.twist.angular.x = float(1)
elif right_bumper > 0:
arm_twist.twist.angular.x = float(-1)
self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def send_bio(self):
# Collect controller state
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
bio_input = BioControl( bio_input = BioControl(
bio_arm=int(left_stick_y * -100), bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100), drill_arm=int(round(right_stick_y) * -100),
) )
# Drill motor (FAERIE) # Drill motor (FAERIE)
if left_trigger > 0 or right_trigger > 0: if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int( bio_input.drill = int(
30 * (right_trigger - left_trigger) 30 * (right_trigger - left_trigger)
) # Max duty cycle 30% ) # Max duty cycle 30%
self.bio_publisher.publish(bio_input) self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(arm_input)
else: # self.bio_publisher.publish(bio_input)
pass # TODO: implement new bio control topics
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float: def deadzone(value: float, threshold=0.05) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""
if abs(value) < threshold: if abs(value) < threshold:
return 0 return 0
return value return value
def stick_to_arm_direction(value: float, threshold=ARM_DEADZONE) -> int:
"""Apply a larger deadzone to a stick input and make digital/binary instead of analog"""
if abs(value) < threshold:
return 0
return int(copysign(1, value))
def is_user_in_group(group_name: str) -> bool: def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/ # Copied from https://zetcode.com/python/os-getgrouplist/
try: try:
@@ -686,26 +344,20 @@ def is_user_in_group(group_name: str) -> bool:
return False return False
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None): def main(args=None):
try: try:
rclpy.init(args=args) rclpy.init(args=args)
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
node = Headless() node = Headless()
rclpy.spin(node) rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException): except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal. Exiting...") print("Caught shutdown signal. Exiting...")
finally: finally:
rclpy.try_shutdown() rclpy.shutdown()
if __name__ == "__main__": if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -26,7 +26,7 @@ class LatencyTester : public rclcpp::Node
{ {
public: public:
LatencyTester() LatencyTester()
: Node("latency_tester"), count_(0) : Node("latency_tester"), count_(0), target_mcu_("core")
{ {
publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10); publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10);
timer_ = this->create_wall_timer( timer_ = this->create_wall_timer(
@@ -35,8 +35,6 @@ public:
"/anchor/debug", "/anchor/debug",
10, 10,
std::bind(&LatencyTester::response_callback, this, std::placeholders::_1)); std::bind(&LatencyTester::response_callback, this, std::placeholders::_1));
target_mcu_ = this->declare_parameter<std::string>("target_mcu", "core");
} }
private: private:

View File

@@ -0,0 +1,80 @@
cmake_minimum_required(VERSION 3.22)
project(servo_arm_twist_pkg)
# C++ Libraries #################################################
# Core C++ library for calculations and collision checking.
# Provides interface used by the component node.
set(SERVO_LIB_NAME servo_arm_twist_lib)
# Pose Tracking
set(POSE_TRACKING pose_tracking)
# Component Nodes (Shared libraries) ############################
set(SERVO_COMPONENT_NODE servo_node)
set(SERVO_CONTROLLER_INPUT servo_controller_input)
# Executable Nodes ##############################################
set(SERVO_NODE_MAIN_NAME servo_node_main)
set(POSE_TRACKING_DEMO_NAME servo_pose_tracking_demo)
set(FAKE_SERVO_CMDS_NAME fake_command_publisher)
#################################################################
# Common cmake code applied to all moveit packages
find_package(moveit_common REQUIRED)
moveit_package()
set(THIS_PACKAGE_INCLUDE_DEPENDS
control_msgs
control_toolbox
geometry_msgs
moveit_core
moveit_msgs
moveit_ros_planning
pluginlib
rclcpp
rclcpp_components
sensor_msgs
std_msgs
std_srvs
tf2_eigen
trajectory_msgs
)
find_package(ament_cmake REQUIRED)
find_package(eigen3_cmake_module REQUIRED)
find_package(Eigen3 REQUIRED)
foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS})
find_package(${Dependency} REQUIRED)
endforeach()
#####################
## Component Nodes ##
#####################
# Add executable for using a controller
add_library(${SERVO_CONTROLLER_INPUT} SHARED src/joystick_twist.cpp)
ament_target_dependencies(${SERVO_CONTROLLER_INPUT} ${THIS_PACKAGE_INCLUDE_DEPENDS})
rclcpp_components_register_nodes(${SERVO_CONTROLLER_INPUT} "servo_arm_twist_pkg::JoyToServoPub")
#############
## Install ##
#############
# Install Libraries
install(
TARGETS
${SERVO_CONTROLLER_INPUT}
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
INCLUDES DESTINATION include
)
# Install Binaries
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_dependencies(${THIS_PACKAGE_INCLUDE_DEPENDS})
ament_package()

View File

@@ -0,0 +1,3 @@
# Moveit Servo
See the [Realtime Arm Servoing Tutorial](https://moveit.picknik.ai/main/doc/realtime_servo/realtime_servo_tutorial.html) for installation instructions, quick-start guide, an overview about `moveit_servo`, and to learn how to set it up on your robot.

View File

@@ -0,0 +1,58 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>servo_arm_twist_pkg</name>
<version>2.5.9</version>
<description>Provides real-time manipulator Cartesian and joint servoing.</description>
<maintainer email="blakeanderson@utexas.edu">Blake Anderson</maintainer>
<maintainer email="andyz@utexas.edu">Andy Zelenak</maintainer>
<maintainer email="tyler@picknik.ai">Tyler Weaver</maintainer>
<maintainer email="henningkayser@picknik.ai">Henning Kayser</maintainer>
<license>BSD 3-Clause</license>
<url type="website">https://ros-planning.github.io/moveit_tutorials</url>
<author>Brian O'Neil</author>
<author email="andyz@utexas.edu">Andy Zelenak</author>
<author>Blake Anderson</author>
<author email="alex@machinekoder.com">Alexander Rössler</author>
<author email="tyler@picknik.ai">Tyler Weaver</author>
<author email="adam.pettinger@utexas.edu">Adam Pettinger</author>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>moveit_common</depend>
<depend>control_msgs</depend>
<depend>control_toolbox</depend>
<depend>geometry_msgs</depend>
<depend>moveit_msgs</depend>
<depend>moveit_core</depend>
<depend>moveit_ros_planning_interface</depend>
<depend>pluginlib</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<depend>tf2_eigen</depend>
<depend>trajectory_msgs</depend>
<exec_depend>gripper_controllers</exec_depend>
<exec_depend>joint_state_broadcaster</exec_depend>
<exec_depend>joint_trajectory_controller</exec_depend>
<exec_depend>joy</exec_depend>
<exec_depend>robot_state_publisher</exec_depend>
<exec_depend>tf2_ros</exec_depend>
<exec_depend>moveit_configs_utils</exec_depend>
<exec_depend>launch_param_builder</exec_depend>
<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>controller_manager</test_depend>
<test_depend>ros_testing</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@@ -0,0 +1,271 @@
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2020, PickNik Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of PickNik Inc. nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/
/* Title : joystick_servo_example.cpp
* Project : servo_arm_twist_pkg
* Created : 08/07/2020
* Author : Adam Pettinger
*/
#include <sensor_msgs/msg/joy.hpp>
#include <geometry_msgs/msg/twist_stamped.hpp>
#include <control_msgs/msg/joint_jog.hpp>
#include <std_srvs/srv/trigger.hpp>
#include <moveit_msgs/msg/planning_scene.hpp>
#include <rclcpp/client.hpp>
#include <rclcpp/experimental/buffers/intra_process_buffer.hpp>
#include <rclcpp/node.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/qos.hpp>
#include <rclcpp/qos_event.hpp>
#include <rclcpp/subscription.hpp>
#include <rclcpp/time.hpp>
#include <rclcpp/utilities.hpp>
#include <thread>
// We'll just set up parameters here
const std::string JOY_TOPIC = "/joy";
const std::string TWIST_TOPIC = "/servo_node/delta_twist_cmds";
const std::string JOINT_TOPIC = "/servo_node/delta_joint_cmds";
const std::string EEF_FRAME_ID = "End_Effector";
const std::string BASE_FRAME_ID = "base_link";
// Enums for button names -> axis/button array index
// For XBOX 1 controller
enum Axis
{
LEFT_STICK_X = 0,
LEFT_STICK_Y = 1,
LEFT_TRIGGER = 2,
RIGHT_STICK_X = 3,
RIGHT_STICK_Y = 4,
RIGHT_TRIGGER = 5,
D_PAD_X = 6,
D_PAD_Y = 7
};
enum Button
{
A = 0,
B = 1,
X = 2,
Y = 3,
LEFT_BUMPER = 4,
RIGHT_BUMPER = 5,
CHANGE_VIEW = 6,
MENU = 7,
HOME = 8,
LEFT_STICK_CLICK = 9,
RIGHT_STICK_CLICK = 10
};
// Some axes have offsets (e.g. the default trigger position is 1.0 not 0)
// This will map the default values for the axes
std::map<Axis, double> AXIS_DEFAULTS = { { LEFT_TRIGGER, 1.0 }, { RIGHT_TRIGGER, 1.0 } };
std::map<Button, double> BUTTON_DEFAULTS;
// To change controls or setup a new controller, all you should to do is change the above enums and the follow 2
// functions
/** \brief // This converts a joystick axes and buttons array to a TwistStamped or JointJog message
* @param axes The vector of continuous controller joystick axes
* @param buttons The vector of discrete controller button values
* @param twist A TwistStamped message to update in prep for publishing
* @param joint A JointJog message to update in prep for publishing
* @return return true if you want to publish a Twist, false if you want to publish a JointJog
*/
bool convertJoyToCmd(const std::vector<float>& axes, const std::vector<int>& buttons,
std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist,
std::unique_ptr<control_msgs::msg::JointJog>& joint)
{
// // Give joint jogging priority because it is only buttons
// // If any joint jog command is requested, we are only publishing joint commands
// if (buttons[A] || buttons[B] || buttons[X] || buttons[Y] || axes[D_PAD_X] || axes[D_PAD_Y])
// {
// // Map the D_PAD to the proximal joints
// joint->joint_names.push_back("panda_joint1");
// joint->velocities.push_back(axes[D_PAD_X]);
// joint->joint_names.push_back("panda_joint2");
// joint->velocities.push_back(axes[D_PAD_Y]);
// // Map the diamond to the distal joints
// joint->joint_names.push_back("panda_joint7");
// joint->velocities.push_back(buttons[B] - buttons[X]);
// joint->joint_names.push_back("panda_joint6");
// joint->velocities.push_back(buttons[Y] - buttons[A]);
// return false;
// }
// The bread and butter: map buttons to twist commands
twist->twist.linear.z = axes[RIGHT_STICK_Y];
twist->twist.linear.y = axes[RIGHT_STICK_X];
double lin_x_right = -0.5 * (axes[RIGHT_TRIGGER] - AXIS_DEFAULTS.at(RIGHT_TRIGGER));
double lin_x_left = 0.5 * (axes[LEFT_TRIGGER] - AXIS_DEFAULTS.at(LEFT_TRIGGER));
twist->twist.linear.x = lin_x_right + lin_x_left;
twist->twist.angular.y = axes[LEFT_STICK_Y];
twist->twist.angular.x = axes[LEFT_STICK_X];
double roll_positive = buttons[RIGHT_BUMPER];
double roll_negative = -1 * (buttons[LEFT_BUMPER]);
twist->twist.angular.z = roll_positive + roll_negative;
return true;
}
/** \brief // This should update the frame_to_publish_ as needed for changing command frame via controller
* @param frame_name Set the command frame to this
* @param buttons The vector of discrete controller button values
*/
void updateCmdFrame(std::string& frame_name, const std::vector<int>& buttons)
{
if (buttons[CHANGE_VIEW] && frame_name == EEF_FRAME_ID)
frame_name = BASE_FRAME_ID;
else if (buttons[MENU] && frame_name == BASE_FRAME_ID)
frame_name = EEF_FRAME_ID;
}
namespace servo_arm_twist_pkg
{
class JoyToServoPub : public rclcpp::Node
{
public:
JoyToServoPub(const rclcpp::NodeOptions& options)
: Node("joy_to_twist_publisher", options), frame_to_publish_(BASE_FRAME_ID)
{
// Setup pub/sub
joy_sub_ = this->create_subscription<sensor_msgs::msg::Joy>(
JOY_TOPIC, rclcpp::SystemDefaultsQoS(),
[this](const sensor_msgs::msg::Joy::ConstSharedPtr& msg) { return joyCB(msg); });
twist_pub_ = this->create_publisher<geometry_msgs::msg::TwistStamped>(TWIST_TOPIC, rclcpp::SystemDefaultsQoS());
joint_pub_ = this->create_publisher<control_msgs::msg::JointJog>(JOINT_TOPIC, rclcpp::SystemDefaultsQoS());
// collision_pub_ =
// this->create_publisher<moveit_msgs::msg::PlanningScene>("/planning_scene", rclcpp::SystemDefaultsQoS());
// Create a service client to start the ServoNode
servo_start_client_ = this->create_client<std_srvs::srv::Trigger>("/servo_node/start_servo");
servo_start_client_->wait_for_service(std::chrono::seconds(1));
servo_start_client_->async_send_request(std::make_shared<std_srvs::srv::Trigger::Request>());
// // Load the collision scene asynchronously
// collision_pub_thread_ = std::thread([this]() {
// rclcpp::sleep_for(std::chrono::seconds(3));
// // Create collision object, in the way of servoing
// moveit_msgs::msg::CollisionObject collision_object;
// collision_object.header.frame_id = "panda_link0";
// collision_object.id = "box";
// shape_msgs::msg::SolidPrimitive table_1;
// table_1.type = table_1.BOX;
// table_1.dimensions = { 0.4, 0.6, 0.03 };
// geometry_msgs::msg::Pose table_1_pose;
// table_1_pose.position.x = 0.6;
// table_1_pose.position.y = 0.0;
// table_1_pose.position.z = 0.4;
// shape_msgs::msg::SolidPrimitive table_2;
// table_2.type = table_2.BOX;
// table_2.dimensions = { 0.6, 0.4, 0.03 };
// geometry_msgs::msg::Pose table_2_pose;
// table_2_pose.position.x = 0.0;
// table_2_pose.position.y = 0.5;
// table_2_pose.position.z = 0.25;
// collision_object.primitives.push_back(table_1);
// collision_object.primitive_poses.push_back(table_1_pose);
// collision_object.primitives.push_back(table_2);
// collision_object.primitive_poses.push_back(table_2_pose);
// collision_object.operation = collision_object.ADD;
// moveit_msgs::msg::PlanningSceneWorld psw;
// psw.collision_objects.push_back(collision_object);
// auto ps = std::make_unique<moveit_msgs::msg::PlanningScene>();
// ps->world = psw;
// ps->is_diff = true;
// collision_pub_->publish(std::move(ps));
// });
}
// ~JoyToServoPub() override
// {
// if (collision_pub_thread_.joinable())
// collision_pub_thread_.join();
// }
void joyCB(const sensor_msgs::msg::Joy::ConstSharedPtr& msg)
{
// Create the messages we might publish
auto twist_msg = std::make_unique<geometry_msgs::msg::TwistStamped>();
auto joint_msg = std::make_unique<control_msgs::msg::JointJog>();
// This call updates the frame for twist commands
updateCmdFrame(frame_to_publish_, msg->buttons);
// Convert the joystick message to Twist or JointJog and publish
if (convertJoyToCmd(msg->axes, msg->buttons, twist_msg, joint_msg))
{
// publish the TwistStamped
twist_msg->header.frame_id = frame_to_publish_;
twist_msg->header.stamp = this->now();
twist_pub_->publish(std::move(twist_msg));
}
// else
// {
// // publish the JointJog
// joint_msg->header.stamp = this->now();
// joint_msg->header.frame_id = "panda_link3";
// joint_pub_->publish(std::move(joint_msg));
// }
}
private:
rclcpp::Subscription<sensor_msgs::msg::Joy>::SharedPtr joy_sub_;
rclcpp::Publisher<geometry_msgs::msg::TwistStamped>::SharedPtr twist_pub_;
rclcpp::Publisher<control_msgs::msg::JointJog>::SharedPtr joint_pub_;
rclcpp::Publisher<moveit_msgs::msg::PlanningScene>::SharedPtr collision_pub_;
rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr servo_start_client_;
std::string frame_to_publish_;
// std::thread collision_pub_thread_;
}; // class JoyToServoPub
} // namespace servo_arm_twist_pkg
// Register the component with class_loader
#include <rclcpp_components/register_node_macro.hpp>
RCLCPP_COMPONENTS_REGISTER_NODE(servo_arm_twist_pkg::JoyToServoPub)