3 Commits

22 changed files with 352 additions and 872 deletions

1
.envrc
View File

@@ -1,2 +1 @@
use flake use flake
[[ -d install ]] && source install/setup.$(echo $0 | grep -oE '[^/]+$')

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
if command -v nixos-rebuild; then source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
if command -v nixos-rebuild; then source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,11 +17,7 @@ done
echo "[INFO] Network interface is up!" echo "[INFO] Network interface is up!"
if command -v nixos-rebuild; then source /opt/ros/humble/setup.bash
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source $ANCHOR_WS/install/setup.bash source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash [[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

16
flake.lock generated
View File

@@ -24,27 +24,27 @@
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
}, },
"locked": { "locked": {
"lastModified": 1770108954, "lastModified": 1761810010,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=", "narHash": "sha256-o0wJKW603SiOO373BTgeZaF6nDxegMA/cRrzSC2Cscg=",
"owner": "lopsided98", "owner": "lopsided98",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"rev": "3d05d46451b376e128a1553e78b8870c75d7753a", "rev": "e277df39e3bc6b372a5138c0bcf10198857c55ab",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "lopsided98", "owner": "lopsided98",
"ref": "develop", "ref": "master",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"type": "github" "type": "github"
} }
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1759381078, "lastModified": 1744849697,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=", "narHash": "sha256-S9hqvanPSeRu6R4cw0OhvH1rJ+4/s9xIban9C4ocM/0=",
"owner": "NixOS", "owner": "lopsided98",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee", "rev": "6318f538166fef9f5118d8d78b9b43a04bb049e4",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@@ -2,7 +2,7 @@
description = "Development environment for ASTRA Anchor"; description = "Development environment for ASTRA Anchor";
inputs = { inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop"; nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/master";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
}; };
@@ -18,6 +18,12 @@
pkgs = import nixpkgs { pkgs = import nixpkgs {
inherit system; inherit system;
overlays = [ nix-ros-overlay.overlays.default ]; overlays = [ nix-ros-overlay.overlays.default ];
config = {
# Allow the insecure freeimage package
permittedInsecurePackages = [
"freeimage-3.18.0-unstable-2024-04-18"
];
};
}; };
in in
{ {
@@ -25,10 +31,9 @@
name = "ASTRA Anchor"; name = "ASTRA Anchor";
packages = with pkgs; [ packages = with pkgs; [
colcon colcon
(python313.withPackages ( (python312.withPackages (
p: with p; [ p: with p; [
pyserial pyserial
python-can
pygame pygame
scipy scipy
crccheck crccheck
@@ -36,7 +41,7 @@
] ]
)) ))
( (
with rosPackages.humble; with rosPackages.jazzy;
buildEnv { buildEnv {
paths = [ paths = [
ros-core ros-core
@@ -57,23 +62,22 @@
control-msgs control-msgs
control-toolbox control-toolbox
moveit-core moveit-core
moveit-planners
moveit-common moveit-common
moveit-msgs moveit-msgs
moveit-ros-planning moveit-ros-planning
moveit-ros-planning-interface moveit-ros-planning-interface
moveit-ros-visualization
moveit-configs-utils moveit-configs-utils
moveit-ros-move-group moveit-ros-move-group
moveit-servo moveit-servo
moveit-simple-controller-manager moveit-simple-controller-manager
topic-based-ros2-control
pilz-industrial-motion-planner pilz-industrial-motion-planner
pick-ik pick-ik
ompl ompl
chomp-motion-planner
joy joy
ros2-controllers ros-gz-sim
chomp-motion-planner ros-gz-bridge
# ros2-controllers nixpkg does not build :(
]; ];
} }
) )
@@ -82,6 +86,10 @@
# Display stuff # Display stuff
export DISPLAY=''${DISPLAY:-:0} export DISPLAY=''${DISPLAY:-:0}
export QT_X11_NO_MITSHM=1 export QT_X11_NO_MITSHM=1
if [[ ! $DIRENV_IN_ENVRC ]]; then
eval "$(${pkgs.python3Packages.argcomplete}/bin/register-python-argcomplete ros2)"
eval "$(${pkgs.python3Packages.argcomplete}/bin/register-python-argcomplete colcon)"
fi
''; '';
}; };
} }

View File

@@ -1,99 +1,85 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException from std_srvs.srv import Empty
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import signal import signal
import time
import atexit import atexit
from .connector import ( import serial
Connector, import os
MockConnector,
SerialConnector,
CANConnector,
NoValidDeviceException,
NoWorkingDeviceException,
)
from .convert import string_to_viccan
import sys import sys
import threading import threading
import glob
from std_msgs.msg import String, Header from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN from astra_msgs.msg import VicCAN
serial_pub = None
thread = None
class Anchor(Node):
"""
Publishers:
* /anchor/from_vic/debug
- Every string received from the MCU is published here for debugging
* /anchor/from_vic/core
- VicCAN messages for Core node
* /anchor/from_vic/arm
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
Subscribers: """
* /anchor/from_vic/mock_mcu Publishers:
- For testing without an actual MCU, publish strings here as if they came from an MCU * /anchor/from_vic/debug
* /anchor/to_vic/relay - Every string received from the MCU is published here for debugging
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU * /anchor/from_vic/core
""" - VicCAN messages for Core node
* /anchor/from_vic/arm
- VicCAN messages for Arm node
* /anchor/from_vic/bio
- VicCAN messages for Bio node
connector: Connector Subscribers:
* /anchor/from_vic/mock_mcu
- For testing without an actual MCU, publish strings here as if they came from an MCU
* /anchor/to_vic/relay
- Core, Arm, and Bio publish VicCAN messages to this topic to send to the MCU
* /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
class SerialRelay(Node):
def __init__(self): def __init__(self):
super().__init__("anchor_node") # Initalize node with name
super().__init__("anchor_node") # previously 'serial_publisher'
logger = self.get_logger() # Loop through all serial devices on the computer to check for the MCU
self.port = None
self.declare_parameter( if port_override := os.getenv("PORT_OVERRIDE"):
"connector", self.port = port_override
"auto", ports = SerialRelay.list_serial_ports()
ParameterDescriptor( for i in range(4):
name="connector", if self.port is not None:
description="Declares which MCU connector should be used. Defaults to 'auto'.", break
type=ParameterType.PARAMETER_STRING, for port in ports:
additional_constraints="Must be 'serial', 'can', 'mock', or 'auto'.",
),
)
# Determine which connector to use. Options are Mock, Serial, and CAN
connector_select = (
self.get_parameter("connector").get_parameter_value().string_value
)
match connector_select:
case "serial":
logger.info("using serial connector")
self.connector = SerialConnector(self.get_logger())
case "can":
logger.info("using CAN connector")
self.connector = CANConnector(self.get_logger())
case "mock":
logger.info("using mock connector")
self.connector = MockConnector(self.get_logger())
case "auto":
logger.info("automatically determining connector")
try: try:
logger.info("trying CAN connector") # connect and send a ping command
self.connector = CANConnector(self.get_logger()) ser = serial.Serial(port, 115200, timeout=1)
except (NoValidDeviceException, NoWorkingDeviceException, TypeError): # (f"Checking port {port}...")
logger.info("CAN connector failed, trying serial connector") ser.write(b"ping\n")
self.connector = SerialConnector(self.get_logger()) response = ser.read_until(bytes("\n", "utf8"))
case _:
self.get_logger().fatal(
f"invalid value for connector parameter: {connector_select}"
)
exit(1)
# Close devices on exit # if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
self.get_logger().info(f"Enabling Relay Mode")
self.ser.write(b"can_relay_mode,on\n")
atexit.register(self.cleanup) atexit.register(self.cleanup)
# ROS2 Topic Setup # New pub/sub with VicCAN
# Publishers
self.fromvic_debug_pub_ = self.create_publisher( self.fromvic_debug_pub_ = self.create_publisher(
String, "/anchor/from_vic/debug", 20 String, "/anchor/from_vic/debug", 20
) )
@@ -107,66 +93,189 @@ class Anchor(Node):
VicCAN, "/anchor/from_vic/bio", 20 VicCAN, "/anchor/from_vic/bio", 20
) )
# Subscribers
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.connector.write, 20
)
self.mock_mcu_sub_ = self.create_subscription( self.mock_mcu_sub_ = self.create_subscription(
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20 String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
) )
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20
)
self.tovic_debug_sub_ = self.create_subscription(
String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20
)
def cleanup(self): # Create publishers
self.connector.cleanup() self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10)
self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10)
self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10)
self.debug_pub = self.create_publisher(String, "/anchor/debug", 10)
# Create a subscriber
self.relay_sub = self.create_subscription(
String, "/anchor/relay", self.on_relay_tovic_string, 10
)
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self): def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics""" """Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
output = self.connector.read() try:
output = str(self.ser.readline(), "utf8")
if not output: if output:
return self.relay_fromvic(output)
# All output over debug temporarily
self.relay_fromvic(output) # self.get_logger().info(f"[MCU] {output}")
msg = String()
def relay_fromvic(self, msg: VicCAN): msg.data = output
"""Relay a string message from the MCU to the appropriate VicCAN topic""" self.debug_pub.publish(msg)
msg.header = Header(stamp=self.get_clock().now().to_msg(), frame_id="from_vic") if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
if msg.mcu_name == "core": elif output.startswith("can_relay_fromvic,arm") or output.startswith(
self.fromvic_core_pub_.publish(msg) "can_relay_fromvic,digit"
elif msg.mcu_name == "arm" or msg.mcu_name == "digit": ): # digit for voltage readings
self.fromvic_arm_pub_.publish(msg) self.arm_pub.publish(msg)
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit": if output.startswith("can_relay_fromvic,citadel") or output.startswith(
self.fromvic_bio_pub_.publish(msg) "can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
# self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
exit(1)
except Exception as e:
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
def on_mock_fromvic(self, msg: String): def on_mock_fromvic(self, msg: String):
viccan = string_to_viccan( """For testing without an actual MCU, publish strings here as if they came from an MCU"""
msg.data, # self.get_logger().info(f"Got command from mock MCU: {msg}")
"mock", self.relay_fromvic(msg.data)
self.get_logger(),
def on_relay_tovic_viccan(self, msg: VicCAN):
"""Relay a VicCAN message to the MCU"""
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
for num in msg.data:
output += f",{round(num, 7)}" # limit to 7 decimal places
output += "\n"
# self.get_logger().info(f"VicCAN relay to MCU: {output}")
self.ser.write(bytes(output, "utf8"))
def relay_fromvic(self, msg: str):
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
self.fromvic_debug_pub_.publish(String(data=msg))
parts = msg.strip().split(",")
if len(parts) > 0 and parts[0] != "can_relay_fromvic":
self.get_logger().debug(f"Ignoring non-VicCAN message: '{msg.strip()}'")
return
# String validation
malformed: bool = False
malformed_reason: str = ""
if len(parts) < 3 or len(parts) > 7:
malformed = True
malformed_reason = (
f"invalid argument count (expected [3,7], got {len(parts)})"
)
elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]:
malformed = True
malformed_reason = f"invalid mcu_name '{parts[1]}'"
elif not (parts[2].isnumeric()) or int(parts[2]) < 0:
malformed = True
malformed_reason = f"command_id '{parts[2]}' is not a non-negative integer"
else:
for x in parts[3:]:
try:
float(x)
except ValueError:
malformed = True
malformed_reason = f"data '{x}' is not a float"
break
if malformed:
self.get_logger().warning(
f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}"
)
return
# Have valid VicCAN message
output = VicCAN()
output.mcu_name = parts[1]
output.command_id = int(parts[2])
if len(parts) > 3:
output.data = [float(x) for x in parts[3:]]
output.header = Header(
stamp=self.get_clock().now().to_msg(), frame_id="from_vic"
) )
if viccan:
self.relay_fromvic(viccan) # self.get_logger().info(f"Relaying from MCU: {output}")
if output.mcu_name == "core":
self.fromvic_core_pub_.publish(output)
elif output.mcu_name == "arm" or output.mcu_name == "digit":
self.fromvic_arm_pub_.publish(output)
elif output.mcu_name == "citadel" or output.mcu_name == "digit":
self.fromvic_bio_pub_.publish(output)
def on_relay_tovic_string(self, msg: String):
"""Relay a raw string message to the MCU for debugging"""
message = msg.data
# self.get_logger().info(f"Sending command to MCU: {msg}")
self.ser.write(bytes(message, "utf8"))
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
try: rclpy.init(args=args)
rclpy.init(args=args) sys.excepthook = myexcepthook
anchor_node = Anchor()
thread = threading.Thread(target=rclpy.spin, args=(anchor_node,), daemon=True) global serial_pub
thread.start()
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate serial_pub = SerialRelay()
while rclpy.ok(): serial_pub.run()
anchor_node.read_MCU() # Check the MCU for updates
rate.sleep()
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal, shutting down...")
finally:
rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal( signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0) signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly ) # Catch termination signals and exit cleanly

View File

@@ -1,189 +0,0 @@
from abc import ABC, abstractmethod
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
from .convert import string_to_viccan
# CAN
import can
import can.interfaces.socketcan
# Serial
import serial
import serial.tools.list_ports
KNOWN_USBS = [
(0x2E8A, 0x00C0), # Raspberry Pi Pico
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
(0x1A86, 0x55D3), # ESP32 S3 Development Board
]
BAUD_RATE = 115200
class NoValidDeviceException(Exception):
pass
class NoWorkingDeviceException(Exception):
pass
class MultipleValidDevicesException(Exception):
pass
class DeviceClosedException(Exception):
pass
class Connector(ABC):
logger: RcutilsLogger
def __init__(self, logger: RcutilsLogger):
self.logger = logger
@abstractmethod
def read(self) -> VicCAN | None:
pass
@abstractmethod
def write(self, msg: VicCAN):
pass
def cleanup(self):
pass
class SerialConnector(Connector):
port: str
mcu_name: str
serial_interface: serial.Serial
override: bool
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
ports = self._find_ports()
if len(ports) <= 0:
raise NoValidDeviceException("no valid serial device found")
if (l := len(ports)) > 1:
raise MultipleValidDevicesException(
f"too many ({l}) valid serial devices found"
)
# check each of our ports to make sure one of them is responding
port = ports[0]
mcu_name = self._get_name(port)
if not mcu_name:
raise NoWorkingDeviceException(
f"found {port}, but it did not respond with its name"
)
self.port = port
self.mcu_name = mcu_name
# if we fail at this point, it should crash because we've already tested the port
self.serial_interface = serial.Serial(self.port, BAUD_RATE, timeout=1)
def _find_ports(self) -> list[str]:
"""
Finds all valid ports but does not test them
returns: all valid ports
"""
comports = serial.tools.list_ports.comports()
valid_ports = list(
map( # get just device strings
lambda p: p.device,
filter( # make sure we have a known device
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
comports,
),
)
)
self.logger.info(f"found valid MCU ports: [ {', '.join(valid_ports)} ]")
return valid_ports
def _get_name(self, port: str) -> str | None:
"""
Get the name of the MCU (if it works)
returns: str name of the MCU, None if it doesn't work
"""
# attempt to open the serial port
serial_interface: serial.Serial
try:
self.logger.info(f"asking {port} for its name")
serial_interface = serial.Serial(port, BAUD_RATE, timeout=1)
serial_interface.write(b"can_relay_mode,on\n")
for i in range(4):
self.logger.debug(f"attempt {i + 1} of 4 asking {port} for its name")
response = serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
self.logger.info(f"we are talking to {args[1]}")
return args[1]
break
except UnicodeDecodeError as e:
self.logger.info(
f"ignoring UnicodeDecodeError when asking for MCU name: {e}"
)
if serial_interface.is_open:
serial_interface.close()
except serial.SerialException as e:
self.logger.error(f"SerialException when asking for MCU name: {e}")
return None
def read(self) -> VicCAN | None:
try:
raw = str(self.serial_interface.readline(), "utf8")
if not raw:
return None
return string_to_viccan(raw, self.mcu_name, self.logger)
except serial.SerialException as e:
self.logger.error(f"SerialException: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except TypeError as e:
self.logger.error(f"TypeError: {e}")
raise DeviceClosedException(f"serial port {self.port} closed unexpectedly")
except Exception:
pass # pretty much no other error matters
def write(self, msg: VicCAN):
# go from [ w, x, y, z ] -> "w,x,y,z" & round to 7 digits max
data = ",".join([str(round(x, 7)) for x in msg.data])
output = f"can_relay_tovic,{msg.mcu_name},{msg.command_id},{data}\n"
self.serial_interface.write(bytes(output, "utf8"))
def cleanup(self):
self.logger.info(f"closing serial port if open {self.port}")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except Exception as e:
self.logger.error(e)
class CANConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
class MockConnector(Connector):
def __init__(self, logger: RcutilsLogger):
super().__init__(logger)
def read(self) -> VicCAN | None:
return None
def write(self, msg: VicCAN):
print(msg)

View File

@@ -1,42 +0,0 @@
from astra_msgs.msg import VicCAN
from rclpy.impl.rcutils_logger import RcutilsLogger
def string_to_viccan(msg: str, mcu_name: str, logger: RcutilsLogger):
"""
Converts the serial string VicCAN format to a ROS2 VicCAN message.
Does not fill out the Header of the message.
On a failure, it will log at a debug level why it failed.
Parameters:
* msg: str
- The message in serial VicCAN format
* mcu_name: str
- The name of the MCU (e.g. core, citadel, arm)
* logger: RcutilsLogger
- A logger retrieved from node.get_logger()
Returns:
* VicCAN | None
- The VicCAN message on a success or None on a failure
"""
parts: list[str] = msg.split(",")
# don't need an extra check because len of .split output is always >= 1
if parts[0] != "can_relay_fromvic":
logger.debug(f"got non-CAN data from {mcu_name}: {msg}")
return None
elif len(parts) < 3:
logger.debug(
f"got garbage (not enough parts) CAN data from {mcu_name}: {msg}"
)
return None
elif len(parts) > 7:
logger.debug(f"got garbage (too many parts) CAN data from {mcu_name}: {msg}")
return None
return VicCAN(
mcu_name=parts[1],
command_id=int(parts[2]),
data=[float(x) for x in parts[3:]],
)

View File

@@ -3,14 +3,13 @@
<package format="3"> <package format="3">
<name>anchor_pkg</name> <name>anchor_pkg</name>
<version>0.0.0</version> <version>0.0.0</version>
<description>Anchor -- ROS and CAN relay node</description> <description>TODO: Package description</description>
<maintainer email="rjm0037@uah.edu">Riley</maintainer> <maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>AGPL-3.0-only</license> <license>AGPL-3.0-only</license>
<depend>rclpy</depend> <depend>rclpy</depend>
<depend>common_interfaces</depend> <depend>common_interfaces</depend>
<depend>python3-serial</depend> <depend>python3-serial</depend>
<depend>python3-can</depend>
<build_depend>black</build_depend> <build_depend>black</build_depend>

View File

@@ -49,7 +49,7 @@ class SerialRelay(Node):
# Create subscribers # Create subscribers
self.man_sub = self.create_subscription( self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 2 ArmManual, "/arm/control/manual", self.send_manual, 10
) )
# New messages # New messages
@@ -190,7 +190,7 @@ class SerialRelay(Node):
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n" command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn # command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,28,{msg.laser}\n" command += f"can_relay_tovic,digit,28,{msg.laser}\n"

View File

@@ -31,14 +31,14 @@ CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1 CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE, durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1), deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5), liveliness_lease_duration=Duration(seconds=5),
) )
# Used to verify the length of an incoming VicCAN feedback message # Used to verify the length of an incoming VicCAN feedback message

View File

@@ -1,6 +1,5 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos from rclpy import qos
from rclpy.duration import Duration from rclpy.duration import Duration
@@ -12,8 +11,6 @@ import os
import sys import sys
import threading import threading
import glob import glob
import pwd
import grp
from math import copysign from math import copysign
from std_msgs.msg import String from std_msgs.msg import String
@@ -35,14 +32,14 @@ ARM_STOP_MSG = ArmManual() # "
BIO_STOP_MSG = BioControl() # " BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE, durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1), deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5), liveliness_lease_duration=Duration(seconds=5),
) )
CORE_MODE = "twist" # "twist" or "duty" CORE_MODE = "twist" # "twist" or "duty"
@@ -74,36 +71,9 @@ class Headless(Node):
print("No gamepad found. Waiting...") print("No gamepad found. Waiting...")
# Initialize the gamepad # Initialize the gamepad
id = 0 self.gamepad = pygame.joystick.Joystick(0)
while True: self.gamepad.init()
self.num_gamepads = pygame.joystick.get_count() print(f"Gamepad Found: {self.gamepad.get_name()}")
if id >= self.num_gamepads:
self.get_logger().fatal("Ran out of controllers to try")
sys.exit(1)
try:
self.gamepad = pygame.joystick.Joystick(id)
self.gamepad.init()
except Exception as e:
self.get_logger().error("Error when initializing gamepad")
self.get_logger().error(e)
id += 1
continue
print(f"Gamepad Found: {self.gamepad.get_name()}")
if self.gamepad.get_numhats() == 0 or self.gamepad.get_numaxes() < 5:
self.get_logger().error("Controller not correctly initialized.")
if not is_user_in_group("input"):
self.get_logger().warning(
"If using NixOS, you may need to add yourself to the 'input' group."
)
if is_user_in_group("plugdev"):
self.get_logger().warning(
"If using NixOS, you may need to remove yourself from the 'plugdev' group."
)
else:
break
id += 1
self.create_timer(0.15, self.send_controls) self.create_timer(0.15, self.send_controls)
@@ -145,7 +115,7 @@ class Headless(Node):
sys.exit(0) sys.exit(0)
# Check if controller is still connected # Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads: if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...") print("Gamepad disconnected. Exiting...")
# Send one last zero control message # Send one last zero control message
self.core_publisher.publish(CORE_STOP_MSG) self.core_publisher.publish(CORE_STOP_MSG)
@@ -326,34 +296,11 @@ def deadzone(value: float, threshold=0.05) -> float:
return value return value
def is_user_in_group(group_name: str) -> bool:
# Copied from https://zetcode.com/python/os-getgrouplist/
try:
username = os.getlogin()
# Get group ID from name
group_info = grp.getgrnam(group_name)
target_gid = group_info.gr_gid
# Get user's groups
user_info = pwd.getpwnam(username)
user_groups = os.getgrouplist(username, user_info.pw_gid)
return target_gid in user_groups
except KeyError:
return False
def main(args=None): def main(args=None):
try: rclpy.init(args=args)
rclpy.init(args=args) node = Headless()
rclpy.spin(node)
node = Headless() rclpy.shutdown()
rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException):
print("Caught shutdown signal. Exiting...")
finally:
rclpy.shutdown()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.8)
project(moveit_servo)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()

View File

@@ -0,0 +1,4 @@
panda_arm:
kinematics_solver: kdl_kinematics_plugin/KDLKinematicsPlugin
kinematics_solver_search_resolution: 0.005
kinematics_solver_timeout: 0.05

View File

@@ -0,0 +1,25 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from moveit_configs_utils import MoveItConfigsBuilder
def generate_launch_description():
moveit_config = (
MoveItConfigsBuilder("astra_descriptions")
.robot_description(file_path="config/robot.urdf.xacro")
.to_moveit_configs()
)
servo_node = Node(
package="moveit_servo",
executable="servo_node",
name="servo_node",
parameters=[
"config/servo_parameters.yaml",
moveit_config.robot_description,
moveit_config.robot_description_semantic,
moveit_config.robot_description_kinematics,
],
output="screen",
)
return LaunchDescription([servo_node])

View File

@@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>moveit_servo</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="aaron.m.davis@comcast.net">aaron</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@@ -1,80 +0,0 @@
cmake_minimum_required(VERSION 3.22)
project(servo_arm_twist_pkg)
# C++ Libraries #################################################
# Core C++ library for calculations and collision checking.
# Provides interface used by the component node.
set(SERVO_LIB_NAME servo_arm_twist_lib)
# Pose Tracking
set(POSE_TRACKING pose_tracking)
# Component Nodes (Shared libraries) ############################
set(SERVO_COMPONENT_NODE servo_node)
set(SERVO_CONTROLLER_INPUT servo_controller_input)
# Executable Nodes ##############################################
set(SERVO_NODE_MAIN_NAME servo_node_main)
set(POSE_TRACKING_DEMO_NAME servo_pose_tracking_demo)
set(FAKE_SERVO_CMDS_NAME fake_command_publisher)
#################################################################
# Common cmake code applied to all moveit packages
find_package(moveit_common REQUIRED)
moveit_package()
set(THIS_PACKAGE_INCLUDE_DEPENDS
control_msgs
control_toolbox
geometry_msgs
moveit_core
moveit_msgs
moveit_ros_planning
pluginlib
rclcpp
rclcpp_components
sensor_msgs
std_msgs
std_srvs
tf2_eigen
trajectory_msgs
)
find_package(ament_cmake REQUIRED)
find_package(eigen3_cmake_module REQUIRED)
find_package(Eigen3 REQUIRED)
foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS})
find_package(${Dependency} REQUIRED)
endforeach()
#####################
## Component Nodes ##
#####################
# Add executable for using a controller
add_library(${SERVO_CONTROLLER_INPUT} SHARED src/joystick_twist.cpp)
ament_target_dependencies(${SERVO_CONTROLLER_INPUT} ${THIS_PACKAGE_INCLUDE_DEPENDS})
rclcpp_components_register_nodes(${SERVO_CONTROLLER_INPUT} "servo_arm_twist_pkg::JoyToServoPub")
#############
## Install ##
#############
# Install Libraries
install(
TARGETS
${SERVO_CONTROLLER_INPUT}
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
INCLUDES DESTINATION include
)
# Install Binaries
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_dependencies(${THIS_PACKAGE_INCLUDE_DEPENDS})
ament_package()

View File

@@ -1,3 +0,0 @@
# Moveit Servo
See the [Realtime Arm Servoing Tutorial](https://moveit.picknik.ai/main/doc/realtime_servo/realtime_servo_tutorial.html) for installation instructions, quick-start guide, an overview about `moveit_servo`, and to learn how to set it up on your robot.

View File

@@ -1,58 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>servo_arm_twist_pkg</name>
<version>2.5.9</version>
<description>Provides real-time manipulator Cartesian and joint servoing.</description>
<maintainer email="blakeanderson@utexas.edu">Blake Anderson</maintainer>
<maintainer email="andyz@utexas.edu">Andy Zelenak</maintainer>
<maintainer email="tyler@picknik.ai">Tyler Weaver</maintainer>
<maintainer email="henningkayser@picknik.ai">Henning Kayser</maintainer>
<license>BSD 3-Clause</license>
<url type="website">https://ros-planning.github.io/moveit_tutorials</url>
<author>Brian O'Neil</author>
<author email="andyz@utexas.edu">Andy Zelenak</author>
<author>Blake Anderson</author>
<author email="alex@machinekoder.com">Alexander Rössler</author>
<author email="tyler@picknik.ai">Tyler Weaver</author>
<author email="adam.pettinger@utexas.edu">Adam Pettinger</author>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>moveit_common</depend>
<depend>control_msgs</depend>
<depend>control_toolbox</depend>
<depend>geometry_msgs</depend>
<depend>moveit_msgs</depend>
<depend>moveit_core</depend>
<depend>moveit_ros_planning_interface</depend>
<depend>pluginlib</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<depend>tf2_eigen</depend>
<depend>trajectory_msgs</depend>
<exec_depend>gripper_controllers</exec_depend>
<exec_depend>joint_state_broadcaster</exec_depend>
<exec_depend>joint_trajectory_controller</exec_depend>
<exec_depend>joy</exec_depend>
<exec_depend>robot_state_publisher</exec_depend>
<exec_depend>tf2_ros</exec_depend>
<exec_depend>moveit_configs_utils</exec_depend>
<exec_depend>launch_param_builder</exec_depend>
<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>controller_manager</test_depend>
<test_depend>ros_testing</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@@ -1,271 +0,0 @@
/*********************************************************************
* Software License Agreement (BSD License)
*
* Copyright (c) 2020, PickNik Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
* * Neither the name of PickNik Inc. nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*********************************************************************/
/* Title : joystick_servo_example.cpp
* Project : servo_arm_twist_pkg
* Created : 08/07/2020
* Author : Adam Pettinger
*/
#include <sensor_msgs/msg/joy.hpp>
#include <geometry_msgs/msg/twist_stamped.hpp>
#include <control_msgs/msg/joint_jog.hpp>
#include <std_srvs/srv/trigger.hpp>
#include <moveit_msgs/msg/planning_scene.hpp>
#include <rclcpp/client.hpp>
#include <rclcpp/experimental/buffers/intra_process_buffer.hpp>
#include <rclcpp/node.hpp>
#include <rclcpp/publisher.hpp>
#include <rclcpp/qos.hpp>
#include <rclcpp/qos_event.hpp>
#include <rclcpp/subscription.hpp>
#include <rclcpp/time.hpp>
#include <rclcpp/utilities.hpp>
#include <thread>
// We'll just set up parameters here
const std::string JOY_TOPIC = "/joy";
const std::string TWIST_TOPIC = "/servo_node/delta_twist_cmds";
const std::string JOINT_TOPIC = "/servo_node/delta_joint_cmds";
const std::string EEF_FRAME_ID = "End_Effector";
const std::string BASE_FRAME_ID = "base_link";
// Enums for button names -> axis/button array index
// For XBOX 1 controller
enum Axis
{
LEFT_STICK_X = 0,
LEFT_STICK_Y = 1,
LEFT_TRIGGER = 2,
RIGHT_STICK_X = 3,
RIGHT_STICK_Y = 4,
RIGHT_TRIGGER = 5,
D_PAD_X = 6,
D_PAD_Y = 7
};
enum Button
{
A = 0,
B = 1,
X = 2,
Y = 3,
LEFT_BUMPER = 4,
RIGHT_BUMPER = 5,
CHANGE_VIEW = 6,
MENU = 7,
HOME = 8,
LEFT_STICK_CLICK = 9,
RIGHT_STICK_CLICK = 10
};
// Some axes have offsets (e.g. the default trigger position is 1.0 not 0)
// This will map the default values for the axes
std::map<Axis, double> AXIS_DEFAULTS = { { LEFT_TRIGGER, 1.0 }, { RIGHT_TRIGGER, 1.0 } };
std::map<Button, double> BUTTON_DEFAULTS;
// To change controls or setup a new controller, all you should to do is change the above enums and the follow 2
// functions
/** \brief // This converts a joystick axes and buttons array to a TwistStamped or JointJog message
* @param axes The vector of continuous controller joystick axes
* @param buttons The vector of discrete controller button values
* @param twist A TwistStamped message to update in prep for publishing
* @param joint A JointJog message to update in prep for publishing
* @return return true if you want to publish a Twist, false if you want to publish a JointJog
*/
bool convertJoyToCmd(const std::vector<float>& axes, const std::vector<int>& buttons,
std::unique_ptr<geometry_msgs::msg::TwistStamped>& twist,
std::unique_ptr<control_msgs::msg::JointJog>& joint)
{
// // Give joint jogging priority because it is only buttons
// // If any joint jog command is requested, we are only publishing joint commands
// if (buttons[A] || buttons[B] || buttons[X] || buttons[Y] || axes[D_PAD_X] || axes[D_PAD_Y])
// {
// // Map the D_PAD to the proximal joints
// joint->joint_names.push_back("panda_joint1");
// joint->velocities.push_back(axes[D_PAD_X]);
// joint->joint_names.push_back("panda_joint2");
// joint->velocities.push_back(axes[D_PAD_Y]);
// // Map the diamond to the distal joints
// joint->joint_names.push_back("panda_joint7");
// joint->velocities.push_back(buttons[B] - buttons[X]);
// joint->joint_names.push_back("panda_joint6");
// joint->velocities.push_back(buttons[Y] - buttons[A]);
// return false;
// }
// The bread and butter: map buttons to twist commands
twist->twist.linear.z = axes[RIGHT_STICK_Y];
twist->twist.linear.y = axes[RIGHT_STICK_X];
double lin_x_right = -0.5 * (axes[RIGHT_TRIGGER] - AXIS_DEFAULTS.at(RIGHT_TRIGGER));
double lin_x_left = 0.5 * (axes[LEFT_TRIGGER] - AXIS_DEFAULTS.at(LEFT_TRIGGER));
twist->twist.linear.x = lin_x_right + lin_x_left;
twist->twist.angular.y = axes[LEFT_STICK_Y];
twist->twist.angular.x = axes[LEFT_STICK_X];
double roll_positive = buttons[RIGHT_BUMPER];
double roll_negative = -1 * (buttons[LEFT_BUMPER]);
twist->twist.angular.z = roll_positive + roll_negative;
return true;
}
/** \brief // This should update the frame_to_publish_ as needed for changing command frame via controller
* @param frame_name Set the command frame to this
* @param buttons The vector of discrete controller button values
*/
void updateCmdFrame(std::string& frame_name, const std::vector<int>& buttons)
{
if (buttons[CHANGE_VIEW] && frame_name == EEF_FRAME_ID)
frame_name = BASE_FRAME_ID;
else if (buttons[MENU] && frame_name == BASE_FRAME_ID)
frame_name = EEF_FRAME_ID;
}
namespace servo_arm_twist_pkg
{
class JoyToServoPub : public rclcpp::Node
{
public:
JoyToServoPub(const rclcpp::NodeOptions& options)
: Node("joy_to_twist_publisher", options), frame_to_publish_(BASE_FRAME_ID)
{
// Setup pub/sub
joy_sub_ = this->create_subscription<sensor_msgs::msg::Joy>(
JOY_TOPIC, rclcpp::SystemDefaultsQoS(),
[this](const sensor_msgs::msg::Joy::ConstSharedPtr& msg) { return joyCB(msg); });
twist_pub_ = this->create_publisher<geometry_msgs::msg::TwistStamped>(TWIST_TOPIC, rclcpp::SystemDefaultsQoS());
joint_pub_ = this->create_publisher<control_msgs::msg::JointJog>(JOINT_TOPIC, rclcpp::SystemDefaultsQoS());
// collision_pub_ =
// this->create_publisher<moveit_msgs::msg::PlanningScene>("/planning_scene", rclcpp::SystemDefaultsQoS());
// Create a service client to start the ServoNode
servo_start_client_ = this->create_client<std_srvs::srv::Trigger>("/servo_node/start_servo");
servo_start_client_->wait_for_service(std::chrono::seconds(1));
servo_start_client_->async_send_request(std::make_shared<std_srvs::srv::Trigger::Request>());
// // Load the collision scene asynchronously
// collision_pub_thread_ = std::thread([this]() {
// rclcpp::sleep_for(std::chrono::seconds(3));
// // Create collision object, in the way of servoing
// moveit_msgs::msg::CollisionObject collision_object;
// collision_object.header.frame_id = "panda_link0";
// collision_object.id = "box";
// shape_msgs::msg::SolidPrimitive table_1;
// table_1.type = table_1.BOX;
// table_1.dimensions = { 0.4, 0.6, 0.03 };
// geometry_msgs::msg::Pose table_1_pose;
// table_1_pose.position.x = 0.6;
// table_1_pose.position.y = 0.0;
// table_1_pose.position.z = 0.4;
// shape_msgs::msg::SolidPrimitive table_2;
// table_2.type = table_2.BOX;
// table_2.dimensions = { 0.6, 0.4, 0.03 };
// geometry_msgs::msg::Pose table_2_pose;
// table_2_pose.position.x = 0.0;
// table_2_pose.position.y = 0.5;
// table_2_pose.position.z = 0.25;
// collision_object.primitives.push_back(table_1);
// collision_object.primitive_poses.push_back(table_1_pose);
// collision_object.primitives.push_back(table_2);
// collision_object.primitive_poses.push_back(table_2_pose);
// collision_object.operation = collision_object.ADD;
// moveit_msgs::msg::PlanningSceneWorld psw;
// psw.collision_objects.push_back(collision_object);
// auto ps = std::make_unique<moveit_msgs::msg::PlanningScene>();
// ps->world = psw;
// ps->is_diff = true;
// collision_pub_->publish(std::move(ps));
// });
}
// ~JoyToServoPub() override
// {
// if (collision_pub_thread_.joinable())
// collision_pub_thread_.join();
// }
void joyCB(const sensor_msgs::msg::Joy::ConstSharedPtr& msg)
{
// Create the messages we might publish
auto twist_msg = std::make_unique<geometry_msgs::msg::TwistStamped>();
auto joint_msg = std::make_unique<control_msgs::msg::JointJog>();
// This call updates the frame for twist commands
updateCmdFrame(frame_to_publish_, msg->buttons);
// Convert the joystick message to Twist or JointJog and publish
if (convertJoyToCmd(msg->axes, msg->buttons, twist_msg, joint_msg))
{
// publish the TwistStamped
twist_msg->header.frame_id = frame_to_publish_;
twist_msg->header.stamp = this->now();
twist_pub_->publish(std::move(twist_msg));
}
// else
// {
// // publish the JointJog
// joint_msg->header.stamp = this->now();
// joint_msg->header.frame_id = "panda_link3";
// joint_pub_->publish(std::move(joint_msg));
// }
}
private:
rclcpp::Subscription<sensor_msgs::msg::Joy>::SharedPtr joy_sub_;
rclcpp::Publisher<geometry_msgs::msg::TwistStamped>::SharedPtr twist_pub_;
rclcpp::Publisher<control_msgs::msg::JointJog>::SharedPtr joint_pub_;
rclcpp::Publisher<moveit_msgs::msg::PlanningScene>::SharedPtr collision_pub_;
rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr servo_start_client_;
std::string frame_to_publish_;
// std::thread collision_pub_thread_;
}; // class JoyToServoPub
} // namespace servo_arm_twist_pkg
// Register the component with class_loader
#include <rclcpp_components/register_node_macro.hpp>
RCLCPP_COMPONENTS_REGISTER_NODE(servo_arm_twist_pkg::JoyToServoPub)