4 Commits

Author SHA1 Message Date
91fa23cd32 Fixed README 2026-02-10 18:54:40 -06:00
3e1b1683af Updated README 2026-02-10 18:50:53 -06:00
e3ad666fbc added --symlink-install 2026-02-10 18:42:53 -06:00
7da58dda3c added Docker compose 2026-02-10 17:58:06 -06:00
24 changed files with 1666 additions and 1577 deletions

4
.dockerignore Normal file
View File

@@ -0,0 +1,4 @@
.git
*.log
build/
install/

1
.env Normal file
View File

@@ -0,0 +1 @@
DOCKER_IMAGE=main-jazzy-tutorial-source

View File

@@ -11,14 +11,12 @@ You will use these packages to launch all rover-side ROS2 nodes.
- [Software Prerequisites](#software-prerequisites) - [Software Prerequisites](#software-prerequisites)
- [Nix](#nix) - [Nix](#nix)
- [ROS2 Humble + rosdep](#ros2-humble--rosdep) - [ROS2 Humble + rosdep](#ros2-humble--rosdep)
- [Docker](#docker)
- [Running](#running) - [Running](#running)
- [Testing Serial](#testing-serial) - [Testing Serial](#testing-serial)
- [Connecting the GuliKit Controller](#connecting-the-gulikit-controller) - [Connecting the GuliKit Controller](#connecting-the-gulikit-controller)
- [Common Problems/Toubleshooting](#common-problemstroubleshooting) - [Common Problems/Toubleshooting](#common-problemstroubleshooting)
- [Packages](#packages) - [Packages](#packages)
- [Graphs](#graphs)
- [Full System](#full-system)
- [Individual Nodes](#individual-nodes)
- [Maintainers](#maintainers) - [Maintainers](#maintainers)
## Software Prerequisites ## Software Prerequisites
@@ -50,6 +48,17 @@ $ cd path/to/rover-ros2
$ rosdep install --from-paths src -y --ignore-src $ rosdep install --from-paths src -y --ignore-src
``` ```
### Docker
Using the docker compose file automatically builds the workspace and allows you to choose between running on the CPU or GPU for applications like Rviz2 and Gazebo:
```bash
# Run on CPU
$ docker compose run --rm --name rover-ros2-container cpu
# Run on GPU (NVidia only)
$ docker compose run --rm --name rover-ros2-container gpu
```
## Running ## Running
```bash ```bash
@@ -143,40 +152,6 @@ A: To find a microcontroller to talk to, Anchor sends a ping to every Serial por
- [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`). - [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`).
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK. - [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
## Graphs
### Full System
> **Anchor stand-alone** (`ros2 launch anchor_pkg rover.launch.py`)
>
> ![rqt_graph of Anchor by itself, ran with command: ros2 launch anchor_pkg rover.launch.py](./docs-resources/graph-anchor-standalone.png)
> **Anchor with [basestation-classic](https://github.com/SHC-ASTRA/basestation-classic)**
>
> ![rqt_graph of Anchor ran with the same command as above, talking to basestation-classic](./docs-resources/graph-anchor-w-basestation-classic.png)
> **Anchor with Headless** (`ros2 run headless_pkg headless_full`)
>
> ![rqt_graph of Anchor ran with Headless](./docs-resources/graph-anchor-w-headless.png)
### Individual Nodes
> **Anchor** (`ros2 run anchor_pkg anchor`)
>
> ![rqt_graph of Anchor node running by itself](./docs-resources/graph-anchor-anchor-standalone.png)
> **Core** (`ros2 run core_pkg core --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Core node running by itself](./docs-resources/graph-anchor-core-standalone.png)
> **Arm** (`ros2 run arm_pkg arm --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Arm node running by itself](./docs-resources/graph-anchor-arm-standalone.png)
> **Bio** (`ros2 run bio_pkg bio --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Bio node running by itself](./docs-resources/graph-anchor-bio-standalone.png)
## Maintainers ## Maintainers
| Name | Email | Discord | | Name | Email | Discord |

38
docker-compose.yml Normal file
View File

@@ -0,0 +1,38 @@
services:
cpu:
image: moveit/moveit2:${DOCKER_IMAGE}
container_name: moveit2_container
privileged: true
network_mode: host
command: ["bash", "-c", "colcon build --symlink-install && source /ros2_ws/install/setup.bash && exec bash"]
volumes:
- ./:/ros2_ws
- /tmp/.X11-unix:/tmp/.X11-unix
- $XAUTHORITY:/root/.Xauthority
working_dir: /ros2_ws
environment:
QT_X11_NO_MITSHM: 1
DISPLAY: $DISPLAY
gpu:
image: moveit/moveit2:${DOCKER_IMAGE}
container_name: moveit2_container
privileged: true
network_mode: host
command: ["bash", "-c", "colcon build --symlink-install && source /ros2_ws/install/setup.bash && exec bash"]
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
volumes:
- ./:/ros2_ws
- /tmp/.X11-unix:/tmp/.X11-unix
- $XAUTHORITY:/root/.Xauthority
working_dir: /ros2_ws
environment:
QT_X11_NO_MITSHM: 1
DISPLAY: $DISPLAY
NVIDIA_VISIBLE_DEVICES: all
NVIDIA_DRIVER_CAPABILITIES: all

Binary file not shown.

Before

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 395 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 543 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 439 KiB

View File

@@ -1,21 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import ( from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown
DeclareLaunchArgument,
OpaqueFunction,
Shutdown,
IncludeLaunchDescription,
)
from launch.substitutions import ( from launch.substitutions import (
LaunchConfiguration, LaunchConfiguration,
ThisLaunchFileDir, ThisLaunchFileDir,
PathJoinSubstitution, PathJoinSubstitution,
) )
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.conditions import IfCondition
from launch_ros.actions import Node from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
# Prevent making __pycache__ directories # Prevent making __pycache__ directories
@@ -48,10 +40,7 @@ def launch_setup(context, *args, **kwargs):
executable="core", # change as needed executable="core", # change as needed
name="core", name="core",
output="both", output="both",
parameters=[ parameters=[{"launch_mode": mode}],
{"launch_mode": mode},
{"use_ros2_control": LaunchConfiguration("use_ros2_control", default=False)},
],
on_exit=Shutdown(), on_exit=Shutdown(),
) )
) )
@@ -61,7 +50,6 @@ def launch_setup(context, *args, **kwargs):
executable="ptz", # change as needed executable="ptz", # change as needed
name="ptz", name="ptz",
output="both", output="both",
condition=IfCondition(LaunchConfiguration("use_ptz", default="true")),
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
) )
@@ -145,46 +133,4 @@ def generate_launch_description():
description="Launch mode: arm, core, bio, anchor, or ptz", description="Launch mode: arm, core, bio, anchor, or ptz",
) )
ros2_control_arg = DeclareLaunchArgument( return LaunchDescription([declare_arg, OpaqueFunction(function=launch_setup)])
"use_ros2_control",
default_value="false",
description="Whether to use DiffDriveController for driving instead of direct Twist",
)
rsp = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"robot_state_publisher.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
controllers = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"spawn_controllers.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
return LaunchDescription(
[
declare_arg,
ros2_control_arg,
rsp,
controllers,
OpaqueFunction(function=launch_setup),
]
)

View File

@@ -0,0 +1,279 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
from std_msgs.msg import String
from astra_msgs.msg import ControllerState
from astra_msgs.msg import ArmManual
from astra_msgs.msg import ArmIK
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("arm_headless")
# Depricated, kept temporarily for reference
# self.create_timer(0.20, self.send_controls)#read and send controls
self.create_timer(0.1, self.send_manual)
# Create a publisher to publish any output the pico sends
# Depricated, kept temporarily for reference
# self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
self.manual_pub = self.create_publisher(ArmManual, "/arm/control/manual", 10)
# Create a subscriber to listen to any commands sent for the pico
# Depricated, kept temporarily for reference
# self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
# self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.laser_status = 0
# Initialize pygame
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
#
#
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
# Check the pico for updates
# self.read_feedback()
if (
pygame.joystick.get_count() == 0
): # if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
# self.send_controls() #depricated, kept for reference temporarily
self.send_manual()
# self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() # re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt:
sys.exit(0)
def send_manual(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = ArmManual()
# Triggers for gripper control
if self.gamepad.get_axis(2) > 0: # left trigger
input.gripper = -1
elif self.gamepad.get_axis(5) > 0: # right trigger
input.gripper = 1
# Toggle Laser
if self.gamepad.get_button(7): # Start
self.laser_status = 1
elif self.gamepad.get_button(6): # Back
self.laser_status = 0
input.laser = self.laser_status
if self.gamepad.get_button(5): # right bumper, control effector
# Left stick X-axis for effector yaw
if self.gamepad.get_axis(0) > 0:
input.effector_yaw = 1
elif self.gamepad.get_axis(0) < 0:
input.effector_yaw = -1
# Right stick X-axis for effector roll
if self.gamepad.get_axis(3) > 0:
input.effector_roll = 1
elif self.gamepad.get_axis(3) < 0:
input.effector_roll = -1
else: # Control arm axis
dpad_input = self.gamepad.get_hat(0)
input.axis0 = 0
if dpad_input[0] == 1:
input.axis0 = 1
elif dpad_input[0] == -1:
input.axis0 = -1
if self.gamepad.get_axis(0) > 0.15 or self.gamepad.get_axis(0) < -0.15:
input.axis1 = round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > 0.15 or self.gamepad.get_axis(1) < -0.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1))
if self.gamepad.get_axis(4) > 0.15 or self.gamepad.get_axis(4) < -0.15:
input.axis3 = -1 * round(self.gamepad.get_axis(4))
# input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
# input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
# input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
# Button Mappings
# axis2 -> LT
# axis5 -> RT
# Buttons0 -> A
# Buttons1 -> B
# Buttons2 -> X
# Buttons3 -> Y
# Buttons4 -> LB
# Buttons5 -> RB
# Buttons6 -> Back
# Buttons7 -> Start
input.linear_actuator = 0
if pygame.joystick.get_count() != 0:
self.get_logger().info(
f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n"
)
self.manual_pub.publish(input)
else:
pass
pass
# Depricated, kept temporarily for reference
# def send_controls(self):
# for event in pygame.event.get():
# if event.type == pygame.QUIT:
# pygame.quit()
# exit()
# input = ControllerState()
# input.lt = self.gamepad.get_axis(2)#left trigger
# input.rt = self.gamepad.get_axis(5)#right trigger
# #input.lb = self.gamepad.get_button(9)#Value must be converted to bool
# if(self.gamepad.get_button(4)):#left bumper
# input.lb = True
# else:
# input.lb = False
# #input.rb = self.gamepad.get_button(10)#Value must be converted to bool
# if(self.gamepad.get_button(5)):#right bumper
# input.rb = True
# else:
# input.rb = False
# #input.plus = self.gamepad.get_button(6)#plus button
# if(self.gamepad.get_button(7)):#plus button
# input.plus = True
# else:
# input.plus = False
# #input.minus = self.gamepad.get_button(4)#minus button
# if(self.gamepad.get_button(6)):#minus button
# input.minus = True
# else:
# input.minus = False
# input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis
# input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis
# input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis
# input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis
# #input.a = self.gamepad.get_button(1)#A button
# if(self.gamepad.get_button(0)):#A button
# input.a = True
# else:
# input.a = False
# #input.b = self.gamepad.get_button(0)#B button
# if(self.gamepad.get_button(1)):#B button
# input.b = True
# else:
# input.b = False
# #input.x = self.gamepad.get_button(3)#X button
# if(self.gamepad.get_button(2)):#X button
# input.x = True
# else:
# input.x = False
# #input.y = self.gamepad.get_button(2)#Y button
# if(self.gamepad.get_button(3)):#Y button
# input.y = True
# else:
# input.y = False
# dpad_input = self.gamepad.get_hat(0)#D-pad input
# #not using up/down on DPad
# input.d_up = False
# input.d_down = False
# if(dpad_input[0] == 1):#D-pad right
# input.d_right = True
# else:
# input.d_right = False
# if(dpad_input[0] == -1):#D-pad left
# input.d_left = True
# else:
# input.d_left = False
# if pygame.joystick.get_count() != 0:
# self.get_logger().info(f"[Ctrl] Updated Controller State\n")
# self.publisher.publish(input)
# else:
# pass
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
# tb_bs = BaseStation()
# node.run()
if __name__ == "__main__":
main()

View File

@@ -1,221 +1,181 @@
import sys
import threading
import signal
import math
from warnings import deprecated
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException import serial
from rclpy import qos import sys
import threading
from std_msgs.msg import String, Header import glob
import time
import atexit
import signal
from std_msgs.msg import String
from astra_msgs.msg import ArmManual
from astra_msgs.msg import SocketFeedback
from astra_msgs.msg import DigitFeedback
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog import math
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
control_qos = qos.QoSProfile( # control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, # depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, # durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1), # deadline=1000,
# lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=5000
) # )
serial_pub = None
thread = None thread = None
class ArmNode(Node): class SerialRelay(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
def __init__(self): def __init__(self):
# Initialize node
super().__init__("arm_node") super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output # Get launch mode parameter
self.declare_parameter("launch_mode", "arm")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
################################################## # Create publishers
# Parameters self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10)
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
)
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
self.declare_parameter("use_old_topics", True) # Create subscribers
self.use_old_topics = ( self.man_sub = self.create_subscription(
self.get_parameter("use_old_topics").get_parameter_value().bool_value ArmManual, "/arm/control/manual", self.send_manual, 2
) )
################################################## # New messages
# Old topics self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10)
self.joint_state = JointState()
self.joint_state.name = [
"Axis_0_Joint",
"Axis_1_Joint",
"Axis_2_Joint",
"Axis_3_Joint",
"Wrist_Differential_Joint",
"Wrist-EF_Roll_Joint",
"Gripper_Slider_Left",
]
self.joint_state.position = [0.0] * len(
self.joint_state.name
) # Initialize with zeros
if self.use_old_topics: self.joint_command_sub = self.create_subscription(
# Anchor topics JointState, "/joint_commands", self.joint_command_callback, 10
)
# Topics used in anchor mode
if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription( self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10 String, "/anchor/arm/feedback", self.anchor_feedback, 10
) )
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Create publishers self.arm_feedback = SocketFeedback()
self.socket_pub = self.create_publisher( self.digit_feedback = DigitFeedback()
SocketFeedback, "/arm/feedback/socket", 10
)
self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(
DigitFeedback, "/arm/feedback/digit", 10
)
self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
self.man_sub = self.create_subscription( if self.launch_mode == "arm":
ArmManual, "/arm/control/manual", self.send_manual, 10 # Loop through all serial devices on the computer to check for the MCU
) self.port = None
ports = SerialRelay.list_serial_ports()
for _ in range(4):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
################################################### # if pong is in response, then we are talking with the MCU
# New topics if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
# Anchor topics if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
)
time.sleep(1)
sys.exit(1)
# from_vic self.ser = serial.Serial(self.port, 115200)
self.anchor_fromvic_sub_ = self.create_subscription( atexit.register(self.cleanup)
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
# to_vic
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
# Control def run(self):
global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
# Manual: /arm/manual/joint_jog is published by Basestation or Headless # if in arm mode, will need to read from the MCU
self.man_jointjog_sub_ = self.create_subscription(
JointJog,
"/arm/manual/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState,
"/joint_commands",
self.joint_command_callback,
qos_profile=control_qos,
)
# Feedback try:
while rclpy.ok():
if self.launch_mode == "arm":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
self.cleanup()
# Combined Socket and Digit feedback # Currently will just spit out all values over the /arm/feedback/debug topic as strings
self.arm_feedback_pub_ = self.create_publisher( def read_mcu(self):
ArmFeedback, try:
"/arm/feedback", output = str(self.ser.readline(), "utf8")
qos_profile=qos.qos_profile_sensor_data, if output:
) # self.get_logger().info(f"[MCU] {output}")
# IK arm pose: /joint_states is published from here to topic_based_control msg = String()
self.joint_state_pub_ = self.create_publisher( msg.data = output
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data self.debug_pub.publish(msg)
) except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
################################################### if self.ser.is_open:
# Saved state self.ser.close()
pass
# Combined Socket and Digit feedback except TypeError as e:
self.arm_feedback_new = ArmFeedback() self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
# IK Arm pose if self.ser.is_open:
self.saved_joint_state = JointState() self.ser.close()
self.saved_joint_state.name = self.all_joint_names pass
# ... initialize with zeros except Exception as e:
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name) print(f"Exception: {e}")
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name) print("Closing serial port.")
if self.ser.is_open:
def jointjog_callback(self, msg: JointJog): self.ser.close()
if len(msg.joint_names) != len(msg.velocities): pass
self.get_logger().debug("Ignoring malformed /arm/manual/joint_jog message.")
return
# Grab velocities from message
velocities = [
(
msg.velocities[msg.joint_names.index(joint_name)] # type: ignore
if joint_name in msg.joint_names
else 0.0
)
for joint_name in self.all_joint_names
]
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
self.send_velocities(velocities, msg.header)
# TODO: use msg.duration
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7: # Embedded takes deg*10, ROS2 uses Radians
self.get_logger().debug("Ignoring malformed /joint_command message.") positions = [math.degrees(pos) * 10 for pos in msg.position]
return # command needs either position or velocity for all 7 joints # Axis 2 & 3 URDF direction is inverted
positions[2] = -positions[2]
positions[3] = -positions[3]
# Grab velocities from message # Set target angles for each arm axis for embedded IK PID to handle
velocities = [ command = f"can_relay_tovic,arm,32,{positions[0]},{positions[1]},{positions[2]},{positions[3]}\n"
( # Wrist yaw and roll
msg.velocity[msg.name.index(joint_name)] # type: ignore command += f"can_relay_tovic,digit,32,{positions[4]},{positions[5]}\n"
if joint_name in msg.name # Gripper IK does not have adequate hardware yet
else 0.0 self.send_cmd(command)
)
for joint_name in self.all_joint_names
]
self.send_velocities(velocities, msg.header)
def send_velocities(self, velocities: list[float], header: Header):
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=26, data=[velocities[6]], header=header)
)
@deprecated("Uses an old message type. Will be removed at some point.")
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
axis0 = msg.axis0 axis0 = msg.axis0
axis1 = -1 * msg.axis1 axis1 = -1 * msg.axis1
@@ -240,17 +200,24 @@ class ArmNode(Node):
return return
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
output = String(data=msg) if (
self.anchor_pub.publish(output) self.launch_mode == "anchor"
): # if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
# pass
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
elif output.startswith("can_relay_fromvic,arm,54"): elif output.startswith("can_relay_fromvic,arm,54"):
# pass
self.updateBusVoltage(output) self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
self.updateMotorFeedback(output) self.updateMotorFeedback(output)
@@ -268,133 +235,19 @@ class ArmNode(Node):
if len(parts) >= 4: if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3]) self.digit_feedback.wrist_angle = float(parts[3])
# self.digit_feedback.wrist_roll = float(parts[4]) # self.digit_feedback.wrist_roll = float(parts[4])
self.joint_state.position[4] = math.radians(
float(parts[4])
) # Wrist roll
self.joint_state.position[5] = math.radians(
float(parts[3])
) # Wrist yaw
else: else:
return return
def relay_fromvic(self, msg: VicCAN):
# Code for socket and digit are broken out for cleaner code
if msg.mcu_name == "arm":
self.process_fromvic_arm(msg)
elif msg.mcu_name == "digit":
self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN):
assert msg.mcu_name == "arm"
# Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_socket_msg_len_dict:
expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.temperature = float(msg.data[1]) / 10.0
motor.voltage = float(msg.data[2]) / 10.0
motor.current = float(msg.data[3]) / 10.0
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 54: # Board voltages
self.arm_feedback_new.socket_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.socket_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.socket_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.socket_voltage.v3 = float(msg.data[3]) / 100.0
self.arm_feedback_new.socket_voltage.header.stamp = msg.header.stamp
case 55: # Arm joint positions
angles = [angle / 10.0 for angle in msg.data] # VicCAN sends deg*10
# Joint state publisher for URDF visualization
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.saved_joint_state.position[2] = math.radians(angles[2]) # Axis 2
self.saved_joint_state.position[3] = math.radians(angles[3]) # Axis 3
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
case 58: # REV SPARK MAX position and velocity feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.position = float(msg.data[1])
motor.velocity = float(msg.data[2])
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 59: # Arm joint velocities
velocities = [vel / 100.0 for vel in msg.data] # VicCAN sends deg/s*100
self.saved_joint_state.velocity[0] = math.radians(
velocities[0]
) # Axis 0
self.saved_joint_state.velocity[1] = math.radians(
velocities[1]
) # Axis 1
self.saved_joint_state.velocity[2] = math.radians(
velocities[2]
) # Axis 2
self.saved_joint_state.velocity[3] = math.radians(
velocities[3]
) # Axis 3
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN):
assert msg.mcu_name == "digit"
# Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_digit_msg_len_dict:
expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(
msg.data[0]
) # Wrist roll
self.saved_joint_state.position[5] = math.radians(
msg.data[1]
) # Wrist yaw
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self): def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
@deprecated("Uses an old message type. Will be removed at some point.")
def updateAngleFeedback(self, msg: str): def updateAngleFeedback(self, msg: str):
# Angle feedbacks, # Angle feedbacks,
# split the msg.data by commas # split the msg.data by commas
@@ -410,10 +263,19 @@ class ArmNode(Node):
self.arm_feedback.axis1_angle = angles[1] self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2] self.arm_feedback.axis2_angle = angles[2]
self.arm_feedback.axis3_angle = angles[3] self.arm_feedback.axis3_angle = angles[3]
# Joint state publisher for URDF visualization
self.joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted)
self.joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted)
# Wrist is handled by digit feedback
self.joint_state.header.stamp = self.get_clock().now().to_msg()
self.joint_state_pub.publish(self.joint_state)
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateBusVoltage(self, msg: str): def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.split(",")
@@ -428,7 +290,6 @@ class ArmNode(Node):
else: else:
self.get_logger().info("Invalid voltage feedback input format") self.get_logger().info("Invalid voltage feedback input format")
@deprecated("Uses an old message type. Will be removed at some point.")
def updateMotorFeedback(self, msg: str): def updateMotorFeedback(self, msg: str):
parts = str(msg.strip()).split(",") parts = str(msg.strip()).split(",")
motorId = round(float(parts[3])) motorId = round(float(parts[3]))
@@ -452,28 +313,38 @@ class ArmNode(Node):
self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current self.arm_feedback.axis0_current = current
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
# return glob.glob("/dev/tty[A-Za-z]*")
def exit_handler(signum, frame): def cleanup(self):
print("Caught SIGTERM. Exiting...") print("Cleaning up...")
rclpy.try_shutdown() try:
sys.exit(0) if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook
# Catch termination signals and exit cleanly global serial_pub
signal.signal(signal.SIGTERM, exit_handler) serial_pub = SerialRelay()
serial_pub.run()
arm_node = ArmNode()
try:
rclpy.spin(arm_node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -1,22 +1,27 @@
from setuptools import setup from setuptools import find_packages, setup
import os
from glob import glob
package_name = "arm_pkg" package_name = "arm_pkg"
setup( setup(
name=package_name, name=package_name,
version="1.0.0", version="1.0.0",
packages=[package_name], packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]), ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]), ("share/" + package_name, ["package.xml"]),
], ],
install_requires=["setuptools"], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer="David", maintainer="tristan",
maintainer_email="ds0196@uah.edu", maintainer_email="tristanmcginnis26@gmail.com",
description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.", description="TODO: Package description",
license="AGPL-3.0-only", license="All Rights Reserved",
entry_points={ entry_points={
"console_scripts": ["arm = arm_pkg.arm_node:main"], "console_scripts": [
"arm = arm_pkg.arm_node:main",
"headless = arm_pkg.arm_headless:main",
],
}, },
) )

File diff suppressed because it is too large Load Diff

View File

@@ -6,28 +6,21 @@ from rclpy.duration import Duration
import signal import signal
import time import time
import atexit
import os import os
import sys import sys
import threading
import glob
import pwd import pwd
import grp import grp
from math import copysign from math import copysign
from std_srvs.srv import Trigger from std_msgs.msg import String
from std_msgs.msg import Header from geometry_msgs.msg import Twist
from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState from astra_msgs.msg import CoreCtrlState
import warnings
# Literally headless
warnings.filterwarnings(
"ignore",
message="Your system is avx2 capable but pygame was not built with support for it.",
)
import pygame import pygame
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
@@ -41,44 +34,26 @@ CORE_STOP_TWIST_MSG = Twist() # "
ARM_STOP_MSG = ArmManual() # " ARM_STOP_MSG = ArmManual() # "
BIO_STOP_MSG = BioControl() # " BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, # history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, # durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1), # deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
) )
CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
ARM_DEADZONE = float(os.getenv("ARM_DEADZONE", "0.2"))
class Headless(Node): class Headless(Node):
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
def __init__(self): def __init__(self):
# Initialize pygame first # Initialize pygame first
pygame.init() pygame.init()
pygame.joystick.init() pygame.joystick.init()
super().__init__("headless_node") super().__init__("headless")
##################################################
# Preamble
# Wait for anchor to start # Wait for anchor to start
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug") pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
@@ -130,139 +105,37 @@ class Headless(Node):
break break
id += 1 id += 1
################################################## self.create_timer(0.15, self.send_controls)
# Parameters
self.declare_parameter("use_old_topics", True) self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.use_old_topics = ( self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.get_parameter("use_old_topics").get_parameter_value().bool_value self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
) )
self.core_state_pub_ = self.create_publisher(
self.declare_parameter("use_cmd_vel", False) CoreCtrlState, "/core/control/state", qos_profile=control_qos
self.use_cmd_vel = (
self.get_parameter("use_cmd_vel").get_parameter_value().bool_value
) )
self.declare_parameter("use_bio", False)
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
self.declare_parameter("use_arm_ik", False)
self.use_arm_ik = (
self.get_parameter("use_arm_ik").get_parameter_value().bool_value
)
# NOTE: only applicable if use_old_topics == True
self.declare_parameter("use_new_arm_manual_scheme", True)
self.use_new_arm_manual_scheme = (
self.get_parameter("use_new_arm_manual_scheme")
.get_parameter_value()
.bool_value
)
# Check parameter validity
if self.use_cmd_vel:
self.get_logger().info("Using cmd_vel for core control")
global CORE_MODE
CORE_MODE = "twist"
else:
self.get_logger().info("Using astra_msgs/CoreControl for core control")
if self.use_arm_ik and self.use_old_topics:
self.get_logger().fatal("Old topics do not support arm IK control.")
sys.exit(1)
if not self.use_new_arm_manual_scheme and not self.use_old_topics:
self.get_logger().warn(
f"New arm manual does not support old control scheme. Defaulting to new scheme."
)
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed) self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
##################################################
# Old Topics
if self.use_old_topics:
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(
ArmManual, "/arm/control/manual", 2
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
##################################################
# New Topics
if not self.use_old_topics:
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_cmd_vel_pub_ = self.create_publisher(
TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
)
# TODO: add new bio topics
##################################################
# Timers
self.create_timer(0.1, self.send_controls)
##################################################
# Services
# If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None
if self.use_arm_ik:
self.get_logger().info("Starting servo node for IK control...")
self.servo_start_client = self.create_client(
Trigger, "/servo_node/start_servo"
)
timeout_counter = 0
while not self.servo_start_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Waiting for servo_node/start_servo service...")
timeout_counter += 1
if timeout_counter >= 10:
self.get_logger().error(
"Servo's start service not available. IK control will not work."
)
break
if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request())
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
# Added so you can tell when it starts running after changing the constant logging to debug from info def run(self):
self.get_logger().info("Defaulting to Core mode. Ready.") # This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
def stop_all(self): try:
if self.use_old_topics: while rclpy.ok():
self.core_publisher.publish(CORE_STOP_MSG) self.send_controls()
self.arm_publisher.publish(ARM_STOP_MSG) time.sleep(0.1) # Small delay to avoid CPU hogging
self.bio_publisher.publish(BIO_STOP_MSG) except KeyboardInterrupt:
else: sys.exit(0)
if self.use_cmd_vel:
self.core_cmd_vel_pub_.publish(self.core_cmd_vel_stop_msg())
else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
if self.use_arm_ik:
self.arm_ik_twist_publisher.publish(self.arm_ik_twist_stop_msg())
else:
self.arm_manual_pub_.publish(self.arm_manual_stop_msg())
# TODO: add bio here after implementing new topics
def send_controls(self): def send_controls(self):
"""Read the gamepad state and publish control messages""" """Read the gamepad state and publish control messages"""
@@ -274,8 +147,10 @@ class Headless(Node):
# Check if controller is still connected # Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads: if pygame.joystick.get_count() != self.num_gamepads:
print("Gamepad disconnected. Exiting...") print("Gamepad disconnected. Exiting...")
# Stop the rover if controller disconnected # Send one last zero control message
self.stop_all() self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.get_logger().info("Final stop commands sent. Shutting down.") self.get_logger().info("Final stop commands sent. Shutting down.")
# Clean up # Clean up
pygame.quit() pygame.quit()
@@ -291,51 +166,20 @@ class Headless(Node):
new_ctrl_mode = "core" new_ctrl_mode = "core"
if new_ctrl_mode != self.ctrl_mode: if new_ctrl_mode != self.ctrl_mode:
self.stop_all()
self.gamepad.rumble(0.6, 0.7, 75) self.gamepad.rumble(0.6, 0.7, 75)
self.ctrl_mode = new_ctrl_mode self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
if self.ctrl_mode == "arm" and self.use_bio:
self.get_logger().warning("NOTE: Using bio instead of arm.")
# Actually send the controls # CORE
if self.ctrl_mode == "core": if self.ctrl_mode == "core" and CORE_MODE == "duty":
self.send_core()
if self.use_old_topics:
if self.use_bio:
self.bio_publisher.publish(BIO_STOP_MSG)
else:
self.arm_publisher.publish(ARM_STOP_MSG)
# New topics shouldn't need to constantly send zeroes imo
else:
if self.use_bio:
self.send_bio()
else:
self.send_arm()
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
# Ditto
def send_core(self):
# Collect controller state
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
input = CoreControl() input = CoreControl()
input.max_speed = 90 input.max_speed = 90
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
# Right wheels # Right wheels
input.right_stick = float(round(-1 * right_stick_y, 2)) input.right_stick = float(round(-1 * right_stick_y, 2))
@@ -350,30 +194,31 @@ class Headless(Node):
self.get_logger().info(f"[Ctrl] {output}") self.get_logger().info(f"[Ctrl] {output}")
self.core_publisher.publish(input) self.core_publisher.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG)
else: # New topics elif self.ctrl_mode == "core" and CORE_MODE == "twist":
twist = Twist() input = Twist()
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_x = deadzone(self.gamepad.get_axis(3))
button_a = self.gamepad.get_button(0)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
# Forward/back and Turn # Forward/back and Turn
twist.linear.x = -1.0 * left_stick_y input.linear.x = -1.0 * left_stick_y
twist.angular.z = -1.0 * copysign( input.angular.z = -1.0 * copysign(
right_stick_x**2, right_stick_x right_stick_x**2, right_stick_x
) # Exponent for finer control (curve) ) # Exponent for finer control (curve)
# This kinda looks dumb being seperate from the following block, but this
# maintains the separation between modifying the control message and sending it
if self.use_cmd_vel:
twist.linear.x *= 1.5
twist.angular.z *= 0.5
# Publish # Publish
if self.use_cmd_vel: self.core_twist_pub_.publish(input)
header = Header(stamp=self.get_clock().now().to_msg()) self.arm_publisher.publish(ARM_STOP_MSG)
self.core_cmd_vel_pub_.publish(TwistStamped(header=header, twist=twist)) # self.bio_publisher.publish(BIO_STOP_MSG)
else: self.get_logger().info(
self.core_twist_pub_.publish(twist) f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
self.get_logger().debug(
f"[Core Ctrl] Linear: {round(twist.linear.x, 2)}, Angular: {round(twist.angular.z, 2)}"
) )
# Brake mode # Brake mode
@@ -396,315 +241,91 @@ class Headless(Node):
state_msg = CoreCtrlState() state_msg = CoreCtrlState()
state_msg.brake_mode = bool(self.core_brake_mode) state_msg.brake_mode = bool(self.core_brake_mode)
state_msg.max_duty = float(self.core_max_duty) state_msg.max_duty = float(self.core_max_duty)
self.core_state_pub_.publish(state_msg) self.core_state_pub_.publish(state_msg)
self.get_logger().info( self.get_logger().info(
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}" f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
) )
def send_arm(self): # ARM and BIO
# Collect controller state if self.ctrl_mode == "arm":
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# OLD MANUAL
# ==========
if not self.use_arm_ik and self.use_old_topics:
arm_input = ArmManual() arm_input = ArmManual()
# OLD ARM MANUAL CONTROL SCHEME # Collect controller state
if not self.use_new_arm_manual_scheme: left_stick_x = deadzone(self.gamepad.get_axis(0))
# EF Grippers left_stick_y = deadzone(self.gamepad.get_axis(1))
if left_trigger > 0 and right_trigger > 0: left_trigger = deadzone(self.gamepad.get_axis(2))
arm_input.gripper = 0 right_stick_x = deadzone(self.gamepad.get_axis(3))
elif left_trigger > 0: right_stick_y = deadzone(self.gamepad.get_axis(4))
arm_input.gripper = -1 right_trigger = deadzone(self.gamepad.get_axis(5))
elif right_trigger > 0: right_bumper = self.gamepad.get_button(5)
arm_input.gripper = 1 dpad_input = self.gamepad.get_hat(0)
# Axis 0 # EF Grippers
if dpad_input[0] == 1:
arm_input.axis0 = 1
elif dpad_input[0] == -1:
arm_input.axis0 = -1
if right_bumper: # Control end effector
# Effector yaw
if left_stick_x > 0:
arm_input.effector_yaw = 1
elif left_stick_x < 0:
arm_input.effector_yaw = -1
# Effector roll
if right_stick_x > 0:
arm_input.effector_roll = 1
elif right_stick_x < 0:
arm_input.effector_roll = -1
else: # Control arm axis
# Axis 1
if abs(left_stick_x) > 0.15:
arm_input.axis1 = round(left_stick_x)
# Axis 2
if abs(left_stick_y) > 0.15:
arm_input.axis2 = -1 * round(left_stick_y)
# Axis 3
if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y)
# NEW ARM MANUAL CONTROL SCHEME
if self.use_new_arm_manual_scheme:
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: linear actuator in
# X: _
# Y: linear actuator out
# Right stick: EF yaw and axis 3
arm_input.effector_yaw = stick_to_arm_direction(right_stick_x)
arm_input.axis3 = -1 * stick_to_arm_direction(right_stick_y)
# Left stick: axis 1 and 2
arm_input.axis1 = stick_to_arm_direction(left_stick_x)
arm_input.axis2 = -1 * stick_to_arm_direction(left_stick_y)
# D-pad: axis 0 and _
arm_input.axis0 = int(dpad_input[0])
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0
elif left_trigger > 0:
arm_input.gripper = -1
elif right_trigger > 0:
arm_input.gripper = 1
# Bumpers: EF roll
if left_bumper > 0 and right_bumper > 0:
arm_input.effector_roll = 0
elif left_bumper > 0:
arm_input.effector_roll = -1
elif right_bumper > 0:
arm_input.effector_roll = 1
# A: brake
if button_a:
arm_input.brake = True
# Y: linear actuator
if button_y and not button_b:
arm_input.linear_actuator = 1
elif button_b and not button_y:
arm_input.linear_actuator = -1
else:
arm_input.linear_actuator = 0
self.arm_publisher.publish(arm_input)
# NEW MANUAL
# ==========
elif not self.use_arm_ik and not self.use_old_topics:
arm_input = JointJog()
arm_input.header.frame_id = "base_link"
arm_input.header.stamp = self.get_clock().now().to_msg()
arm_input.joint_names = self.all_joint_names
arm_input.velocities = [0.0] * len(self.all_joint_names)
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: linear actuator in
# X: _
# Y: linear actuator out
# Right stick: EF yaw and axis 3
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = float(
stick_to_arm_direction(right_stick_x)
)
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = float(
stick_to_arm_direction(right_stick_y)
)
# Left stick: axis 1 and 2
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = float(
stick_to_arm_direction(left_stick_x)
)
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = float(
stick_to_arm_direction(left_stick_y)
)
# D-pad: axis 0 and _
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = float(
dpad_input[0]
)
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0: if left_trigger > 0 and right_trigger > 0:
arm_input.velocities[ arm_input.gripper = 0
self.all_joint_names.index("ef_gripper_left_joint")
] = 0.0
elif left_trigger > 0: elif left_trigger > 0:
arm_input.velocities[ arm_input.gripper = -1
self.all_joint_names.index("ef_gripper_left_joint")
] = -1.0
elif right_trigger > 0: elif right_trigger > 0:
arm_input.velocities[ arm_input.gripper = 1
self.all_joint_names.index("ef_gripper_left_joint")
] = 1.0
# Bumpers: EF roll # Axis 0
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = ( if dpad_input[0] == 1:
right_bumper - left_bumper arm_input.axis0 = 1
) elif dpad_input[0] == -1:
arm_input.axis0 = -1
# A: brake if right_bumper: # Control end effector
# TODO: Brake mode
# Y: linear actuator # Effector yaw
# TODO: linear actuator if left_stick_x > 0:
arm_input.effector_yaw = 1
elif left_stick_x < 0:
arm_input.effector_yaw = -1
self.arm_manual_pub_.publish(arm_input) # Effector roll
if right_stick_x > 0:
arm_input.effector_roll = 1
elif right_stick_x < 0:
arm_input.effector_roll = -1
# IK (ONLY NEW) else: # Control arm axis
# =============
elif self.use_arm_ik: # Axis 1
arm_twist = TwistStamped() if abs(left_stick_x) > 0.15:
arm_twist.header.frame_id = "base_link" arm_input.axis1 = round(left_stick_x)
arm_twist.header.stamp = self.get_clock().now().to_msg()
arm_jointjog = JointJog()
arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Right stick: linear y and linear x # Axis 2
# Left stick: angular z and linear z if abs(left_stick_y) > 0.15:
# D-pad: angular y and _ arm_input.axis2 = -1 * round(left_stick_y)
# Triggers: EF grippers
# Bumpers: angular x
# A: brake
# B: IK mode
# X: manual mode
# Y: linear actuator
# Right stick: linear y and linear x # Axis 3
arm_twist.twist.linear.y = float(right_stick_x) if abs(right_stick_y) > 0.15:
arm_twist.twist.linear.x = float(right_stick_y) arm_input.axis3 = -1 * round(right_stick_y)
# Left stick: angular z and linear z # BIO
arm_twist.twist.angular.z = float(-1 * left_stick_x)
arm_twist.twist.linear.z = float(-1 * left_stick_y)
# D-pad: angular y and _
arm_twist.twist.angular.y = (
float(0)
if dpad_input[0] == 0
else float(-1 * copysign(0.75, dpad_input[0]))
)
# Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append("ef_gripper_left_joint") # type: ignore
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x
if left_bumper > 0 and right_bumper > 0:
arm_twist.twist.angular.x = float(0)
elif left_bumper > 0:
arm_twist.twist.angular.x = float(1)
elif right_bumper > 0:
arm_twist.twist.angular.x = float(-1)
self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def send_bio(self):
# Collect controller state
left_stick_x = stick_deadzone(self.gamepad.get_axis(0))
left_stick_y = stick_deadzone(self.gamepad.get_axis(1))
left_trigger = stick_deadzone(self.gamepad.get_axis(2))
right_stick_x = stick_deadzone(self.gamepad.get_axis(3))
right_stick_y = stick_deadzone(self.gamepad.get_axis(4))
right_trigger = stick_deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
bio_input = BioControl( bio_input = BioControl(
bio_arm=int(left_stick_y * -100), bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100), drill_arm=int(round(right_stick_y) * -100),
) )
# Drill motor (FAERIE) # Drill motor (FAERIE)
if left_trigger > 0 or right_trigger > 0: if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int( bio_input.drill = int(
30 * (right_trigger - left_trigger) 30 * (right_trigger - left_trigger)
) # Max duty cycle 30% ) # Max duty cycle 30%
self.bio_publisher.publish(bio_input) self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(arm_input)
else: # self.bio_publisher.publish(bio_input)
pass # TODO: implement new bio control topics
def core_cmd_vel_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def arm_manual_stop_msg(self):
return JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),
joint_names=self.all_joint_names,
velocities=[0.0] * len(self.all_joint_names),
)
def arm_ik_twist_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def stick_deadzone(value: float, threshold=STICK_DEADZONE) -> float: def deadzone(value: float, threshold=0.05) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""
if abs(value) < threshold: if abs(value) < threshold:
return 0 return 0
return value return value
def stick_to_arm_direction(value: float, threshold=ARM_DEADZONE) -> int:
"""Apply a larger deadzone to a stick input and make digital/binary instead of analog"""
if abs(value) < threshold:
return 0
return int(copysign(1, value))
def is_user_in_group(group_name: str) -> bool: def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/ # Copied from https://zetcode.com/python/os-getgrouplist/
try: try:
@@ -723,26 +344,20 @@ def is_user_in_group(group_name: str) -> bool:
return False return False
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None): def main(args=None):
try: try:
rclpy.init(args=args) rclpy.init(args=args)
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
node = Headless() node = Headless()
rclpy.spin(node) rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException): except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal. Exiting...") print("Caught shutdown signal. Exiting...")
finally: finally:
rclpy.try_shutdown() rclpy.shutdown()
if __name__ == "__main__": if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -26,7 +26,7 @@ class LatencyTester : public rclcpp::Node
{ {
public: public:
LatencyTester() LatencyTester()
: Node("latency_tester"), count_(0) : Node("latency_tester"), count_(0), target_mcu_("core")
{ {
publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10); publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10);
timer_ = this->create_wall_timer( timer_ = this->create_wall_timer(
@@ -35,8 +35,6 @@ public:
"/anchor/debug", "/anchor/debug",
10, 10,
std::bind(&LatencyTester::response_callback, this, std::placeholders::_1)); std::bind(&LatencyTester::response_callback, this, std::placeholders::_1));
target_mcu_ = this->declare_parameter<std::string>("target_mcu", "core");
} }
private: private:

View File

@@ -0,0 +1,80 @@
cmake_minimum_required(VERSION 3.22)
project(servo_arm_twist_pkg)
# C++ Libraries #################################################
# Core C++ library for calculations and collision checking.
# Provides interface used by the component node.
set(SERVO_LIB_NAME servo_arm_twist_lib)
# Pose Tracking
set(POSE_TRACKING pose_tracking)
# Component Nodes (Shared libraries) ############################
set(SERVO_COMPONENT_NODE servo_node)
set(SERVO_CONTROLLER_INPUT servo_controller_input)
# Executable Nodes ##############################################
set(SERVO_NODE_MAIN_NAME servo_node_main)
set(POSE_TRACKING_DEMO_NAME servo_pose_tracking_demo)
set(FAKE_SERVO_CMDS_NAME fake_command_publisher)
#################################################################
# Common cmake code applied to all moveit packages
find_package(moveit_common REQUIRED)
moveit_package()
set(THIS_PACKAGE_INCLUDE_DEPENDS
control_msgs
control_toolbox
geometry_msgs
moveit_core
moveit_msgs
moveit_ros_planning
pluginlib
rclcpp
rclcpp_components
sensor_msgs
std_msgs
std_srvs
tf2_eigen
trajectory_msgs
)
find_package(ament_cmake REQUIRED)
find_package(eigen3_cmake_module REQUIRED)
find_package(Eigen3 REQUIRED)
foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS})
find_package(${Dependency} REQUIRED)
endforeach()
#####################
## Component Nodes ##
#####################
# Add executable for using a controller
add_library(${SERVO_CONTROLLER_INPUT} SHARED src/joystick_twist.cpp)
ament_target_dependencies(${SERVO_CONTROLLER_INPUT} ${THIS_PACKAGE_INCLUDE_DEPENDS})
rclcpp_components_register_nodes(${SERVO_CONTROLLER_INPUT} "servo_arm_twist_pkg::JoyToServoPub")
#############
## Install ##
#############
# Install Libraries
install(
TARGETS
${SERVO_CONTROLLER_INPUT}
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
INCLUDES DESTINATION include
)
# Install Binaries
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_dependencies(${THIS_PACKAGE_INCLUDE_DEPENDS})
ament_package()

View File

@@ -0,0 +1,3 @@
# Moveit Servo
See the [Realtime Arm Servoing Tutorial](https://moveit.picknik.ai/main/doc/realtime_servo/realtime_servo_tutorial.html) for installation instructions, quick-start guide, an overview about `moveit_servo`, and to learn how to set it up on your robot.

View File

@@ -0,0 +1,58 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>servo_arm_twist_pkg</name>
<version>2.5.9</version>
<description>Provides real-time manipulator Cartesian and joint servoing.</description>
<maintainer email="blakeanderson@utexas.edu">Blake Anderson</maintainer>
<maintainer email="andyz@utexas.edu">Andy Zelenak</maintainer>
<maintainer email="tyler@picknik.ai">Tyler Weaver</maintainer>
<maintainer email="henningkayser@picknik.ai">Henning Kayser</maintainer>
<license>BSD 3-Clause</license>
<url type="website">https://ros-planning.github.io/moveit_tutorials</url>
<author>Brian O'Neil</author>
<author email="andyz@utexas.edu">Andy Zelenak</author>
<author>Blake Anderson</author>
<author email="alex@machinekoder.com">Alexander Rössler</author>
<author email="tyler@picknik.ai">Tyler Weaver</author>
<author email="adam.pettinger@utexas.edu">Adam Pettinger</author>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>moveit_common</depend>
<depend>control_msgs</depend>
<depend>control_toolbox</depend>
<depend>geometry_msgs</depend>
<depend>moveit_msgs</depend>
<depend>moveit_core</depend>
<depend>moveit_ros_planning_interface</depend>
<depend>pluginlib</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<depend>tf2_eigen</depend>
<depend>trajectory_msgs</depend>
<exec_depend>gripper_controllers</exec_depend>
<exec_depend>joint_state_broadcaster</exec_depend>
<exec_depend>joint_trajectory_controller</exec_depend>
<exec_depend>joy</exec_depend>
<exec_depend>robot_state_publisher</exec_depend>
<exec_depend>tf2_ros</exec_depend>
<exec_depend>moveit_configs_utils</exec_depend>
<exec_depend>launch_param_builder</exec_depend>
<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>controller_manager</test_depend>
<test_depend>ros_testing</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@@ -0,0 +1,268 @@
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2020, PickNik Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of PickNik Inc. nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/
/* Title : joystick_servo_example.cpp
* Project : servo_arm_twist_pkg
* Created : 08/07/2020
* Author : Adam Pettinger
*/
#include <sensor_msgs/msg/joy.hpp>
#include <geometry_msgs/msg/twist_stamped.hpp>
#include <control_msgs/msg/joint_jog.hpp>
#include <std_srvs/srv/trigger.hpp>
#include <moveit_msgs/msg/planning_scene.hpp>
#include <rclcpp/node.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/subscription.hpp>
#include <rclcpp/qos.hpp>
#include <rclcpp/time.hpp>
// We'll just set up parameters here
const std::string JOY_TOPIC = "/joy";
const std::string TWIST_TOPIC = "/servo_node/delta_twist_cmds";
const std::string JOINT_TOPIC = "/servo_node/delta_joint_cmds";
const std::string EEF_FRAME_ID = "End_Effector";
const std::string BASE_FRAME_ID = "base_link";
// Enums for button names -> axis/button array index
// For XBOX 1 controller
enum Axis
{
LEFT_STICK_X = 0,
LEFT_STICK_Y = 1,
LEFT_TRIGGER = 2,
RIGHT_STICK_X = 3,
RIGHT_STICK_Y = 4,
RIGHT_TRIGGER = 5,
D_PAD_X = 6,
D_PAD_Y = 7
};
enum Button
{
A = 0,
B = 1,
X = 2,
Y = 3,
LEFT_BUMPER = 4,
RIGHT_BUMPER = 5,
CHANGE_VIEW = 6,
MENU = 7,
HOME = 8,
LEFT_STICK_CLICK = 9,
RIGHT_STICK_CLICK = 10
};
// Some axes have offsets (e.g. the default trigger position is 1.0 not 0)
// This will map the default values for the axes
std::map<Axis, double> AXIS_DEFAULTS = { { LEFT_TRIGGER, 1.0 }, { RIGHT_TRIGGER, 1.0 } };
std::map<Button, double> BUTTON_DEFAULTS;
// To change controls or setup a new controller, all you should to do is change the above enums and the follow 2
// functions
/** \brief // This converts a joystick axes and buttons array to a TwistStamped or JointJog message
* @param axes The vector of continuous controller joystick axes
* @param buttons The vector of discrete controller button values
* @param twist A TwistStamped message to update in prep for publishing
* @param joint A JointJog message to update in prep for publishing
* @return return true if you want to publish a Twist, false if you want to publish a JointJog
*/
bool convertJoyToCmd(const std::vector<float>& axes, const std::vector<int>& buttons,
std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist
// std::unique_ptr<control_msgs::msg::JointJog>& joint
)
{
// // Give joint jogging priority because it is only buttons
// // If any joint jog command is requested, we are only publishing joint commands
// if (buttons[A] || buttons[B] || buttons[X] || buttons[Y] || axes[D_PAD_X] || axes[D_PAD_Y])
// {
// // Map the D_PAD to the proximal joints
// joint->joint_names.push_back("panda_joint1");
// joint->velocities.push_back(axes[D_PAD_X]);
// joint->joint_names.push_back("panda_joint2");
// joint->velocities.push_back(axes[D_PAD_Y]);
// // Map the diamond to the distal joints
// joint->joint_names.push_back("panda_joint7");
// joint->velocities.push_back(buttons[B] - buttons[X]);
// joint->joint_names.push_back("panda_joint6");
// joint->velocities.push_back(buttons[Y] - buttons[A]);
// return false;
// }
// The bread and butter: map buttons to twist commands
twist->twist.linear.z = axes[RIGHT_STICK_Y];
twist->twist.linear.y = axes[RIGHT_STICK_X];
double lin_x_right = -0.5 * (axes[RIGHT_TRIGGER] - AXIS_DEFAULTS.at(RIGHT_TRIGGER));
double lin_x_left = 0.5 * (axes[LEFT_TRIGGER] - AXIS_DEFAULTS.at(LEFT_TRIGGER));
twist->twist.linear.x = lin_x_right + lin_x_left;
twist->twist.angular.y = axes[LEFT_STICK_Y];
twist->twist.angular.x = axes[LEFT_STICK_X];
double roll_positive = buttons[RIGHT_BUMPER];
double roll_negative = -1 * (buttons[LEFT_BUMPER]);
twist->twist.angular.z = roll_positive + roll_negative;
return true;
}
/** \brief // This should update the frame_to_publish_ as needed for changing command frame via controller
* @param frame_name Set the command frame to this
* @param buttons The vector of discrete controller button values
*/
void updateCmdFrame(std::string& frame_name, const std::vector<int>& buttons)
{
if (buttons[CHANGE_VIEW] && frame_name == EEF_FRAME_ID)
frame_name = BASE_FRAME_ID;
else if (buttons[MENU] && frame_name == BASE_FRAME_ID)
frame_name = EEF_FRAME_ID;
}
namespace servo_arm_twist_pkg
{
class JoyToServoPub : public rclcpp::Node
{
public:
JoyToServoPub(const rclcpp::NodeOptions& options)
: Node("joy_to_twist_publisher", options), frame_to_publish_(BASE_FRAME_ID)
{
// Setup pub/sub
joy_sub_ = this->create_subscription<sensor_msgs::msg::Joy>(
JOY_TOPIC, rclcpp::SystemDefaultsQoS(),
[this](const sensor_msgs::msg::Joy::ConstSharedPtr& msg) { return joyCB(msg); });
twist_pub_ = this->create_publisher<geometry_msgs::msg::TwistStamped>(TWIST_TOPIC, rclcpp::SystemDefaultsQoS());
joint_pub_ = this->create_publisher<control_msgs::msg::JointJog>(JOINT_TOPIC, rclcpp::SystemDefaultsQoS());
// collision_pub_ =
// this->create_publisher<moveit_msgs::msg::PlanningScene>("/planning_scene", rclcpp::SystemDefaultsQoS());
// Create a service client to start the ServoNode
servo_start_client_ = this->create_client<std_srvs::srv::Trigger>("/servo_node/start_servo");
servo_start_client_->wait_for_service(std::chrono::seconds(1));
servo_start_client_->async_send_request(std::make_shared<std_srvs::srv::Trigger::Request>());
// // Load the collision scene asynchronously
// collision_pub_thread_ = std::thread([this]() {
// rclcpp::sleep_for(std::chrono::seconds(3));
// // Create collision object, in the way of servoing
// moveit_msgs::msg::CollisionObject collision_object;
// collision_object.header.frame_id = "panda_link0";
// collision_object.id = "box";
// shape_msgs::msg::SolidPrimitive table_1;
// table_1.type = table_1.BOX;
// table_1.dimensions = { 0.4, 0.6, 0.03 };
// geometry_msgs::msg::Pose table_1_pose;
// table_1_pose.position.x = 0.6;
// table_1_pose.position.y = 0.0;
// table_1_pose.position.z = 0.4;
// shape_msgs::msg::SolidPrimitive table_2;
// table_2.type = table_2.BOX;
// table_2.dimensions = { 0.6, 0.4, 0.03 };
// geometry_msgs::msg::Pose table_2_pose;
// table_2_pose.position.x = 0.0;
// table_2_pose.position.y = 0.5;
// table_2_pose.position.z = 0.25;
// collision_object.primitives.push_back(table_1);
// collision_object.primitive_poses.push_back(table_1_pose);
// collision_object.primitives.push_back(table_2);
// collision_object.primitive_poses.push_back(table_2_pose);
// collision_object.operation = collision_object.ADD;
// moveit_msgs::msg::PlanningSceneWorld psw;
// psw.collision_objects.push_back(collision_object);
// auto ps = std::make_unique<moveit_msgs::msg::PlanningScene>();
// ps->world = psw;
// ps->is_diff = true;
// collision_pub_->publish(std::move(ps));
// });
}
// ~JoyToServoPub() override
// {
// if (collision_pub_thread_.joinable())
// collision_pub_thread_.join();
// }
void joyCB(const sensor_msgs::msg::Joy::ConstSharedPtr& msg)
{
// Create the messages we might publish
auto twist_msg = std::make_unique<geometry_msgs::msg::TwistStamped>();
auto joint_msg = std::make_unique<control_msgs::msg::JointJog>();
// This call updates the frame for twist commands
updateCmdFrame(frame_to_publish_, msg->buttons);
// Convert the joystick message to Twist or JointJog and publish
if (convertJoyToCmd(msg->axes, msg->buttons, twist_msg))
{
// publish the TwistStamped
twist_msg->header.frame_id = frame_to_publish_;
twist_msg->header.stamp = this->now();
twist_pub_->publish(std::move(twist_msg));
}
// else
// {
// // publish the JointJog
// joint_msg->header.stamp = this->now();
// joint_msg->header.frame_id = "panda_link3";
// joint_pub_->publish(std::move(joint_msg));
// }
}
private:
rclcpp::Subscription<sensor_msgs::msg::Joy>::SharedPtr joy_sub_;
rclcpp::Publisher<geometry_msgs::msg::TwistStamped>::SharedPtr twist_pub_;
rclcpp::Publisher<control_msgs::msg::JointJog>::SharedPtr joint_pub_;
rclcpp::Publisher<moveit_msgs::msg::PlanningScene>::SharedPtr collision_pub_;
rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr servo_start_client_;
std::string frame_to_publish_;
// std::thread collision_pub_thread_;
}; // class JoyToServoPub
} // namespace servo_arm_twist_pkg
// Register the component with class_loader
#include <rclcpp_components/register_node_macro.hpp>
RCLCPP_COMPONENTS_REGISTER_NODE(servo_arm_twist_pkg::JoyToServoPub)