19 Commits

Author SHA1 Message Date
David
e53c1f32c9 chore: bump astra_msgs submodule 2026-04-08 01:21:20 -05:00
David
d0f6ecf702 Merge remote-tracking branch 'origin/main' into core-ros2-control 2026-04-08 01:15:16 -05:00
ryleu
2213896494 change is_testbed to rover_platform 2026-03-31 23:01:06 -05:00
David
c42cd39fda feat(arm): implement ArmCtrlState topic 2026-03-26 12:40:27 -05:00
David
f1c84c3cc5 fix(arm): correctly control arm 2026-03-26 03:53:38 -05:00
David
cf699da0c6 fix(arm): populate missing feedback fields 2026-03-26 02:40:58 -05:00
David
7669ded344 feat(core): add code support for testbed
Adds parameter `is_testbed`. WIP: still needs support in astra_descriptions; new parameter will not be usable until then. When using an incorrect parameter value, odom will be incorrect and will drive at a scaled speed from requested over cmd_vel.
2026-03-26 02:05:37 -05:00
David
ec12a083f1 feat(core): remove bypass /cmd_vel topic 2026-03-26 01:37:57 -05:00
David
ea36ce6ef4 fix(core): populate missing values in /core/feedback_new 2026-03-26 00:40:11 -05:00
David
3f795bf8ed chore(core): remove core_headless 2026-03-24 15:44:30 -05:00
David
b257dc7556 fix: update ros2 plumbing files 2026-03-24 15:41:00 -05:00
David
45825189a5 fix(core): populate CoreFeedback Header stamp 2026-03-23 21:57:48 -05:00
David
a17060ceda refactor(core): cleanup to match arm 2026-03-20 10:42:18 -05:00
David
ff4a58e6ed refactor: (headless) finish integrating Core cmd_vel 2026-03-19 20:02:37 -05:00
David
120891c8e5 chore: update astra_msgs to main 2026-03-19 19:49:25 -05:00
David
178d5001d6 Merge remote-tracking branch 'origin/main' into core-ros2-control 2026-03-19 19:45:59 -05:00
David Sharpe
684c2994ec cmd_vel headless 2026-03-18 01:17:00 -05:00
David
bfa50e3a25 feat: (core) make compatible with ros2_control 2026-02-26 17:17:05 -06:00
David
90fbbac813 feat: add ros2 control option to main launch for core 2026-02-26 17:17:02 -06:00
22 changed files with 894 additions and 857 deletions

View File

@@ -1,5 +1,4 @@
from warnings import deprecated
import time
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
@@ -18,9 +17,8 @@ from .connector import (
from .convert import string_to_viccan
import threading
from builtin_interfaces.msg import Time
from astra_msgs.msg import VicCAN
from std_msgs.msg import String
from astra_msgs.msg import VicCAN, McuVersion
class Anchor(Node):
@@ -49,8 +47,6 @@ class Anchor(Node):
Instead, it converts them to VicCAN messages first.
"""
ASTRA_EPOCH = time.struct_time((2022, 1, 1, 0, 0, 0, 0, 0, 0)) # January 1, 2022
connector: Connector
def __init__(self):
@@ -162,12 +158,6 @@ class Anchor(Node):
"/anchor/to_vic/debug",
20,
)
# MCU Version publisher
self.mcu_version_pub_ = self.create_publisher(
McuVersion,
"/anchor/from_vic/mcu_version",
20,
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
@@ -183,7 +173,7 @@ class Anchor(Node):
20,
)
self.mock_mcu_sub_ = self.create_subscription(
VicCAN,
String,
"/anchor/from_vic/mock_mcu",
self.relay_fromvic,
20,
@@ -195,8 +185,6 @@ class Anchor(Node):
20,
)
self.mcu_versions: dict[str, McuVersion] = {}
# Close devices on exit
atexit.register(self.cleanup)
@@ -243,26 +231,6 @@ class Anchor(Node):
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
# MCU Versioning information
if msg.command_id in (46, 47) and msg.mcu_name not in self.mcu_versions:
self.mcu_versions[msg.mcu_name] = McuVersion(mcu_name=msg.mcu_name)
if msg.command_id == 46: # commit hashes
self.mcu_versions[msg.mcu_name].project_commit_fragment = msg.data[0]
self.mcu_versions[msg.mcu_name].astra_lib_commit_fragment = msg.data[1]
elif msg.command_id == 47: # build timestamp and version numbers
version_msg = self.mcu_versions[msg.mcu_name]
version_msg.build_time = Time(
sec=int(time.mktime(self.ASTRA_EPOCH) + msg.data[0])
)
# is_main and is_dirty is in msg.data[1]
# Out of 1 byte, it looks like [lib_isdirty][lib_ismain][proj_isdirty][proj_ismain]
version_msg.astra_lib_is_dirty = bool(int(msg.data[1]) >> 3 & 0x1)
version_msg.astra_lib_is_main = bool(int(msg.data[1]) >> 2 & 0x1)
version_msg.project_is_dirty = bool(int(msg.data[1]) >> 1 & 0x1)
version_msg.project_is_main = bool(int(msg.data[1]) & 0x1)
self.mcu_version_pub_.publish(version_msg)
def main(args=None):
try:

View File

@@ -1,8 +1,10 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, Shutdown
from launch.actions import DeclareLaunchArgument, Shutdown, IncludeLaunchDescription
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch.launch_description_sources import PythonLaunchDescriptionSource
def generate_launch_description():
@@ -46,50 +48,24 @@ def generate_launch_description():
)
)
ld.add_action(
DeclareLaunchArgument(
"use_ros2_control",
default_value="false",
description="Whether to use DiffDriveController for driving instead of direct Twist",
)
)
ld.add_action(
DeclareLaunchArgument(
"rover_platform",
default_value="auto",
description="Choose the rover platform (either clucky or testbed). If left on auto, will defer to ROVER_PLATFORM environment variable.",
choices=["clucky", "testbed", "auto"],
)
)
# nodes
ld.add_action(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
condition=IfCondition(use_ptz),
)
)
ld.add_action(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="anchor_pkg",
@@ -108,4 +84,91 @@ def generate_launch_description():
)
)
ld.add_action(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[
{"launch_mode": "anchor"},
{
"use_ros2_control": LaunchConfiguration(
"use_ros2_control", default=False
)
},
{
"rover_platform": LaunchConfiguration(
"rover_platform", default="auto"
)
},
],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
condition=IfCondition(use_ptz),
)
)
ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"robot_state_publisher.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
)
ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"spawn_controllers.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
)
return ld

View File

@@ -3,14 +3,20 @@
<package format="3">
<name>anchor_pkg</name>
<version>0.0.0</version>
<description>Anchor -- ROS and CAN relay node</description>
<description>ASTRA VicCAN driver package, using python-can and pyserial.</description>
<maintainer email="rjm0037@uah.edu">Riley</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-serial</depend>
<depend>python3-can</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-serial</exec_depend>
<exec_depend>python3-can</exec_depend>
<exec_depend>core_pkg</exec_depend>
<exec_depend>arm_pkg</exec_depend>
<exec_depend>bio_pkg</exec_depend>
<exec_depend>core_description</exec_depend>
<build_depend>black</build_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/anchor_pkg
[install]
install_scripts=$base/lib/anchor_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -6,7 +6,7 @@ package_name = "anchor_pkg"
setup(
name=package_name,
version="0.0.0",
version="1.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -17,8 +17,8 @@ setup(
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN",
license="All Rights Reserved",
description="ASTRA VicCAN driver package, using python-can and pyserial.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": ["anchor = anchor_pkg.anchor_node:main"],
},

View File

@@ -1,5 +1,4 @@
import sys
import threading
import signal
import math
from warnings import deprecated
@@ -13,7 +12,7 @@ from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
from astra_msgs.msg import ArmFeedback, ArmCtrlState, VicCAN, RevMotorState
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -26,8 +25,6 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5),
)
thread = None
class ArmNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
@@ -81,7 +78,9 @@ class ArmNode(Node):
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.anchor_pub = self.create_publisher(
String, "/anchor/to_vic/relay_string", 10
)
# Create publishers
self.socket_pub = self.create_publisher(
@@ -115,10 +114,10 @@ class ArmNode(Node):
# Control
# Manual: /arm/manual/joint_jog is published by Basestation or Headless
# Manual: /arm/control/joint_jog is published by Basestation or Headless
self.man_jointjog_sub_ = self.create_subscription(
JointJog,
"/arm/manual/joint_jog",
"/arm/control/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
)
@@ -129,6 +128,13 @@ class ArmNode(Node):
self.joint_command_callback,
qos_profile=control_qos,
)
# State: /arm/control/state is published by Basestation or Headless
self.man_state_sub_ = self.create_subscription(
ArmCtrlState,
"/arm/control/state",
self.man_state_callback,
qos_profile=control_qos,
)
# Feedback
@@ -148,9 +154,14 @@ class ArmNode(Node):
# Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback()
self.arm_feedback_new.axis0_motor.id = 1
self.arm_feedback_new.axis1_motor.id = 2
self.arm_feedback_new.axis2_motor.id = 3
self.arm_feedback_new.axis3_motor.id = 4
# IK Arm pose
self.saved_joint_state = JointState()
self.saved_joint_state.header.frame_id = "base_link"
self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
@@ -173,10 +184,54 @@ class ArmNode(Node):
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
self.send_velocities(velocities, msg.header)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=39,
data=velocities[0:4],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=39,
data=velocities[4:6],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=26,
data=[velocities[6]],
header=msg.header,
)
)
# TODO: use msg.duration
def man_state_callback(self, msg: ArmCtrlState):
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=18,
data=[1.0 if msg.brake_mode else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=34,
data=[1.0 if msg.laser else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.")
@@ -202,12 +257,12 @@ class ArmNode(Node):
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header)
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:4], header=header)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header)
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6], header=header)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
@@ -290,6 +345,8 @@ class ArmNode(Node):
)
return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id:
case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0])
@@ -376,11 +433,14 @@ class ArmNode(Node):
)
return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id:
case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.digit_voltage.header.stamp = msg.header.stamp
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(
msg.data[0]

View File

@@ -3,14 +3,15 @@
<package format="3">
<name>arm_pkg</name>
<version>1.0.0</version>
<description>Core arm package which handles ROS2 commnuication.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<description>Relays topics related to Arm between VicCAN (through Anchor) and basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-numpy</depend>
<depend>astra_msgs</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -12,9 +12,9 @@ setup(
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="David",
maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu",
description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.",
description="Relays topics related to Arm between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": ["arm = arm_pkg.arm_node:main"],

View File

@@ -2,14 +2,16 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>bio_pkg</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<version>1.0.0</version>
<description>Biosensor package to handle command interpretation and embedded interfacing.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>astra_msgs</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/bio_pkg
[install]
install_scripts=$base/lib/bio_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,8 +14,8 @@ setup(
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="TODO: Package description",
license="TODO: License declaration",
description="Relays topics related to Biosensor between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": ["bio = bio_pkg.bio_node:main"],
},

View File

@@ -1,112 +0,0 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
import importlib
from std_msgs.msg import String
from astra_msgs.msg import CoreControl
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
max_speed = 90 # Max speed as a duty cycle percentage (1-100)
class Headless(Node):
def __init__(self):
# Initialize pygame first
pygame.init()
pygame.joystick.init()
# Wait for a gamepad to be connected
self.gamepad = None
print("Waiting for gamepad connection...")
while pygame.joystick.get_count() == 0:
# Process any pygame events to keep it responsive
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
time.sleep(1.0) # Check every second
print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
# Now initialize the ROS2 node
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, "/core/control", 10)
self.lastMsg = (
String()
) # Used to ignore sending controls repeatedly when they do not change
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
input = CoreControl()
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl()
input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f"L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}"
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -1,20 +1,16 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
from rclpy.duration import Duration
from std_srvs.srv import Empty
import signal
import time
import atexit
import serial
import os
import sys
import threading
import glob
import signal
from typing import Literal, cast
from scipy.spatial.transform import Rotation
from math import copysign, pi
from warnings import deprecated
from os import getenv
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from rclpy.duration import Duration
from std_msgs.msg import String, Header
from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus, JointState
@@ -23,12 +19,14 @@ from astra_msgs.msg import CoreControl, CoreFeedback, RevMotorState
from astra_msgs.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState
serial_pub = None
thread = None
CORE_WHEELBASE = 0.836 # meters
CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1
# TODO: update core_description or add testbed_description
TESTBED_WHEELBASE = 0.368 # meters
TESTBED_WHEEL_RADIUS = 0.108 # meters
TESTBED_GEAR_RATIO = 64 # Testbed: 64:1
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -41,9 +39,13 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5),
)
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = {
class CoreNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Core related topics."""
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = {
48: 1,
49: 1,
50: 2,
@@ -53,24 +55,71 @@ viccan_msg_len_dict = {
54: 4,
56: 4, # really 3, but viccan
58: 4, # ditto
}
}
rover_platform: Literal["core", "testbed"]
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("core_node")
# Launch mode -- anchor vs core
self.declare_parameter("launch_mode", "core")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
self.get_logger().info(f"core launch_mode is: anchor")
##################################################
# Topics
# Parameters
self.declare_parameter("use_ros2_control", False)
self.use_ros2_control = (
self.get_parameter("use_ros2_control").get_parameter_value().bool_value
)
self.declare_parameter("rover_platform", "auto")
rover_platform = (
self.get_parameter("rover_platform").get_parameter_value().string_value
)
if rover_platform == "auto":
self.get_logger().info(
"rover_platform parameter is unset, falling back to environment variable"
)
rover_platform = getenv("ROVER_PLATFORM", "auto")
if rover_platform not in ("core", "testbed"): # make sure we have a valid value
raise ValueError("rover platform must be either 'core' or 'testbed'.")
else:
self.rover_platform = cast(Literal["core", "testbed"], rover_platform)
if self.rover_platform == "testbed":
global TESTBED_WHEELBASE, TESTBED_WHEEL_RADIUS, TESTBED_GEAR_RATIO
self.wheelbase = TESTBED_WHEELBASE
self.wheel_radius = TESTBED_WHEEL_RADIUS
self.gear_ratio = TESTBED_GEAR_RATIO
else: # default in case of unset or invalid environment variable
global CORE_WHEELBASE, CORE_WHEEL_RADIUS, CORE_GEAR_RATIO
self.wheelbase = CORE_WHEELBASE
self.wheel_radius = CORE_WHEEL_RADIUS
self.gear_ratio = CORE_GEAR_RATIO
##################################################
# Old Topics
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
if not self.use_ros2_control:
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
##################################################
# New Topics
# Anchor
if self.launch_mode == "anchor":
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20
)
@@ -78,18 +127,16 @@ class SerialRelay(Node):
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
# autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2
self.cmd_vel_sub_ = self.create_subscription(
TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1
if self.use_ros2_control:
# Joint state control for topic-based controller
self.joint_command_sub_ = self.create_subscription(
JointState, "/core/joint_commands", self.joint_command_callback, 2
)
else:
# manual twist -- [-1, 1] rather than real units
# TODO: change topic to '/core/control/twist'
self.twist_man_sub_ = self.create_subscription(
Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos
)
@@ -100,148 +147,64 @@ class SerialRelay(Node):
self.control_state_callback,
qos_profile=control_qos,
)
self.twist_max_duty = (
0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
)
self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
# Feedback
# Consolidated and organized core feedback
# Consolidated and organized main core feedback
# TODO: change topic to something like '/core/feedback/main'
self.feedback_new_pub_ = self.create_publisher(
NewCoreFeedback,
"/core/feedback_new",
qos_profile=qos.qos_profile_sensor_data,
)
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(
Barometer, "/core/feedback/baro", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Saved state
# Main Core feedback
self.feedback_new_state = NewCoreFeedback()
self.feedback_new_state.fl_motor.id = 1
self.feedback_new_state.bl_motor.id = 2
self.feedback_new_state.fr_motor.id = 3
self.feedback_new_state.br_motor.id = 4
self.telemetry_pub_timer = self.create_timer(
1.0, self.publish_feedback
) # TODO: not sure about this
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/core/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
# IMU
self.imu_state = Imu()
self.imu_state.header.frame_id = "core_bno055"
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
# GPS
self.gps_state = NavSatFix()
self.gps_state.header.frame_id = "core_gps_antenna"
self.gps_state.status.service = NavSatStatus.SERVICE_GPS
self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX
self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(
Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data
)
# Barometer
self.baro_state = Barometer()
self.baro_state.header.frame_id = "core_bmp388"
# Old
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
# Debug
self.debug_pub = self.create_publisher(String, "/core/debug", 10)
self.ping_service = self.create_service(
Empty, "/astra/core/ping", self.ping_callback
)
##################################################
# Find microcontroller (Non-anchor only)
# Core (non-anchor) specific
if self.launch_mode == "core":
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
self.get_logger().info(f"Enabling Relay Mode")
ser.write(b"can_relay_mode,on\n")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
# end __init__()
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
if self.launch_mode == "core":
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self): # NON-ANCHOR SPECIFIC
try:
output = str(self.ser.readline(), "utf8")
if output:
# All output over debug temporarily
print(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
@deprecated("Uses an old message type. Will be removed at some point.")
def scale_duty(self, value: float, max_speed: float):
leftMin = -1
leftMax = 1
@@ -258,6 +221,7 @@ class SerialRelay(Node):
# Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan))
@deprecated("Uses an old message type. Will be removed at some point.")
def send_controls(self, msg: CoreControl):
if msg.turn_to_enable:
command = (
@@ -283,17 +247,42 @@ class SerialRelay(Node):
# print(f"[Sys] Relaying: {command}")
def cmd_vel_callback(self, msg: TwistStamped):
linear = msg.twist.linear.x
angular = -msg.twist.angular.z
def joint_command_callback(self, msg: JointState):
# So... topic based control node publishes JointState messages over /joint_commands
# with len(msg.name) == 5 and len(msg.velocity) == 4... all 5 non-fixed joints
# are included in msg.name, but ig it is implied that msg.velocity only
# includes velocities for the commanded joints (ros__parameters.joints).
# So, this will be much more hacky and less adaptable than I would like it to be.
if (
len(msg.name) != (4 if self.rover_platform == "testbed" else 5)
or len(msg.velocity) != 4
or len(msg.position) != 0
):
self.get_logger().warning(
f"Received joint control message with unexpected number of joints. Ignoring."
)
return
if msg.name[-4:] != [ # type: ignore
"bl_wheel_joint",
"br_wheel_joint",
"fl_wheel_joint",
"fr_wheel_joint",
]:
self.get_logger().warning(
f"Received joint control message with unexpected name[]. Ignoring."
)
return
vel_left_rads = (linear - (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
vel_right_rads = (linear + (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
(bl_vel, br_vel, fl_vel, fr_vel) = msg.velocity
vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
bl_rpm = radps_to_rpm(bl_vel) * self.gear_ratio
br_rpm = radps_to_rpm(br_vel) * self.gear_ratio
fl_rpm = radps_to_rpm(fl_vel) * self.gear_ratio
fr_rpm = radps_to_rpm(fr_vel) * self.gear_ratio
self.send_viccan(20, [vel_left_rpm, vel_right_rpm])
self.send_viccan(
20, [fl_rpm, bl_rpm, fr_rpm, br_rpm]
) # order expected by embedded
def twist_man_callback(self, msg: Twist):
linear = msg.linear.x # [-1 1] for forward/back from left stick y
@@ -334,15 +323,9 @@ class SerialRelay(Node):
# Max duty cycle
self.twist_max_duty = msg.max_duty # twist_man_callback will handle this
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str):
if self.launch_mode == "anchor":
# self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String() # Convert to std_msg string
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == "core":
self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
self.anchor_pub.publish(String(data=msg)) # Publish to anchor for relay
def send_viccan(self, cmd_id: int, data: list[float]):
self.anchor_tovic_pub_.publish(
@@ -354,6 +337,7 @@ class SerialRelay(Node):
)
)
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String):
output = msg.data
parts = str(output.strip()).split(",")
@@ -423,14 +407,16 @@ class SerialRelay(Node):
# skill diff if not
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_msg_len_dict:
expected_len = viccan_msg_len_dict[msg.command_id]
if msg.command_id in self.viccan_msg_len_dict:
expected_len = self.viccan_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
self.feedback_new_state.header.stamp = msg.header.stamp
match msg.command_id:
# GNSS
case 48: # GNSS Latitude
@@ -457,6 +443,7 @@ class SerialRelay(Node):
self.imu_state.linear_acceleration.x = float(msg.data[0])
self.imu_state.linear_acceleration.y = float(msg.data[1])
self.imu_state.linear_acceleration.z = float(msg.data[2])
self.feedback_new_state.orientation = float(msg.data[3])
# Deal with quaternion
r = Rotation.from_euler("z", float(msg.data[3]), degrees=True)
q = r.as_quat()
@@ -501,6 +488,7 @@ class SerialRelay(Node):
self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0
self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0
self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0
self.feedback_new_state.board_voltage.header.stamp = msg.header.stamp
# Baro
case 56: # BMP temperature, altitude, pressure
self.baro_state.temperature = float(msg.data[0])
@@ -513,14 +501,12 @@ class SerialRelay(Node):
motorId = round(float(msg.data[0]))
position = float(msg.data[1])
velocity = float(msg.data[2])
joint_state_msg = (
JointState()
) # TODO: not sure if all motors should be in each message or not
joint_state_msg = JointState()
joint_state_msg.position = [
position * (2 * pi) / CORE_GEAR_RATIO
position * (2 * pi) / self.gear_ratio
] # revolutions to radians
joint_state_msg.velocity = [
velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO
velocity * (2 * pi / 60.0) / self.gear_ratio
] # RPM to rad/s
motor: RevMotorState | None = None
@@ -528,52 +514,42 @@ class SerialRelay(Node):
match motorId:
case 1:
motor = self.feedback_new_state.fl_motor
joint_state_msg.name = ["fl_motor_joint"]
joint_state_msg.name = ["fl_wheel_joint"]
case 2:
motor = self.feedback_new_state.bl_motor
joint_state_msg.name = ["bl_motor_joint"]
joint_state_msg.name = ["bl_wheel_joint"]
case 3:
motor = self.feedback_new_state.fr_motor
joint_state_msg.name = ["fr_motor_joint"]
joint_state_msg.name = ["fr_wheel_joint"]
case 4:
motor = self.feedback_new_state.br_motor
joint_state_msg.name = ["br_motor_joint"]
joint_state_msg.name = ["br_wheel_joint"]
case _:
self.get_logger().warning(
f"Ignoring REV motor feedback 58 with invalid motorId {motorId}"
)
return
if motor:
motor.position = position
motor.velocity = velocity
# make the fucking shit work
if self.rover_platform == "clucky":
joint_state_msg.name.append("left_suspension_joint")
joint_state_msg.position.append(0.0)
joint_state_msg.velocity.append(0.0)
joint_state_msg.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(joint_state_msg)
case _:
return
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self):
# self.get_logger().info(f"[Core] {self.core_feedback}")
self.feedback_pub.publish(self.core_feedback)
def ping_callback(self, request, response):
return response
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def map_range(
value: float, in_min: float, in_max: float, out_min: float, out_max: float
@@ -581,19 +557,31 @@ def map_range(
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
def radps_to_rpm(radps: float):
return radps * 60 / (2 * pi)
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
serial_pub = SerialRelay()
serial_pub.run()
core_node = CoreNode()
try:
rclpy.spin(core_node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -3,15 +3,17 @@
<package format="3">
<name>core_pkg</name>
<version>1.0.0</version>
<description>Core rover control package to handle command interpretation and embedded interfacing.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<description>Relays topics related to Core between VicCAN (through Anchor) and basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-scipy</depend>
<depend>python-crccheck-pip</depend>
<depend>astra_msgs</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<exec_depend>python-crccheck-pip</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/core_pkg
[install]
install_scripts=$base/lib/core_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -4,7 +4,7 @@ package_name = "core_pkg"
setup(
name=package_name,
version="0.0.0",
version="1.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -12,14 +12,13 @@ setup(
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="Core rover control package to handle command interpretation and embedded interfacing.",
license="All Rights Reserved",
maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu",
description="Relays topics related to Core between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": [
"core = core_pkg.core_node:main",
"headless = core_pkg.core_headless:main",
"ptz = core_pkg.core_ptz:main",
],
},

View File

@@ -3,14 +3,15 @@
<package format="3">
<name>headless_pkg</name>
<version>1.0.0</version>
<description>Headless rover control package to handle command interpretation and embedded interfacing.</description>
<description>Provides headless rover control, similar to Basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<depend>common_interfaces</depend>
<depend>python3-pygame</depend>
<depend>astra_msgs</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-pygame</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,3 +2,5 @@
script_dir=$base/lib/headless_pkg
[install]
install_scripts=$base/lib/headless_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,11 +14,9 @@ setup(
zip_safe=True,
maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu",
description="Headless rover control package to handle command interpretation and embedded interfacing.",
license="All Rights Reserved",
description="Provides headless rover control, similar to Basestation.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": [
"headless_full = src.headless_node:main",
],
"console_scripts": ["headless_full = src.headless_node:main"],
},
)

View File

@@ -18,7 +18,7 @@ from std_msgs.msg import Header
from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState
from astra_msgs.msg import CoreCtrlState, ArmCtrlState
import warnings
@@ -138,6 +138,11 @@ class Headless(Node):
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
self.declare_parameter("use_cmd_vel", False)
self.use_cmd_vel = (
self.get_parameter("use_cmd_vel").get_parameter_value().bool_value
)
self.declare_parameter("use_bio", False)
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
@@ -145,7 +150,6 @@ class Headless(Node):
self.use_arm_ik = (
self.get_parameter("use_arm_ik").get_parameter_value().bool_value
)
# NOTE: only applicable if use_old_topics == True
self.declare_parameter("use_new_arm_manual_scheme", True)
self.use_new_arm_manual_scheme = (
@@ -155,6 +159,13 @@ class Headless(Node):
)
# Check parameter validity
if self.use_cmd_vel:
self.get_logger().info("Using cmd_vel for core control")
global CORE_MODE
CORE_MODE = "twist"
else:
self.get_logger().info("Using astra_msgs/CoreControl for core control")
if self.use_arm_ik and self.use_old_topics:
self.get_logger().fatal("Old topics do not support arm IK control.")
sys.exit(1)
@@ -166,6 +177,8 @@ class Headless(Node):
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
self.arm_brake_mode = False
self.arm_laser = False
##################################################
# Old Topics
@@ -184,12 +197,18 @@ class Headless(Node):
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_cmd_vel_pub_ = self.create_publisher(
TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos
JointJog, "/arm/control/joint_jog", qos_profile=control_qos
)
self.arm_state_pub_ = self.create_publisher(
ArmCtrlState, "/arm/control/state", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
@@ -231,11 +250,17 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150)
# Added so you can tell when it starts running after changing the constant logging to debug from info
self.get_logger().info("Defaulting to Core mode. Ready.")
def stop_all(self):
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
else:
if self.use_cmd_vel:
self.core_cmd_vel_pub_.publish(self.core_cmd_vel_stop_msg())
else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
if self.use_arm_ik:
@@ -332,18 +357,28 @@ class Headless(Node):
self.core_publisher.publish(input)
else: # New topics
input = Twist()
twist = Twist()
# Forward/back and Turn
input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign(
twist.linear.x = -1.0 * left_stick_y
twist.angular.z = -1.0 * copysign(
right_stick_x**2, right_stick_x
) # Exponent for finer control (curve)
# This kinda looks dumb being seperate from the following block, but this
# maintains the separation between modifying the control message and sending it
if self.use_cmd_vel:
twist.linear.x *= 1.5
twist.angular.z *= 0.5
# Publish
self.core_twist_pub_.publish(input)
self.get_logger().info(
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
if self.use_cmd_vel:
header = Header(stamp=self.get_clock().now().to_msg())
self.core_cmd_vel_pub_.publish(TwistStamped(header=header, twist=twist))
else:
self.core_twist_pub_.publish(twist)
self.get_logger().debug(
f"[Core Ctrl] Linear: {round(twist.linear.x, 2)}, Angular: {round(twist.angular.z, 2)}"
)
# Brake mode
@@ -552,13 +587,26 @@ class Headless(Node):
)
# A: brake
# TODO: Brake mode
new_brake_mode = button_a
# Y: linear actuator
# TODO: linear actuator
# X: laser
new_laser = button_x
self.arm_manual_pub_.publish(arm_input)
# Only publish state if needed
if new_brake_mode != self.arm_brake_mode or new_laser != self.arm_laser:
self.arm_brake_mode = new_brake_mode
self.arm_laser = new_laser
state_msg = ArmCtrlState()
state_msg.brake_mode = bool(self.arm_brake_mode)
state_msg.laser = bool(self.arm_laser)
self.arm_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Arm State] Brake: {self.arm_brake_mode}, Laser: {self.arm_laser}"
)
# IK (ONLY NEW)
# =============
@@ -643,6 +691,11 @@ class Headless(Node):
else:
pass # TODO: implement new bio control topics
def core_cmd_vel_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def arm_manual_stop_msg(self):
return JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),