3 Commits

28 changed files with 413 additions and 1153 deletions

View File

@@ -16,9 +16,6 @@ You will use these packages to launch all rover-side ROS2 nodes.
- [Connecting the GuliKit Controller](#connecting-the-gulikit-controller) - [Connecting the GuliKit Controller](#connecting-the-gulikit-controller)
- [Common Problems/Toubleshooting](#common-problemstroubleshooting) - [Common Problems/Toubleshooting](#common-problemstroubleshooting)
- [Packages](#packages) - [Packages](#packages)
- [Graphs](#graphs)
- [Full System](#full-system)
- [Individual Nodes](#individual-nodes)
- [Maintainers](#maintainers) - [Maintainers](#maintainers)
## Software Prerequisites ## Software Prerequisites
@@ -143,40 +140,6 @@ A: To find a microcontroller to talk to, Anchor sends a ping to every Serial por
- [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`). - [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`).
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK. - [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
## Graphs
### Full System
> **Anchor stand-alone** (`ros2 launch anchor_pkg rover.launch.py`)
>
> ![rqt_graph of Anchor by itself, ran with command: ros2 launch anchor_pkg rover.launch.py](./docs-resources/graph-anchor-standalone.png)
> **Anchor with [basestation-classic](https://github.com/SHC-ASTRA/basestation-classic)**
>
> ![rqt_graph of Anchor ran with the same command as above, talking to basestation-classic](./docs-resources/graph-anchor-w-basestation-classic.png)
> **Anchor with Headless** (`ros2 run headless_pkg headless_full`)
>
> ![rqt_graph of Anchor ran with Headless](./docs-resources/graph-anchor-w-headless.png)
### Individual Nodes
> **Anchor** (`ros2 run anchor_pkg anchor`)
>
> ![rqt_graph of Anchor node running by itself](./docs-resources/graph-anchor-anchor-standalone.png)
> **Core** (`ros2 run core_pkg core --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Core node running by itself](./docs-resources/graph-anchor-core-standalone.png)
> **Arm** (`ros2 run arm_pkg arm --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Arm node running by itself](./docs-resources/graph-anchor-arm-standalone.png)
> **Bio** (`ros2 run bio_pkg bio --ros-args -p launch_mode:=anchor`)
>
> ![rqt_graph of Bio node running by itself](./docs-resources/graph-anchor-bio-standalone.png)
## Maintainers ## Maintainers
| Name | Email | Discord | | Name | Email | Discord |

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
if command -v nixos-rebuild; then source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
if command -v nixos-rebuild; then source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,11 +17,7 @@ done
echo "[INFO] Network interface is up!" echo "[INFO] Network interface is up!"
if command -v nixos-rebuild; then source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source $ANCHOR_WS/install/setup.bash source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash [[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

Binary file not shown.

Before

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 395 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 543 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 439 KiB

16
flake.lock generated
View File

@@ -24,27 +24,27 @@
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
}, },
"locked": { "locked": {
"lastModified": 1770108954, "lastModified": 1761810010,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=", "narHash": "sha256-o0wJKW603SiOO373BTgeZaF6nDxegMA/cRrzSC2Cscg=",
"owner": "lopsided98", "owner": "lopsided98",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"rev": "3d05d46451b376e128a1553e78b8870c75d7753a", "rev": "e277df39e3bc6b372a5138c0bcf10198857c55ab",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "lopsided98", "owner": "lopsided98",
"ref": "develop", "ref": "master",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"type": "github" "type": "github"
} }
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1759381078, "lastModified": 1744849697,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=", "narHash": "sha256-S9hqvanPSeRu6R4cw0OhvH1rJ+4/s9xIban9C4ocM/0=",
"owner": "NixOS", "owner": "lopsided98",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee", "rev": "6318f538166fef9f5118d8d78b9b43a04bb049e4",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@@ -2,7 +2,7 @@
description = "Development environment for ASTRA Anchor"; description = "Development environment for ASTRA Anchor";
inputs = { inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop"; nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/master";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
}; };
@@ -18,6 +18,12 @@
pkgs = import nixpkgs { pkgs = import nixpkgs {
inherit system; inherit system;
overlays = [ nix-ros-overlay.overlays.default ]; overlays = [ nix-ros-overlay.overlays.default ];
config = {
# Allow the insecure freeimage package
permittedInsecurePackages = [
"freeimage-3.18.0-unstable-2024-04-18"
];
};
}; };
in in
{ {
@@ -35,7 +41,7 @@
] ]
)) ))
( (
with rosPackages.humble; with rosPackages.jazzy;
buildEnv { buildEnv {
paths = [ paths = [
ros-core ros-core
@@ -56,23 +62,22 @@
control-msgs control-msgs
control-toolbox control-toolbox
moveit-core moveit-core
moveit-planners
moveit-common moveit-common
moveit-msgs moveit-msgs
moveit-ros-planning moveit-ros-planning
moveit-ros-planning-interface moveit-ros-planning-interface
moveit-ros-visualization
moveit-configs-utils moveit-configs-utils
moveit-ros-move-group moveit-ros-move-group
moveit-servo moveit-servo
moveit-simple-controller-manager moveit-simple-controller-manager
topic-based-ros2-control
pilz-industrial-motion-planner pilz-industrial-motion-planner
pick-ik pick-ik
ompl ompl
joy
ros2-controllers
chomp-motion-planner chomp-motion-planner
joy
ros-gz-sim
ros-gz-bridge
# ros2-controllers nixpkg does not build :(
]; ];
} }
) )
@@ -81,6 +86,10 @@
# Display stuff # Display stuff
export DISPLAY=''${DISPLAY:-:0} export DISPLAY=''${DISPLAY:-:0}
export QT_X11_NO_MITSHM=1 export QT_X11_NO_MITSHM=1
if [[ ! $DIRENV_IN_ENVRC ]]; then
eval "$(${pkgs.python3Packages.argcomplete}/bin/register-python-argcomplete ros2)"
eval "$(${pkgs.python3Packages.argcomplete}/bin/register-python-argcomplete colcon)"
fi
''; '';
}; };
} }

View File

@@ -1,6 +1,5 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty from std_srvs.srv import Empty
import signal import signal
@@ -8,7 +7,6 @@ import time
import atexit import atexit
import serial import serial
import serial.tools.list_ports
import os import os
import sys import sys
import threading import threading
@@ -17,17 +15,12 @@ import glob
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN from astra_msgs.msg import VicCAN
KNOWN_USBS = [ serial_pub = None
(0x2E8A, 0x00C0), # Raspberry Pi Pico thread = None
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
class Anchor(Node): """
""" Publishers:
Publishers:
* /anchor/from_vic/debug * /anchor/from_vic/debug
- Every string received from the MCU is published here for debugging - Every string received from the MCU is published here for debugging
* /anchor/from_vic/core * /anchor/from_vic/core
@@ -37,71 +30,28 @@ class Anchor(Node):
* /anchor/from_vic/bio * /anchor/from_vic/bio
- VicCAN messages for Bio node - VicCAN messages for Bio node
Subscribers: Subscribers:
* /anchor/from_vic/mock_mcu * /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU - For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay * /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU - Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string * /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging - Publish raw strings to this topic to send directly to the MCU for debugging
""" """
class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initalize node with name # Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher' super().__init__("anchor_node") # previously 'serial_publisher'
self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
# Serial port override
if port_override := os.getenv("PORT_OVERRIDE"):
self.serial_port = port_override
##################################################
# Serial MCU Discovery
# If there was not a port override, look for a MCU over USB for Serial.
if self.serial_port is None:
comports = serial.tools.list_ports.comports()
real_ports = list(
filter(
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
comports,
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
if len(recog_ports) == 1: # Found singular recognized MCU
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU # Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None: self.port = None
self.get_logger().warning("Falling back to legacy MCU discovery...") if port_override := os.getenv("PORT_OVERRIDE"):
ports = Anchor.list_serial_ports() self.port = port_override
for _ in range(4): ports = SerialRelay.list_serial_ports()
if self.serial_port is not None: for i in range(4):
if self.port is not None:
break break
for port in ports: for port in ports:
try: try:
@@ -113,60 +63,22 @@ class Anchor(Node):
# if pong is in response, then we are talking with the MCU # if pong is in response, then we are talking with the MCU
if b"pong" in response: if b"pong" in response:
self.serial_port = port self.port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!") self.get_logger().info(f"Found MCU at {self.port}!")
break break
except: except:
pass pass
# If port is still None then we ain't finding no mcu if self.port is None:
if self.serial_port is None: self.get_logger().info("Unable to find MCU...")
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1)
sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
# Attempt to get name of connected MCU
self.serial_interface.write(
b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1) time.sleep(1)
sys.exit(1) sys.exit(1)
# Close serial port on exit self.ser = serial.Serial(self.port, 115200)
self.get_logger().info(f"Enabling Relay Mode")
self.ser.write(b"can_relay_mode,on\n")
atexit.register(self.cleanup) atexit.register(self.cleanup)
##################################################
# ROS2 Topic Setup
# New pub/sub with VicCAN # New pub/sub with VicCAN
self.fromvic_debug_pub_ = self.create_publisher( self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20 String, "/anchor/from_vic/debug", 20
@@ -203,10 +115,22 @@ class Anchor(Node):
String, "/anchor/relay", self.on_relay_tovic_string, 10 String, "/anchor/relay", self.on_relay_tovic_string, 10
) )
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self): def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics""" """Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
try: try:
output = str(self.serial_interface.readline(), "utf8") output = str(self.ser.readline(), "utf8")
if output: if output:
self.relay_fromvic(output) self.relay_fromvic(output)
@@ -232,20 +156,14 @@ class Anchor(Node):
except serial.SerialException as e: except serial.SerialException as e:
print(f"SerialException: {e}") print(f"SerialException: {e}")
print("Closing serial port.") print("Closing serial port.")
try: if self.ser.is_open:
if self.serial_interface.is_open: self.ser.close()
self.serial_interface.close()
except:
pass
exit(1) exit(1)
except TypeError as e: except TypeError as e:
print(f"TypeError: {e}") print(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
try: if self.ser.is_open:
if self.serial_interface.is_open: self.ser.close()
self.serial_interface.close()
except:
pass
exit(1) exit(1)
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
@@ -266,7 +184,7 @@ class Anchor(Node):
output += f",{round(num, 7)}" # limit to 7 decimal places output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n" output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}") # self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.serial_interface.write(bytes(output, "utf8")) self.ser.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str): def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic""" """Relay a string message from the MCU to the appropriate VicCAN topic"""
@@ -328,7 +246,7 @@ class Anchor(Node):
"""Relay a raw string message to the MCU for debugging""" """Relay a raw string message to the MCU for debugging"""
message = msg.data message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}") # self.get_logger().info(f"Sending command to MCU: {msg}")
self.serial_interface.write(bytes(message, "utf8")) self.ser.write(bytes(message, "utf8"))
@staticmethod @staticmethod
def list_serial_ports(): def list_serial_ports():
@@ -336,29 +254,28 @@ class Anchor(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up before terminating...") print("Cleaning up before terminating...")
if self.serial_interface.is_open: if self.ser.is_open:
self.serial_interface.close() self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
try:
rclpy.init(args=args) rclpy.init(args=args)
anchor_node = Anchor() sys.excepthook = myexcepthook
thread = threading.Thread(target=rclpy.spin, args=(anchor_node,), daemon=True) global serial_pub
thread.start()
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate serial_pub = SerialRelay()
while rclpy.ok(): serial_pub.run()
anchor_node.read_MCU() # Check the MCU for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal( signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0) signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly ) # Catch termination signals and exit cleanly

View File

@@ -8,7 +8,6 @@ from launch.substitutions import (
PathJoinSubstitution, PathJoinSubstitution,
) )
from launch_ros.actions import Node from launch_ros.actions import Node
from launch.conditions import IfCondition
# Prevent making __pycache__ directories # Prevent making __pycache__ directories
@@ -51,7 +50,6 @@ def launch_setup(context, *args, **kwargs):
executable="ptz", # change as needed executable="ptz", # change as needed
name="ptz", name="ptz",
output="both", output="both",
condition=IfCondition(LaunchConfiguration("use_ptz", default="true")),
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
) )

View File

@@ -1,6 +1,5 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos
import serial import serial
import sys import sys
import threading import threading
@@ -8,10 +7,11 @@ import glob
import time import time
import atexit import atexit
import signal import signal
from std_msgs.msg import String, Header from std_msgs.msg import String
from astra_msgs.msg import ArmManual
from astra_msgs.msg import SocketFeedback
from astra_msgs.msg import DigitFeedback
from sensor_msgs.msg import JointState from sensor_msgs.msg import JointState
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
import math import math
# control_qos = qos.QoSProfile( # control_qos = qos.QoSProfile(
@@ -25,68 +25,37 @@ import math
# liveliness_lease_duration=5000 # liveliness_lease_duration=5000
# ) # )
serial_pub = None
thread = None thread = None
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
class SerialRelay(Node):
class ArmNode(Node):
def __init__(self): def __init__(self):
# Initialize node
super().__init__("arm_node") super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output # Get launch mode parameter
self.declare_parameter("launch_mode", "arm")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
################################################## # Create publishers
# Topics self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10)
self.socket_pub = self.create_publisher(
# Anchor topics SocketFeedback, "/arm/feedback/socket", 10
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
) )
self.anchor_tovic_pub_ = self.create_publisher( self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
VicCAN, "/anchor/to_vic/relay", 20 self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers
self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 10
) )
self.anchor_sub = self.create_subscription( # New messages
String, "/anchor/arm/feedback", self.anchor_feedback, 10 self.joint_state_pub = self.create_publisher(JointState, "joint_states", 10)
) self.joint_state = JointState()
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.joint_state.name = [
# Control
# Manual: who tf knows. Maybe JointJog?
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
)
# Feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
self.arm_feedback_new = ArmFeedback()
# IK: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
self.saved_joint_state = JointState()
self.saved_joint_state.name = [
"Axis_0_Joint", "Axis_0_Joint",
"Axis_1_Joint", "Axis_1_Joint",
"Axis_2_Joint", "Axis_2_Joint",
@@ -95,63 +64,117 @@ class ArmNode(Node):
"Wrist-EF_Roll_Joint", "Wrist-EF_Roll_Joint",
"Gripper_Slider_Left", "Gripper_Slider_Left",
] ]
self.saved_joint_state.position = [0.0] * len( self.joint_state.position = [0.0] * len(
self.saved_joint_state.name self.joint_state.name
) # Initialize with zeros
self.saved_joint_state.velocity = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros ) # Initialize with zeros
# Old self.joint_command_sub = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 10
# Create publishers
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
) )
# Topics used in anchor mode
if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
self.digit_feedback = DigitFeedback() self.digit_feedback = DigitFeedback()
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
# Create subscribers # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
self.man_sub = self.create_subscription( if self.launch_mode == "arm":
ArmManual, "/arm/control/manual", self.send_manual, 10 # Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for _ in range(4):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
) )
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
# if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
pass if self.launch_mode == "arm":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
return # command needs either position or velocity for all 7 joints
# Assumed order: Axis0, Axis1, Axis2, Axis3, Wrist_Yaw, Wrist_Roll, Gripper
# TODO: formalize joint names in URDF, refactor here to depend on joint names
# Embedded takes deg*10, ROS2 uses Radians # Embedded takes deg*10, ROS2 uses Radians
velocities = [ positions = [math.degrees(pos) * 10 for pos in msg.position]
math.degrees(vel) * 10 if abs(vel) > 0.05 else 0.0 for vel in msg.velocity
]
# Axis 2 & 3 URDF direction is inverted # Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2] positions[2] = -positions[2]
velocities[3] = -velocities[3] positions[3] = -positions[3]
# Axis 0-3 # Set target angles for each arm axis for embedded IK PID to handle
arm_cmd = VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3]) command = f"can_relay_tovic,arm,32,{positions[0]},{positions[1]},{positions[2]},{positions[3]}\n"
arm_cmd.header = Header(stamp=self.get_clock().now().to_msg()) # Wrist yaw and roll
# Wrist yaw and roll, gripper included for future use when have adequate hardware command += f"can_relay_tovic,digit,32,{positions[4]},{positions[5]}\n"
digit_cmd = VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6]) # Gripper IK does not have adequate hardware yet
digit_cmd.header = Header(stamp=self.get_clock().now().to_msg()) self.send_cmd(command)
self.anchor_tovic_pub_.publish(arm_cmd)
self.anchor_tovic_pub_.publish(digit_cmd)
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
axis0 = msg.axis0 axis0 = msg.axis0
@@ -178,14 +201,23 @@ class ArmNode(Node):
return return
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
output = String(data=msg) if (
self.launch_mode == "anchor"
): # if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
# pass
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
elif output.startswith("can_relay_fromvic,arm,54"): elif output.startswith("can_relay_fromvic,arm,54"):
# pass
self.updateBusVoltage(output) self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
self.updateMotorFeedback(output) self.updateMotorFeedback(output)
@@ -203,132 +235,15 @@ class ArmNode(Node):
if len(parts) >= 4: if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3]) self.digit_feedback.wrist_angle = float(parts[3])
# self.digit_feedback.wrist_roll = float(parts[4]) # self.digit_feedback.wrist_roll = float(parts[4])
self.joint_state.position[4] = math.radians(
float(parts[4])
) # Wrist roll
self.joint_state.position[5] = math.radians(
float(parts[3])
) # Wrist yaw
else: else:
return return
def relay_fromvic(self, msg: VicCAN):
# Code for socket and digit are broken out for cleaner code
if msg.mcu_name == "arm":
self.process_fromvic_arm(msg)
elif msg.mcu_name == "digit":
self.process_fromvic_digit(msg)
def process_fromvic_arm(self, msg: VicCAN):
if msg.mcu_name != "arm":
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_socket_msg_len_dict:
expected_len = viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.temperature = float(msg.data[1]) / 10.0
motor.voltage = float(msg.data[2]) / 10.0
motor.current = float(msg.data[3]) / 10.0
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 54: # Board voltages
self.arm_feedback_new.socket_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.socket_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.socket_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.socket_voltage.v3 = float(msg.data[3]) / 100.0
case 55: # Arm joint positions
angles = [angle / 10.0 for angle in msg.data] # VicCAN sends deg*10
# Joint state publisher for URDF visualization
self.saved_joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.saved_joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.saved_joint_state.position[2] = math.radians(
-angles[2]
) # Axis 2 (inverted)
self.saved_joint_state.position[3] = math.radians(
-angles[3]
) # Axis 3 (inverted)
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
case 58: # REV SPARK MAX position and velocity feedback
motorId = round(msg.data[0])
motor: RevMotorState | None = None
match motorId:
case 1:
motor = self.arm_feedback_new.axis1_motor
case 2:
motor = self.arm_feedback_new.axis2_motor
case 3:
motor = self.arm_feedback_new.axis3_motor
case 4:
motor = self.arm_feedback_new.axis0_motor
if motor:
motor.position = float(msg.data[1])
motor.velocity = float(msg.data[2])
motor.header.stamp = msg.header.stamp
self.arm_feedback_pub_.publish(self.arm_feedback_new)
case 59: # Arm joint velocities
velocities = [vel / 100.0 for vel in msg.data] # VicCAN sends deg/s*100
self.saved_joint_state.velocity[0] = math.radians(
velocities[0]
) # Axis 0
self.saved_joint_state.velocity[1] = math.radians(
velocities[1]
) # Axis 1
self.saved_joint_state.velocity[2] = math.radians(
-velocities[2]
) # Axis 2 (-)
self.saved_joint_state.velocity[3] = math.radians(
-velocities[3]
) # Axis 3 (-)
# Wrist is handled by digit feedback
self.saved_joint_state.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(self.saved_joint_state)
def process_fromvic_digit(self, msg: VicCAN):
if msg.mcu_name != "digit":
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_digit_msg_len_dict:
expected_len = viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
match msg.command_id:
case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(
msg.data[0]
) # Wrist roll
self.saved_joint_state.position[5] = math.radians(
msg.data[1]
) # Wrist yaw
def publish_feedback(self): def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
@@ -348,6 +263,16 @@ class ArmNode(Node):
self.arm_feedback.axis1_angle = angles[1] self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2] self.arm_feedback.axis2_angle = angles[2]
self.arm_feedback.axis3_angle = angles[3] self.arm_feedback.axis3_angle = angles[3]
# Joint state publisher for URDF visualization
self.joint_state.position[0] = math.radians(angles[0]) # Axis 0
self.joint_state.position[1] = math.radians(angles[1]) # Axis 1
self.joint_state.position[2] = math.radians(-angles[2]) # Axis 2 (inverted)
self.joint_state.position[3] = math.radians(-angles[3]) # Axis 3 (inverted)
# Wrist is handled by digit feedback
self.joint_state.header.stamp = self.get_clock().now().to_msg()
self.joint_state_pub.publish(self.joint_state)
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
@@ -388,13 +313,33 @@ class ArmNode(Node):
self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current self.arm_feedback.axis0_current = current
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
# return glob.glob("/dev/tty[A-Za-z]*")
def cleanup(self):
print("Cleaning up...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook
arm_node = ArmNode() global serial_pub
arm_node.run() serial_pub = SerialRelay()
rclpy.try_shutdown() serial_pub.run()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,6 +1,5 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos from rclpy import qos
from rclpy.duration import Duration from rclpy.duration import Duration
@@ -12,14 +11,10 @@ import os
import sys import sys
import threading import threading
import glob import glob
import pwd
import grp
from math import copysign from math import copysign
from std_srvs.srv import Trigger
from std_msgs.msg import String from std_msgs.msg import String
from geometry_msgs.msg import Twist, TwistStamped from geometry_msgs.msg import Twist
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState from astra_msgs.msg import CoreCtrlState
@@ -47,17 +42,8 @@ control_qos = qos.QoSProfile(
liveliness_lease_duration=Duration(seconds=5), liveliness_lease_duration=Duration(seconds=5),
) )
arm_ik_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
)
CORE_MODE = "twist" # "twist" or "duty" CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
class Headless(Node): class Headless(Node):
def __init__(self): def __init__(self):
@@ -85,47 +71,14 @@ class Headless(Node):
print("No gamepad found. Waiting...") print("No gamepad found. Waiting...")
# Initialize the gamepad # Initialize the gamepad
id = 0 self.gamepad = pygame.joystick.Joystick(0)
while True:
self.num_gamepads = pygame.joystick.get_count()
if id >= self.num_gamepads:
self.get_logger().fatal("Ran out of controllers to try")
sys.exit(1)
try:
self.gamepad = pygame.joystick.Joystick(id)
self.gamepad.init() self.gamepad.init()
except Exception as e:
self.get_logger().error("Error when initializing gamepad")
self.get_logger().error(e)
id += 1
continue
print(f"Gamepad Found: {self.gamepad.get_name()}") print(f"Gamepad Found: {self.gamepad.get_name()}")
if self.gamepad.get_numhats() == 0 or self.gamepad.get_numaxes() < 5: self.create_timer(0.15, self.send_controls)
self.get_logger().error("Controller not correctly initialized.")
if not is_user_in_group("input"):
self.get_logger().warning(
"If using NixOS, you may need to add yourself to the 'input' group."
)
if is_user_in_group("plugdev"):
self.get_logger().warning(
"If using NixOS, you may need to remove yourself from the 'plugdev' group."
)
else:
break
id += 1
self.create_timer(0.1, self.send_controls)
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2) self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2) self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher( self.core_twist_pub_ = self.create_publisher(
@@ -135,41 +88,6 @@ class Headless(Node):
CoreCtrlState, "/core/control/state", qos_profile=control_qos CoreCtrlState, "/core/control/state", qos_profile=control_qos
) )
self.declare_parameter("arm_mode", "manual")
self.arm_mode = self.get_parameter("arm_mode").value
self.declare_parameter("arm_manual_scheme", "old")
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value
# Check parameter validity
if self.arm_mode not in ["manual", "ik"]:
self.get_logger().warn(
f"Invalid value '{self.arm_mode}' for arm_mode parameter. Defaulting to 'manual' (per-axis control)."
)
if self.arm_manual_scheme not in ["old", "new"]:
self.get_logger().warn(
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
)
# If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None
if self.arm_mode == "ik":
self.get_logger().info("Starting servo node for IK control...")
self.servo_start_client = self.create_client(
Trigger, "/servo_node/start_servo"
)
timeout_counter = 0
while not self.servo_start_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Waiting for servo_node/start_servo service...")
timeout_counter += 1
if timeout_counter >= 10:
self.get_logger().error(
"Servo's start service not available. IK control will not work."
)
break
if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request())
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed) self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
@@ -177,6 +95,18 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self): def send_controls(self):
"""Read the gamepad state and publish control messages""" """Read the gamepad state and publish control messages"""
for event in pygame.event.get(): for event in pygame.event.get():
@@ -185,7 +115,7 @@ class Headless(Node):
sys.exit(0) sys.exit(0)
# Check if controller is still connected # Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads: if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...") print("Gamepad disconnected. Exiting...")
# Send one last zero control message # Send one last zero control message
self.core_publisher.publish(CORE_STOP_MSG) self.core_publisher.publish(CORE_STOP_MSG)
@@ -206,9 +136,6 @@ class Headless(Node):
new_ctrl_mode = "core" new_ctrl_mode = "core"
if new_ctrl_mode != self.ctrl_mode: if new_ctrl_mode != self.ctrl_mode:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.gamepad.rumble(0.6, 0.7, 75) self.gamepad.rumble(0.6, 0.7, 75)
self.ctrl_mode = new_ctrl_mode self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
@@ -237,6 +164,8 @@ class Headless(Node):
self.get_logger().info(f"[Ctrl] {output}") self.get_logger().info(f"[Ctrl] {output}")
self.core_publisher.publish(input) self.core_publisher.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG)
elif self.ctrl_mode == "core" and CORE_MODE == "twist": elif self.ctrl_mode == "core" and CORE_MODE == "twist":
input = Twist() input = Twist()
@@ -256,6 +185,8 @@ class Headless(Node):
# Publish # Publish
self.core_twist_pub_.publish(input) self.core_twist_pub_.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG)
self.get_logger().info( self.get_logger().info(
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}" f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
) )
@@ -286,7 +217,7 @@ class Headless(Node):
) )
# ARM and BIO # ARM and BIO
if self.ctrl_mode == "arm" and self.arm_mode == "manual": if self.ctrl_mode == "arm":
arm_input = ArmManual() arm_input = ArmManual()
# Collect controller state # Collect controller state
@@ -296,15 +227,9 @@ class Headless(Node):
right_stick_x = deadzone(self.gamepad.get_axis(3)) right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4)) right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5)) right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5) right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0) dpad_input = self.gamepad.get_hat(0)
if self.arm_manual_scheme == "old":
# EF Grippers # EF Grippers
if left_trigger > 0 and right_trigger > 0: if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0 arm_input.gripper = 0
@@ -347,64 +272,6 @@ class Headless(Node):
if abs(right_stick_y) > 0.15: if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y) arm_input.axis3 = -1 * round(right_stick_y)
if self.arm_manual_scheme == "new":
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: _
# X: _
# Y: linear actuator
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3
arm_input.effector_yaw = (
0 if abs(right_stick_x) < ARM_THRESHOLD else int(copysign(1, right_stick_x))
)
arm_input.axis3 = (
0 if abs(right_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, right_stick_y))
)
# Left stick: axis 1 and 2
arm_input.axis1 = (
0 if abs(left_stick_x) < ARM_THRESHOLD else int(copysign(1, left_stick_x))
)
arm_input.axis2 = (
0 if abs(left_stick_y) < ARM_THRESHOLD else int(-1 * copysign(1, left_stick_y))
)
# D-pad: axis 0 and _
arm_input.axis0 = (
0 if dpad_input[0] == 0 else int(copysign(1, dpad_input[0]))
)
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0
elif left_trigger > 0:
arm_input.gripper = -1
elif right_trigger > 0:
arm_input.gripper = 1
# Bumpers: EF roll
if left_bumper > 0 and right_bumper > 0:
arm_input.effector_roll = 0
elif left_bumper > 0:
arm_input.effector_roll = -1
elif right_bumper > 0:
arm_input.effector_roll = 1
# A: brake
if button_a:
arm_input.brake = True
# Y: linear actuator
if button_y:
arm_input.linear_actuator = 1
# BIO # BIO
bio_input = BioControl( bio_input = BioControl(
bio_arm=int(left_stick_y * -100), bio_arm=int(left_stick_y * -100),
@@ -417,109 +284,23 @@ class Headless(Node):
30 * (right_trigger - left_trigger) 30 * (right_trigger - left_trigger)
) # Max duty cycle 30% ) # Max duty cycle 30%
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(arm_input) self.arm_publisher.publish(arm_input)
# self.bio_publisher.publish(bio_input)
if self.ctrl_mode == "arm" and self.arm_mode == "ik":
arm_twist = TwistStamped()
arm_twist.header.frame_id = "base_link"
arm_twist.header.stamp = self.get_clock().now().to_msg()
arm_jointjog = JointJog()
arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# Right stick: linear y and linear x
# Left stick: angular z and linear z
# D-pad: angular y and _
# Triggers: EF grippers
# Bumpers: angular x
# A: brake
# B: IK mode
# X: manual mode
# Y: linear actuator
# Right stick: linear y and linear x
arm_twist.twist.linear.y = float(right_stick_x)
arm_twist.twist.linear.x = float(right_stick_y)
# Left stick: angular z and linear z
arm_twist.twist.angular.z = float(-1 * left_stick_x)
arm_twist.twist.linear.z = float(-1 * left_stick_y)
# D-pad: angular y and _
arm_twist.twist.angular.y = (
float(0)
if dpad_input[0] == 0
else float(-1 * copysign(0.75, dpad_input[0]))
)
# Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append(
"Gripper_Slider_Left" # TODO: Update joint name
)
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x
if left_bumper > 0 and right_bumper > 0:
arm_twist.twist.angular.x = float(0)
elif left_bumper > 0:
arm_twist.twist.angular.x = float(1)
elif right_bumper > 0:
arm_twist.twist.angular.x = float(-1)
self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def deadzone(value: float, threshold=STICK_DEADZONE) -> float: def deadzone(value: float, threshold=0.05) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""
if abs(value) < threshold: if abs(value) < threshold:
return 0 return 0
return value return value
def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/
try:
username = os.getlogin()
# Get group ID from name
group_info = grp.getgrnam(group_name)
target_gid = group_info.gr_gid
# Get user's groups
user_info = pwd.getpwnam(username)
user_groups = os.getgrouplist(username, user_info.pw_gid)
return target_gid in user_groups
except KeyError:
return False
def main(args=None): def main(args=None):
try:
rclpy.init(args=args) rclpy.init(args=args)
node = Headless() node = Headless()
rclpy.spin(node) rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException): rclpy.shutdown()
print("Caught shutdown signal. Exiting...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -26,7 +26,7 @@ class LatencyTester : public rclcpp::Node
{ {
public: public:
LatencyTester() LatencyTester()
: Node("latency_tester"), count_(0) : Node("latency_tester"), count_(0), target_mcu_("core")
{ {
publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10); publisher_ = this->create_publisher<std_msgs::msg::String>("/anchor/relay", 10);
timer_ = this->create_wall_timer( timer_ = this->create_wall_timer(
@@ -35,8 +35,6 @@ public:
"/anchor/debug", "/anchor/debug",
10, 10,
std::bind(&LatencyTester::response_callback, this, std::placeholders::_1)); std::bind(&LatencyTester::response_callback, this, std::placeholders::_1));
target_mcu_ = this->declare_parameter<std::string>("target_mcu", "core");
} }
private: private:

View File

@@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.8)
project(moveit_servo)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()

View File

@@ -0,0 +1,4 @@
panda_arm:
kinematics_solver: kdl_kinematics_plugin/KDLKinematicsPlugin
kinematics_solver_search_resolution: 0.005
kinematics_solver_timeout: 0.05

View File

@@ -0,0 +1,25 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from moveit_configs_utils import MoveItConfigsBuilder
def generate_launch_description():
moveit_config = (
MoveItConfigsBuilder("astra_descriptions")
.robot_description(file_path="config/robot.urdf.xacro")
.to_moveit_configs()
)
servo_node = Node(
package="moveit_servo",
executable="servo_node",
name="servo_node",
parameters=[
"config/servo_parameters.yaml",
moveit_config.robot_description,
moveit_config.robot_description_semantic,
moveit_config.robot_description_kinematics,
],
output="screen",
)
return LaunchDescription([servo_node])

View File

@@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>moveit_servo</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="aaron.m.davis@comcast.net">aaron</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@@ -1,80 +0,0 @@
cmake_minimum_required(VERSION 3.22)
project(servo_arm_twist_pkg)
# C++ Libraries #################################################
# Core C++ library for calculations and collision checking.
# Provides interface used by the component node.
set(SERVO_LIB_NAME servo_arm_twist_lib)
# Pose Tracking
set(POSE_TRACKING pose_tracking)
# Component Nodes (Shared libraries) ############################
set(SERVO_COMPONENT_NODE servo_node)
set(SERVO_CONTROLLER_INPUT servo_controller_input)
# Executable Nodes ##############################################
set(SERVO_NODE_MAIN_NAME servo_node_main)
set(POSE_TRACKING_DEMO_NAME servo_pose_tracking_demo)
set(FAKE_SERVO_CMDS_NAME fake_command_publisher)
#################################################################
# Common cmake code applied to all moveit packages
find_package(moveit_common REQUIRED)
moveit_package()
set(THIS_PACKAGE_INCLUDE_DEPENDS
control_msgs
control_toolbox
geometry_msgs
moveit_core
moveit_msgs
moveit_ros_planning
pluginlib
rclcpp
rclcpp_components
sensor_msgs
std_msgs
std_srvs
tf2_eigen
trajectory_msgs
)
find_package(ament_cmake REQUIRED)
find_package(eigen3_cmake_module REQUIRED)
find_package(Eigen3 REQUIRED)
foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS})
find_package(${Dependency} REQUIRED)
endforeach()
#####################
## Component Nodes ##
#####################
# Add executable for using a controller
add_library(${SERVO_CONTROLLER_INPUT} SHARED src/joystick_twist.cpp)
ament_target_dependencies(${SERVO_CONTROLLER_INPUT} ${THIS_PACKAGE_INCLUDE_DEPENDS})
rclcpp_components_register_nodes(${SERVO_CONTROLLER_INPUT} "servo_arm_twist_pkg::JoyToServoPub")
#############
## Install ##
#############
# Install Libraries
install(
TARGETS
${SERVO_CONTROLLER_INPUT}
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
INCLUDES DESTINATION include
)
# Install Binaries
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_dependencies(${THIS_PACKAGE_INCLUDE_DEPENDS})
ament_package()

View File

@@ -1,3 +0,0 @@
# Moveit Servo
See the [Realtime Arm Servoing Tutorial](https://moveit.picknik.ai/main/doc/realtime_servo/realtime_servo_tutorial.html) for installation instructions, quick-start guide, an overview about `moveit_servo`, and to learn how to set it up on your robot.

View File

@@ -1,58 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>servo_arm_twist_pkg</name>
<version>2.5.9</version>
<description>Provides real-time manipulator Cartesian and joint servoing.</description>
<maintainer email="blakeanderson@utexas.edu">Blake Anderson</maintainer>
<maintainer email="andyz@utexas.edu">Andy Zelenak</maintainer>
<maintainer email="tyler@picknik.ai">Tyler Weaver</maintainer>
<maintainer email="henningkayser@picknik.ai">Henning Kayser</maintainer>
<license>BSD 3-Clause</license>
<url type="website">https://ros-planning.github.io/moveit_tutorials</url>
<author>Brian O'Neil</author>
<author email="andyz@utexas.edu">Andy Zelenak</author>
<author>Blake Anderson</author>
<author email="alex@machinekoder.com">Alexander Rössler</author>
<author email="tyler@picknik.ai">Tyler Weaver</author>
<author email="adam.pettinger@utexas.edu">Adam Pettinger</author>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>moveit_common</depend>
<depend>control_msgs</depend>
<depend>control_toolbox</depend>
<depend>geometry_msgs</depend>
<depend>moveit_msgs</depend>
<depend>moveit_core</depend>
<depend>moveit_ros_planning_interface</depend>
<depend>pluginlib</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<depend>tf2_eigen</depend>
<depend>trajectory_msgs</depend>
<exec_depend>gripper_controllers</exec_depend>
<exec_depend>joint_state_broadcaster</exec_depend>
<exec_depend>joint_trajectory_controller</exec_depend>
<exec_depend>joy</exec_depend>
<exec_depend>robot_state_publisher</exec_depend>
<exec_depend>tf2_ros</exec_depend>
<exec_depend>moveit_configs_utils</exec_depend>
<exec_depend>launch_param_builder</exec_depend>
<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>controller_manager</test_depend>
<test_depend>ros_testing</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@@ -1,271 +0,0 @@
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2020, PickNik Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of PickNik Inc. nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/
/* Title : joystick_servo_example.cpp
* Project : servo_arm_twist_pkg
* Created : 08/07/2020
* Author : Adam Pettinger
*/
#include <sensor_msgs/msg/joy.hpp>
#include <geometry_msgs/msg/twist_stamped.hpp>
#include <control_msgs/msg/joint_jog.hpp>
#include <std_srvs/srv/trigger.hpp>
#include <moveit_msgs/msg/planning_scene.hpp>
#include <rclcpp/client.hpp>
#include <rclcpp/experimental/buffers/intra_process_buffer.hpp>
#include <rclcpp/node.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/qos.hpp>
#include <rclcpp/qos_event.hpp>
#include <rclcpp/subscription.hpp>
#include <rclcpp/time.hpp>
#include <rclcpp/utilities.hpp>
#include <thread>
// We'll just set up parameters here
const std::string JOY_TOPIC = "/joy";
const std::string TWIST_TOPIC = "/servo_node/delta_twist_cmds";
const std::string JOINT_TOPIC = "/servo_node/delta_joint_cmds";
const std::string EEF_FRAME_ID = "End_Effector";
const std::string BASE_FRAME_ID = "base_link";
// Enums for button names -> axis/button array index
// For XBOX 1 controller
enum Axis
{
LEFT_STICK_X = 0,
LEFT_STICK_Y = 1,
LEFT_TRIGGER = 2,
RIGHT_STICK_X = 3,
RIGHT_STICK_Y = 4,
RIGHT_TRIGGER = 5,
D_PAD_X = 6,
D_PAD_Y = 7
};
enum Button
{
A = 0,
B = 1,
X = 2,
Y = 3,
LEFT_BUMPER = 4,
RIGHT_BUMPER = 5,
CHANGE_VIEW = 6,
MENU = 7,
HOME = 8,
LEFT_STICK_CLICK = 9,
RIGHT_STICK_CLICK = 10
};
// Some axes have offsets (e.g. the default trigger position is 1.0 not 0)
// This will map the default values for the axes
std::map<Axis, double> AXIS_DEFAULTS = { { LEFT_TRIGGER, 1.0 }, { RIGHT_TRIGGER, 1.0 } };
std::map<Button, double> BUTTON_DEFAULTS;
// To change controls or setup a new controller, all you should to do is change the above enums and the follow 2
// functions
/** \brief // This converts a joystick axes and buttons array to a TwistStamped or JointJog message
* @param axes The vector of continuous controller joystick axes
* @param buttons The vector of discrete controller button values
* @param twist A TwistStamped message to update in prep for publishing
* @param joint A JointJog message to update in prep for publishing
* @return return true if you want to publish a Twist, false if you want to publish a JointJog
*/
bool convertJoyToCmd(const std::vector<float>& axes, const std::vector<int>& buttons,
std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist,
std::unique_ptr<control_msgs::msg::JointJog>& joint)
{
// Give joint jogging priority because it is only buttons
// If any joint jog command is requested, we are only publishing joint commands
if (0)
{
// Map the D_PAD to the proximal joints
joint->joint_names.push_back("panda_joint1");
joint->velocities.push_back(axes[D_PAD_X]);
joint->joint_names.push_back("panda_joint2");
joint->velocities.push_back(axes[D_PAD_Y]);
// Map the diamond to the distal joints
joint->joint_names.push_back("panda_joint7");
joint->velocities.push_back(buttons[B] - buttons[X]);
joint->joint_names.push_back("panda_joint6");
joint->velocities.push_back(buttons[Y] - buttons[A]);
return false;
}
// The bread and butter: map buttons to twist commands
twist->twist.linear.z = axes[RIGHT_STICK_Y];
twist->twist.linear.y = axes[RIGHT_STICK_X];
double lin_x_right = -0.5 * (axes[RIGHT_TRIGGER] - AXIS_DEFAULTS.at(RIGHT_TRIGGER));
double lin_x_left = 0.5 * (axes[LEFT_TRIGGER] - AXIS_DEFAULTS.at(LEFT_TRIGGER));
twist->twist.linear.x = lin_x_right + lin_x_left;
twist->twist.angular.y = axes[LEFT_STICK_Y];
twist->twist.angular.x = axes[LEFT_STICK_X];
double roll_positive = buttons[RIGHT_BUMPER];
double roll_negative = -1 * (buttons[LEFT_BUMPER]);
twist->twist.angular.z = roll_positive + roll_negative;
return true;
}
/** \brief // This should update the frame_to_publish_ as needed for changing command frame via controller
* @param frame_name Set the command frame to this
* @param buttons The vector of discrete controller button values
*/
void updateCmdFrame(std::string& frame_name, const std::vector<int>& buttons)
{
if (buttons[CHANGE_VIEW] && frame_name == EEF_FRAME_ID)
frame_name = BASE_FRAME_ID;
else if (buttons[MENU] && frame_name == BASE_FRAME_ID)
frame_name = EEF_FRAME_ID;
}
namespace servo_arm_twist_pkg
{
class JoyToServoPub : public rclcpp::Node
{
public:
JoyToServoPub(const rclcpp::NodeOptions& options)
: Node("joy_to_twist_publisher", options), frame_to_publish_(BASE_FRAME_ID)
{
// Setup pub/sub
joy_sub_ = this->create_subscription<sensor_msgs::msg::Joy>(
JOY_TOPIC, rclcpp::SystemDefaultsQoS(),
[this](const sensor_msgs::msg::Joy::ConstSharedPtr& msg) { return joyCB(msg); });
twist_pub_ = this->create_publisher<geometry_msgs::msg::TwistStamped>(TWIST_TOPIC, rclcpp::SystemDefaultsQoS());
joint_pub_ = this->create_publisher<control_msgs::msg::JointJog>(JOINT_TOPIC, rclcpp::SystemDefaultsQoS());
// collision_pub_ =
// this->create_publisher<moveit_msgs::msg::PlanningScene>("/planning_scene", rclcpp::SystemDefaultsQoS());
// Create a service client to start the ServoNode
servo_start_client_ = this->create_client<std_srvs::srv::Trigger>("/servo_node/start_servo");
servo_start_client_->wait_for_service(std::chrono::seconds(1));
servo_start_client_->async_send_request(std::make_shared<std_srvs::srv::Trigger::Request>());
// // Load the collision scene asynchronously
// collision_pub_thread_ = std::thread([this]() {
// rclcpp::sleep_for(std::chrono::seconds(3));
// // Create collision object, in the way of servoing
// moveit_msgs::msg::CollisionObject collision_object;
// collision_object.header.frame_id = "panda_link0";
// collision_object.id = "box";
// shape_msgs::msg::SolidPrimitive table_1;
// table_1.type = table_1.BOX;
// table_1.dimensions = { 0.4, 0.6, 0.03 };
// geometry_msgs::msg::Pose table_1_pose;
// table_1_pose.position.x = 0.6;
// table_1_pose.position.y = 0.0;
// table_1_pose.position.z = 0.4;
// shape_msgs::msg::SolidPrimitive table_2;
// table_2.type = table_2.BOX;
// table_2.dimensions = { 0.6, 0.4, 0.03 };
// geometry_msgs::msg::Pose table_2_pose;
// table_2_pose.position.x = 0.0;
// table_2_pose.position.y = 0.5;
// table_2_pose.position.z = 0.25;
// collision_object.primitives.push_back(table_1);
// collision_object.primitive_poses.push_back(table_1_pose);
// collision_object.primitives.push_back(table_2);
// collision_object.primitive_poses.push_back(table_2_pose);
// collision_object.operation = collision_object.ADD;
// moveit_msgs::msg::PlanningSceneWorld psw;
// psw.collision_objects.push_back(collision_object);
// auto ps = std::make_unique<moveit_msgs::msg::PlanningScene>();
// ps->world = psw;
// ps->is_diff = true;
// collision_pub_->publish(std::move(ps));
// });
}
// ~JoyToServoPub() override
// {
// if (collision_pub_thread_.joinable())
// collision_pub_thread_.join();
// }
void joyCB(const sensor_msgs::msg::Joy::ConstSharedPtr& msg)
{
// Create the messages we might publish
auto twist_msg = std::make_unique<geometry_msgs::msg::TwistStamped>();
auto joint_msg = std::make_unique<control_msgs::msg::JointJog>();
// This call updates the frame for twist commands
updateCmdFrame(frame_to_publish_, msg->buttons);
// Convert the joystick message to Twist or JointJog and publish
if (convertJoyToCmd(msg->axes, msg->buttons, twist_msg, joint_msg))
{
// publish the TwistStamped
twist_msg->header.frame_id = frame_to_publish_;
twist_msg->header.stamp = this->now();
twist_pub_->publish(std::move(twist_msg));
}
else
{
// publish the JointJog
joint_msg->header.stamp = this->now();
joint_msg->header.frame_id = "panda_link3";
joint_pub_->publish(std::move(joint_msg));
}
}
private:
rclcpp::Subscription<sensor_msgs::msg::Joy>::SharedPtr joy_sub_;
rclcpp::Publisher<geometry_msgs::msg::TwistStamped>::SharedPtr twist_pub_;
rclcpp::Publisher<control_msgs::msg::JointJog>::SharedPtr joint_pub_;
rclcpp::Publisher<moveit_msgs::msg::PlanningScene>::SharedPtr collision_pub_;
rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr servo_start_client_;
std::string frame_to_publish_;
// std::thread collision_pub_thread_;
}; // class JoyToServoPub
} // namespace servo_arm_twist_pkg
// Register the component with class_loader
#include <rclcpp_components/register_node_macro.hpp>
RCLCPP_COMPONENTS_REGISTER_NODE(servo_arm_twist_pkg::JoyToServoPub)