3 Commits

Author SHA1 Message Date
David
87237bd841 chore(anchor): use new version commit hash field name 2026-04-18 15:09:30 -05:00
David
79b2d0020f feat(anchor): add is_main and is_dirty to versioning 2026-04-15 23:50:47 -05:00
David
3dd9525833 feat(anchor): add MCU Versioning feedback 2026-04-14 15:38:41 -05:00
22 changed files with 856 additions and 921 deletions

View File

@@ -1,4 +1,5 @@
from warnings import deprecated
import time
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
@@ -17,8 +18,9 @@ from .connector import (
from .convert import string_to_viccan
import threading
from astra_msgs.msg import VicCAN
from builtin_interfaces.msg import Time
from std_msgs.msg import String
from astra_msgs.msg import VicCAN, McuVersion
class Anchor(Node):
@@ -47,6 +49,8 @@ class Anchor(Node):
Instead, it converts them to VicCAN messages first.
"""
ASTRA_EPOCH = time.struct_time((2022, 1, 1, 0, 0, 0, 0, 0, 0)) # January 1, 2022
connector: Connector
def __init__(self):
@@ -158,6 +162,12 @@ class Anchor(Node):
"/anchor/to_vic/debug",
20,
)
# MCU Version publisher
self.mcu_version_pub_ = self.create_publisher(
McuVersion,
"/anchor/from_vic/mcu_version",
20,
)
# Subscribers
self.tovic_sub_ = self.create_subscription(
@@ -173,7 +183,7 @@ class Anchor(Node):
20,
)
self.mock_mcu_sub_ = self.create_subscription(
String,
VicCAN,
"/anchor/from_vic/mock_mcu",
self.relay_fromvic,
20,
@@ -185,6 +195,8 @@ class Anchor(Node):
20,
)
self.mcu_versions: dict[str, McuVersion] = {}
# Close devices on exit
atexit.register(self.cleanup)
@@ -231,6 +243,26 @@ class Anchor(Node):
elif msg.mcu_name == "citadel" or msg.mcu_name == "digit":
self.fromvic_bio_pub_.publish(msg)
# MCU Versioning information
if msg.command_id in (46, 47) and msg.mcu_name not in self.mcu_versions:
self.mcu_versions[msg.mcu_name] = McuVersion(mcu_name=msg.mcu_name)
if msg.command_id == 46: # commit hashes
self.mcu_versions[msg.mcu_name].project_commit_fragment = msg.data[0]
self.mcu_versions[msg.mcu_name].astra_lib_commit_fragment = msg.data[1]
elif msg.command_id == 47: # build timestamp and version numbers
version_msg = self.mcu_versions[msg.mcu_name]
version_msg.build_time = Time(
sec=int(time.mktime(self.ASTRA_EPOCH) + msg.data[0])
)
# is_main and is_dirty is in msg.data[1]
# Out of 1 byte, it looks like [lib_isdirty][lib_ismain][proj_isdirty][proj_ismain]
version_msg.astra_lib_is_dirty = bool(int(msg.data[1]) >> 3 & 0x1)
version_msg.astra_lib_is_main = bool(int(msg.data[1]) >> 2 & 0x1)
version_msg.project_is_dirty = bool(int(msg.data[1]) >> 1 & 0x1)
version_msg.project_is_main = bool(int(msg.data[1]) & 0x1)
self.mcu_version_pub_.publish(version_msg)
def main(args=None):
try:

View File

@@ -1,10 +1,8 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, Shutdown, IncludeLaunchDescription
from launch.actions import DeclareLaunchArgument, Shutdown
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch.launch_description_sources import PythonLaunchDescriptionSource
def generate_launch_description():
@@ -48,24 +46,50 @@ def generate_launch_description():
)
)
ld.add_action(
DeclareLaunchArgument(
"use_ros2_control",
default_value="false",
description="Whether to use DiffDriveController for driving instead of direct Twist",
)
)
ld.add_action(
DeclareLaunchArgument(
"rover_platform",
default_value="auto",
description="Choose the rover platform (either clucky or testbed). If left on auto, will defer to ROVER_PLATFORM environment variable.",
choices=["clucky", "testbed", "auto"],
)
)
# nodes
ld.add_action(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
condition=IfCondition(use_ptz),
)
)
ld.add_action(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="anchor_pkg",
@@ -84,91 +108,4 @@ def generate_launch_description():
)
)
ld.add_action(
Node(
package="arm_pkg",
executable="arm",
name="arm",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="bio_pkg",
executable="bio",
name="bio",
output="both",
parameters=[{"launch_mode": "anchor"}],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="core",
name="core",
output="both",
parameters=[
{"launch_mode": "anchor"},
{
"use_ros2_control": LaunchConfiguration(
"use_ros2_control", default=False
)
},
{
"rover_platform": LaunchConfiguration(
"rover_platform", default="auto"
)
},
],
on_exit=Shutdown(),
)
)
ld.add_action(
Node(
package="core_pkg",
executable="ptz",
name="ptz",
output="both",
condition=IfCondition(use_ptz),
)
)
ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"robot_state_publisher.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
)
ld.add_action(
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"spawn_controllers.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
)
return ld

View File

@@ -3,20 +3,14 @@
<package format="3">
<name>anchor_pkg</name>
<version>0.0.0</version>
<description>ASTRA VicCAN driver package, using python-can and pyserial.</description>
<description>Anchor -- ROS and CAN relay node</description>
<maintainer email="rjm0037@uah.edu">Riley</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-serial</exec_depend>
<exec_depend>python3-can</exec_depend>
<exec_depend>core_pkg</exec_depend>
<exec_depend>arm_pkg</exec_depend>
<exec_depend>bio_pkg</exec_depend>
<exec_depend>core_description</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-serial</depend>
<depend>python3-can</depend>
<build_depend>black</build_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/anchor_pkg
[install]
install_scripts=$base/lib/anchor_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -6,7 +6,7 @@ package_name = "anchor_pkg"
setup(
name=package_name,
version="1.0.0",
version="0.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -17,8 +17,8 @@ setup(
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="ASTRA VicCAN driver package, using python-can and pyserial.",
license="AGPL-3.0-only",
description="Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN",
license="All Rights Reserved",
entry_points={
"console_scripts": ["anchor = anchor_pkg.anchor_node:main"],
},

View File

@@ -1,4 +1,5 @@
import sys
import threading
import signal
import math
from warnings import deprecated
@@ -12,7 +13,7 @@ from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual # TODO: Old topics
from astra_msgs.msg import ArmFeedback, ArmCtrlState, VicCAN, RevMotorState
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -25,6 +26,8 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5),
)
thread = None
class ArmNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
@@ -112,10 +115,10 @@ class ArmNode(Node):
# Control
# Manual: /arm/control/joint_jog is published by Basestation or Headless
# Manual: /arm/manual/joint_jog is published by Basestation or Headless
self.man_jointjog_sub_ = self.create_subscription(
JointJog,
"/arm/control/joint_jog",
"/arm/manual/joint_jog",
self.jointjog_callback,
qos_profile=control_qos,
)
@@ -126,13 +129,6 @@ class ArmNode(Node):
self.joint_command_callback,
qos_profile=control_qos,
)
# State: /arm/control/state is published by Basestation or Headless
self.man_state_sub_ = self.create_subscription(
ArmCtrlState,
"/arm/control/state",
self.man_state_callback,
qos_profile=control_qos,
)
# Feedback
@@ -152,14 +148,9 @@ class ArmNode(Node):
# Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback()
self.arm_feedback_new.axis0_motor.id = 1
self.arm_feedback_new.axis1_motor.id = 2
self.arm_feedback_new.axis2_motor.id = 3
self.arm_feedback_new.axis3_motor.id = 4
# IK Arm pose
self.saved_joint_state = JointState()
self.saved_joint_state.header.frame_id = "base_link"
self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
@@ -182,54 +173,10 @@ class ArmNode(Node):
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=39,
data=velocities[0:4],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=39,
data=velocities[4:6],
header=msg.header,
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit",
command_id=26,
data=[velocities[6]],
header=msg.header,
)
)
self.send_velocities(velocities, msg.header)
# TODO: use msg.duration
def man_state_callback(self, msg: ArmCtrlState):
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=18,
data=[1.0 if msg.brake_mode else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm",
command_id=34,
data=[1.0 if msg.laser else 0.0],
header=Header(stamp=self.get_clock().now().to_msg()),
)
)
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
self.get_logger().debug("Ignoring malformed /joint_command message.")
@@ -255,12 +202,12 @@ class ArmNode(Node):
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:4], header=header)
VicCAN(mcu_name="arm", command_id=43, data=velocities[0:3], header=header)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:6], header=header)
VicCAN(mcu_name="digit", command_id=43, data=velocities[4:5], header=header)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
@@ -343,8 +290,6 @@ class ArmNode(Node):
)
return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id:
case 53: # REV SPARK MAX feedback
motorId = round(msg.data[0])
@@ -431,14 +376,11 @@ class ArmNode(Node):
)
return
self.arm_feedback_new.header.stamp = msg.header.stamp
match msg.command_id:
case 54: # Board voltages
self.arm_feedback_new.digit_voltage.vbatt = float(msg.data[0]) / 100.0
self.arm_feedback_new.digit_voltage.v12 = float(msg.data[1]) / 100.0
self.arm_feedback_new.digit_voltage.v5 = float(msg.data[2]) / 100.0
self.arm_feedback_new.digit_voltage.header.stamp = msg.header.stamp
case 55: # Arm joint positions
self.saved_joint_state.position[4] = math.radians(
msg.data[0]

View File

@@ -3,15 +3,14 @@
<package format="3">
<name>arm_pkg</name>
<version>1.0.0</version>
<description>Relays topics related to Arm between VicCAN (through Anchor) and basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<description>Core arm package which handles ROS2 commnuication.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-numpy</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -12,9 +12,9 @@ setup(
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="David Sharpe",
maintainer="David",
maintainer_email="ds0196@uah.edu",
description="Relays topics related to Arm between VicCAN (through Anchor) and basestation.",
description="Relays topics related to the arm between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
entry_points={
"console_scripts": ["arm = arm_pkg.arm_node:main"],

View File

@@ -2,16 +2,14 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>bio_pkg</name>
<version>1.0.0</version>
<description>Biosensor package to handle command interpretation and embedded interfacing.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/bio_pkg
[install]
install_scripts=$base/lib/bio_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,8 +14,8 @@ setup(
zip_safe=True,
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="Relays topics related to Biosensor between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
description="TODO: Package description",
license="TODO: License declaration",
entry_points={
"console_scripts": ["bio = bio_pkg.bio_node:main"],
},

View File

@@ -0,0 +1,112 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
import importlib
from std_msgs.msg import String
from astra_msgs.msg import CoreControl
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
max_speed = 90 # Max speed as a duty cycle percentage (1-100)
class Headless(Node):
def __init__(self):
# Initialize pygame first
pygame.init()
pygame.joystick.init()
# Wait for a gamepad to be connected
self.gamepad = None
print("Waiting for gamepad connection...")
while pygame.joystick.get_count() == 0:
# Process any pygame events to keep it responsive
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
time.sleep(1.0) # Check every second
print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f"Gamepad Found: {self.gamepad.get_name()}")
# Now initialize the ROS2 node
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, "/core/control", 10)
self.lastMsg = (
String()
) # Used to ignore sending controls repeatedly when they do not change
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
input = CoreControl()
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl()
input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f"L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}"
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -1,16 +1,20 @@
import sys
import signal
from typing import Literal, cast
from scipy.spatial.transform import Rotation
from math import copysign, pi
from warnings import deprecated
from os import getenv
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from rclpy.duration import Duration
from std_srvs.srv import Empty
import signal
import time
import atexit
import serial
import os
import sys
import threading
import glob
from scipy.spatial.transform import Rotation
from math import copysign, pi
from std_msgs.msg import String, Header
from sensor_msgs.msg import Imu, NavSatFix, NavSatStatus, JointState
@@ -19,14 +23,12 @@ from astra_msgs.msg import CoreControl, CoreFeedback, RevMotorState
from astra_msgs.msg import VicCAN, NewCoreFeedback, Barometer, CoreCtrlState
serial_pub = None
thread = None
CORE_WHEELBASE = 0.836 # meters
CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1
# TODO: update core_description or add testbed_description
TESTBED_WHEELBASE = 0.368 # meters
TESTBED_WHEEL_RADIUS = 0.108 # meters
TESTBED_GEAR_RATIO = 64 # Testbed: 64:1
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
@@ -39,13 +41,9 @@ control_qos = qos.QoSProfile(
# liveliness_lease_duration=Duration(seconds=5),
)
class CoreNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Core related topics."""
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = {
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_msg_len_dict = {
48: 1,
49: 1,
50: 2,
@@ -55,72 +53,24 @@ class CoreNode(Node):
54: 4,
56: 4, # really 3, but viccan
58: 4, # ditto
}
rover_platform: Literal["clucky", "testbed"]
}
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("core_node")
self.get_logger().info(f"core launch_mode is: anchor")
# Launch mode -- anchor vs core
self.declare_parameter("launch_mode", "core")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
##################################################
# Parameters
self.declare_parameter("use_ros2_control", False)
self.use_ros2_control = (
self.get_parameter("use_ros2_control").get_parameter_value().bool_value
)
self.declare_parameter("rover_platform", "auto")
rover_platform = (
self.get_parameter("rover_platform").get_parameter_value().string_value
)
if rover_platform == "auto":
self.get_logger().info(
"rover_platform parameter is unset, falling back to environment variable"
)
rover_platform = getenv("ROVER_PLATFORM", "clucky").lower()
# Verify rover_platform value is valid
if rover_platform not in ("clucky", "testbed"):
raise ValueError("rover platform must be either 'clucky' or 'testbed'.")
else:
self.rover_platform = cast(Literal["clucky", "testbed"], rover_platform)
if self.rover_platform == "testbed":
global TESTBED_WHEELBASE, TESTBED_WHEEL_RADIUS, TESTBED_GEAR_RATIO
self.wheelbase = TESTBED_WHEELBASE
self.wheel_radius = TESTBED_WHEEL_RADIUS
self.gear_ratio = TESTBED_GEAR_RATIO
else: # default in case of unset or invalid environment variable
global CORE_WHEELBASE, CORE_WHEEL_RADIUS, CORE_GEAR_RATIO
self.wheelbase = CORE_WHEELBASE
self.wheel_radius = CORE_WHEEL_RADIUS
self.gear_ratio = CORE_GEAR_RATIO
##################################################
# Old Topics
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
if not self.use_ros2_control:
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
##################################################
# New Topics
# Topics
# Anchor
if self.launch_mode == "anchor":
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20
)
@@ -128,16 +78,18 @@ class CoreNode(Node):
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription(
String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
if self.use_ros2_control:
# Joint state control for topic-based controller
self.joint_command_sub_ = self.create_subscription(
JointState, "/core/joint_commands", self.joint_command_callback, 2
# autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2
self.cmd_vel_sub_ = self.create_subscription(
TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1
)
else:
# manual twist -- [-1, 1] rather than real units
# TODO: change topic to '/core/control/twist'
self.twist_man_sub_ = self.create_subscription(
Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos
)
@@ -148,74 +100,148 @@ class CoreNode(Node):
self.control_state_callback,
qos_profile=control_qos,
)
self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
self.twist_max_duty = (
0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
)
# Feedback
# Consolidated and organized main core feedback
# TODO: change topic to something like '/core/feedback/main'
# Consolidated and organized core feedback
self.feedback_new_pub_ = self.create_publisher(
NewCoreFeedback,
"/core/feedback_new",
qos_profile=qos.qos_profile_sensor_data,
)
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(
Barometer, "/core/feedback/baro", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Timers
if self.use_ros2_control:
self.vel_cmd_timer_ = self.create_timer(0.1, self.vel_cmd_timer_callback)
###################################################
# Saved state
# Controls
self._last_joint_command_time = self.get_clock().now()
self._last_joint_command_msg = JointState()
# Main Core feedback
self.feedback_new_state = NewCoreFeedback()
self.feedback_new_state.fl_motor.id = 1
self.feedback_new_state.bl_motor.id = 2
self.feedback_new_state.fr_motor.id = 3
self.feedback_new_state.br_motor.id = 4
# IMU
self.telemetry_pub_timer = self.create_timer(
1.0, self.publish_feedback
) # TODO: not sure about this
# Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(
JointState, "/core/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
self.imu_state = Imu()
self.imu_state.header.frame_id = "core_bno055"
# GPS
# GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
self.gps_state = NavSatFix()
self.gps_state.header.frame_id = "core_gps_antenna"
self.gps_state.status.service = NavSatStatus.SERVICE_GPS
self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX
self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
# Barometer
# Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(
Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data
)
self.baro_state = Barometer()
self.baro_state.header.frame_id = "core_bmp388"
@deprecated("Uses an old message type. Will be removed at some point.")
# Old
# /core/control
self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback()
# Debug
self.debug_pub = self.create_publisher(String, "/core/debug", 10)
self.ping_service = self.create_service(
Empty, "/astra/core/ping", self.ping_callback
)
##################################################
# Find microcontroller (Non-anchor only)
# Core (non-anchor) specific
if self.launch_mode == "core":
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
self.get_logger().info(f"Enabling Relay Mode")
ser.write(b"can_relay_mode,on\n")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
# end __init__()
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
if self.launch_mode == "core":
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self): # NON-ANCHOR SPECIFIC
try:
output = str(self.ser.readline(), "utf8")
if output:
# All output over debug temporarily
print(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
sys.exit(1)
def scale_duty(self, value: float, max_speed: float):
leftMin = -1
leftMax = 1
@@ -232,7 +258,6 @@ class CoreNode(Node):
# Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan))
@deprecated("Uses an old message type. Will be removed at some point.")
def send_controls(self, msg: CoreControl):
if msg.turn_to_enable:
command = (
@@ -258,57 +283,17 @@ class CoreNode(Node):
# print(f"[Sys] Relaying: {command}")
def joint_command_callback(self, msg: JointState):
# So... topic based control node publishes JointState messages over /joint_commands
# with len(msg.name) == 5 and len(msg.velocity) == 4... all 5 non-fixed joints
# are included in msg.name, but ig it is implied that msg.velocity only
# includes velocities for the commanded joints (ros__parameters.joints).
# So, this will be much more hacky and less adaptable than I would like it to be.
if (
len(msg.name) != (4 if self.rover_platform == "testbed" else 5)
or len(msg.velocity) != 4
or len(msg.position) != 0
):
self.get_logger().warning(
f"Received joint control message with unexpected number of joints. Ignoring."
)
return
if msg.name[-4:] != [ # type: ignore
"bl_wheel_joint",
"br_wheel_joint",
"fl_wheel_joint",
"fr_wheel_joint",
]:
self.get_logger().warning(
f"Received joint control message with unexpected name[]. Ignoring."
)
return
def cmd_vel_callback(self, msg: TwistStamped):
linear = msg.twist.linear.x
angular = -msg.twist.angular.z
# A 10 Hz timer callback actually sends these commands to Core for rate limiting
# These come from ros2_control at 50 Hz
self._last_joint_command_time = self.get_clock().now()
self._last_joint_command_msg = msg
vel_left_rads = (linear - (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
vel_right_rads = (linear + (angular * CORE_WHEELBASE / 2)) / CORE_WHEEL_RADIUS
def vel_cmd_timer_callback(self):
# Safety timeout for diff_drive_controller commands via topic_based_ros2_control.
# It is safe to send stop command here because if self.use_ros2_control,
# then this is the only callback that is controlling Core's motors.
if self.get_clock().now() - self._last_joint_command_time > Duration(
nanoseconds=int(1e8) # 100ms
):
self.send_viccan(20, [0.0, 0.0, 0.0, 0.0])
return
vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO
# This order is verified by the subscription callback
(bl_vel, br_vel, fl_vel, fr_vel) = self._last_joint_command_msg.velocity
# Convert wheel rad/s to motor RPM
bl_rpm = radps_to_rpm(bl_vel) * self.gear_ratio
br_rpm = radps_to_rpm(br_vel) * self.gear_ratio
fl_rpm = radps_to_rpm(fl_vel) * self.gear_ratio
fr_rpm = radps_to_rpm(fr_vel) * self.gear_ratio
self.send_viccan(20, [fl_rpm, bl_rpm, fr_rpm, br_rpm]) # REV Velocity
self.send_viccan(20, [vel_left_rpm, vel_right_rpm])
def twist_man_callback(self, msg: Twist):
linear = msg.linear.x # [-1 1] for forward/back from left stick y
@@ -349,9 +334,15 @@ class CoreNode(Node):
# Max duty cycle
self.twist_max_duty = msg.max_duty # twist_man_callback will handle this
@deprecated("Uses an old message type. Will be removed at some point.")
def send_cmd(self, msg: str):
self.anchor_pub.publish(String(data=msg)) # Publish to anchor for relay
if self.launch_mode == "anchor":
# self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String() # Convert to std_msg string
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == "core":
self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def send_viccan(self, cmd_id: int, data: list[float]):
self.anchor_tovic_pub_.publish(
@@ -363,7 +354,6 @@ class CoreNode(Node):
)
)
@deprecated("Uses an old message type. Will be removed at some point.")
def anchor_feedback(self, msg: String):
output = msg.data
parts = str(output.strip()).split(",")
@@ -433,16 +423,14 @@ class CoreNode(Node):
# skill diff if not
# Check message len to prevent crashing on bad data
if msg.command_id in self.viccan_msg_len_dict:
expected_len = self.viccan_msg_len_dict[msg.command_id]
if msg.command_id in viccan_msg_len_dict:
expected_len = viccan_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
self.feedback_new_state.header.stamp = msg.header.stamp
match msg.command_id:
# GNSS
case 48: # GNSS Latitude
@@ -469,7 +457,6 @@ class CoreNode(Node):
self.imu_state.linear_acceleration.x = float(msg.data[0])
self.imu_state.linear_acceleration.y = float(msg.data[1])
self.imu_state.linear_acceleration.z = float(msg.data[2])
self.feedback_new_state.orientation = float(msg.data[3])
# Deal with quaternion
r = Rotation.from_euler("z", float(msg.data[3]), degrees=True)
q = r.as_quat()
@@ -514,7 +501,6 @@ class CoreNode(Node):
self.feedback_new_state.board_voltage.v12 = float(msg.data[1]) / 100.0
self.feedback_new_state.board_voltage.v5 = float(msg.data[2]) / 100.0
self.feedback_new_state.board_voltage.v3 = float(msg.data[3]) / 100.0
self.feedback_new_state.board_voltage.header.stamp = msg.header.stamp
# Baro
case 56: # BMP temperature, altitude, pressure
self.baro_state.temperature = float(msg.data[0])
@@ -527,12 +513,14 @@ class CoreNode(Node):
motorId = round(float(msg.data[0]))
position = float(msg.data[1])
velocity = float(msg.data[2])
joint_state_msg = JointState()
joint_state_msg = (
JointState()
) # TODO: not sure if all motors should be in each message or not
joint_state_msg.position = [
position * (2 * pi) / self.gear_ratio
position * (2 * pi) / CORE_GEAR_RATIO
] # revolutions to radians
joint_state_msg.velocity = [
velocity * (2 * pi / 60.0) / self.gear_ratio
velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO
] # RPM to rad/s
motor: RevMotorState | None = None
@@ -540,42 +528,52 @@ class CoreNode(Node):
match motorId:
case 1:
motor = self.feedback_new_state.fl_motor
joint_state_msg.name = ["fl_wheel_joint"]
joint_state_msg.name = ["fl_motor_joint"]
case 2:
motor = self.feedback_new_state.bl_motor
joint_state_msg.name = ["bl_wheel_joint"]
joint_state_msg.name = ["bl_motor_joint"]
case 3:
motor = self.feedback_new_state.fr_motor
joint_state_msg.name = ["fr_wheel_joint"]
joint_state_msg.name = ["fr_motor_joint"]
case 4:
motor = self.feedback_new_state.br_motor
joint_state_msg.name = ["br_wheel_joint"]
joint_state_msg.name = ["br_motor_joint"]
case _:
self.get_logger().warning(
f"Ignoring REV motor feedback 58 with invalid motorId {motorId}"
)
return
if motor:
motor.position = position
motor.velocity = velocity
# make the fucking shit work
if self.rover_platform == "clucky":
joint_state_msg.name.append("left_suspension_joint")
joint_state_msg.position.append(0.0)
joint_state_msg.velocity.append(0.0)
joint_state_msg.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(joint_state_msg)
case _:
return
@deprecated("Uses an old message type. Will be removed at some point.")
def publish_feedback(self):
# self.get_logger().info(f"[Core] {self.core_feedback}")
self.feedback_pub.publish(self.core_feedback)
def ping_callback(self, request, response):
return response
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def map_range(
value: float, in_min: float, in_max: float, out_min: float, out_max: float
@@ -583,31 +581,19 @@ def map_range(
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
def radps_to_rpm(radps: float):
return radps * 60 / (2 * pi)
def exit_handler(signum, frame):
print("Caught SIGTERM. Exiting...")
rclpy.try_shutdown()
sys.exit(0)
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
# Catch termination signals and exit cleanly
signal.signal(signal.SIGTERM, exit_handler)
global serial_pub
core_node = CoreNode()
try:
rclpy.spin(core_node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -3,17 +3,15 @@
<package format="3">
<name>core_pkg</name>
<version>1.0.0</version>
<description>Relays topics related to Core between VicCAN (through Anchor) and basestation.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<description>Core rover control package to handle command interpretation and embedded interfacing.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<exec_depend>python-crccheck-pip</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-scipy</depend>
<depend>python-crccheck-pip</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/core_pkg
[install]
install_scripts=$base/lib/core_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -4,7 +4,7 @@ package_name = "core_pkg"
setup(
name=package_name,
version="1.0.0",
version="0.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
@@ -12,13 +12,14 @@ setup(
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu",
description="Relays topics related to Core between VicCAN (through Anchor) and basestation.",
license="AGPL-3.0-only",
maintainer="tristan",
maintainer_email="tristanmcginnis26@gmail.com",
description="Core rover control package to handle command interpretation and embedded interfacing.",
license="All Rights Reserved",
entry_points={
"console_scripts": [
"core = core_pkg.core_node:main",
"headless = core_pkg.core_headless:main",
"ptz = core_pkg.core_ptz:main",
],
},

View File

@@ -3,15 +3,14 @@
<package format="3">
<name>headless_pkg</name>
<version>1.0.0</version>
<description>Provides headless rover control, similar to Basestation.</description>
<description>Headless rover control package to handle command interpretation and embedded interfacing.</description>
<maintainer email="ds0196@uah.edu">David Sharpe</maintainer>
<license>AGPL-3.0-only</license>
<depend>rclpy</depend>
<exec_depend>common_interfaces</exec_depend>
<exec_depend>python3-pygame</exec_depend>
<exec_depend>astra_msgs</exec_depend>
<depend>common_interfaces</depend>
<depend>python3-pygame</depend>
<depend>astra_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View File

@@ -2,5 +2,3 @@
script_dir=$base/lib/headless_pkg
[install]
install_scripts=$base/lib/headless_pkg
[build_scripts]
executable= /usr/bin/env python3

View File

@@ -14,9 +14,11 @@ setup(
zip_safe=True,
maintainer="David Sharpe",
maintainer_email="ds0196@uah.edu",
description="Provides headless rover control, similar to Basestation.",
license="AGPL-3.0-only",
description="Headless rover control package to handle command interpretation and embedded interfacing.",
license="All Rights Reserved",
entry_points={
"console_scripts": ["headless_full = src.headless_node:main"],
"console_scripts": [
"headless_full = src.headless_node:main",
],
},
)

View File

@@ -18,7 +18,7 @@ from std_msgs.msg import Header
from geometry_msgs.msg import Twist, TwistStamped
from control_msgs.msg import JointJog
from astra_msgs.msg import CoreControl, ArmManual, BioControl
from astra_msgs.msg import CoreCtrlState, ArmCtrlState
from astra_msgs.msg import CoreCtrlState
import warnings
@@ -138,11 +138,6 @@ class Headless(Node):
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
self.declare_parameter("use_cmd_vel", False)
self.use_cmd_vel = (
self.get_parameter("use_cmd_vel").get_parameter_value().bool_value
)
self.declare_parameter("use_bio", False)
self.use_bio = self.get_parameter("use_bio").get_parameter_value().bool_value
@@ -150,6 +145,7 @@ class Headless(Node):
self.use_arm_ik = (
self.get_parameter("use_arm_ik").get_parameter_value().bool_value
)
# NOTE: only applicable if use_old_topics == True
self.declare_parameter("use_new_arm_manual_scheme", True)
self.use_new_arm_manual_scheme = (
@@ -159,13 +155,6 @@ class Headless(Node):
)
# Check parameter validity
if self.use_cmd_vel:
self.get_logger().info("Using cmd_vel for core control")
global CORE_MODE
CORE_MODE = "twist"
else:
self.get_logger().info("Using astra_msgs/CoreControl for core control")
if self.use_arm_ik and self.use_old_topics:
self.get_logger().fatal("Old topics do not support arm IK control.")
sys.exit(1)
@@ -177,8 +166,6 @@ class Headless(Node):
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
self.arm_brake_mode = False
self.arm_laser = False
##################################################
# Old Topics
@@ -197,18 +184,12 @@ class Headless(Node):
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_cmd_vel_pub_ = self.create_publisher(
TwistStamped, "/diff_controller/cmd_vel", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/control/joint_jog", qos_profile=control_qos
)
self.arm_state_pub_ = self.create_publisher(
ArmCtrlState, "/arm/control/state", qos_profile=control_qos
JointJog, "/arm/manual_new", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
@@ -250,17 +231,11 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150)
# Added so you can tell when it starts running after changing the constant logging to debug from info
self.get_logger().info("Defaulting to Core mode. Ready.")
def stop_all(self):
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
else:
if self.use_cmd_vel:
self.core_cmd_vel_pub_.publish(self.core_cmd_vel_stop_msg())
else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
if self.use_arm_ik:
@@ -357,32 +332,18 @@ class Headless(Node):
self.core_publisher.publish(input)
else: # New topics
twist = Twist()
input = Twist()
# Forward/back and Turn
twist.linear.x = -1.0 * left_stick_y
twist.angular.z = -1.0 * copysign(
input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign(
right_stick_x**2, right_stick_x
) # Exponent for finer control (curve)
# This kinda looks dumb being seperate from the following block, but this
# maintains the separation between modifying the control message and sending it.
if self.use_cmd_vel:
# These scaling factors convert raw stick inputs to absolute m/s and rad/s
# values that DiffDriveController will convert to motor RPM, rather than
# the plain Twist, which just sends the stick values as duty cycle and
# sends that scaled to the motors.
twist.linear.x *= 1.0
twist.angular.z *= 1.5
# Publish
if self.use_cmd_vel:
header = Header(stamp=self.get_clock().now().to_msg())
self.core_cmd_vel_pub_.publish(TwistStamped(header=header, twist=twist))
else:
self.core_twist_pub_.publish(twist)
self.get_logger().debug(
f"[Core Ctrl] Linear: {round(twist.linear.x, 2)}, Angular: {round(twist.angular.z, 2)}"
self.core_twist_pub_.publish(input)
self.get_logger().info(
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
)
# Brake mode
@@ -591,26 +552,13 @@ class Headless(Node):
)
# A: brake
new_brake_mode = button_a
# TODO: Brake mode
# X: laser
new_laser = button_x
# Y: linear actuator
# TODO: linear actuator
self.arm_manual_pub_.publish(arm_input)
# Only publish state if needed
if new_brake_mode != self.arm_brake_mode or new_laser != self.arm_laser:
self.arm_brake_mode = new_brake_mode
self.arm_laser = new_laser
state_msg = ArmCtrlState()
state_msg.brake_mode = bool(self.arm_brake_mode)
state_msg.laser = bool(self.arm_laser)
self.arm_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Arm State] Brake: {self.arm_brake_mode}, Laser: {self.arm_laser}"
)
# IK (ONLY NEW)
# =============
@@ -695,11 +643,6 @@ class Headless(Node):
else:
pass # TODO: implement new bio control topics
def core_cmd_vel_stop_msg(self):
return TwistStamped(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg())
)
def arm_manual_stop_msg(self):
return JointJog(
header=Header(frame_id="base_link", stamp=self.get_clock().now().to_msg()),