13 Commits

Author SHA1 Message Date
David Sharpe
62fd1b110d refactor: remedy QoS profiles 2026-03-08 03:57:28 -05:00
David Sharpe
aaf40124fa Merge branch 'main' into arm-topic-refactor 2026-03-08 03:56:25 -05:00
David Sharpe
294ae393de feat: (headless) add new arm manual (JointJog) 2026-03-08 03:29:20 -05:00
David Sharpe
9c9d3d675e refactor: (headless) reorganize send_controls() into different functions 2026-03-08 01:08:17 -06:00
David Sharpe
6fa47021fc refactor: (headless) reorganize __init__(), add use_old_topics parameter 2026-03-08 00:42:41 -06:00
David Sharpe
f23d8c62ff feat: (arm) add use_old_topics parameter 2026-03-08 00:19:06 -06:00
David Sharpe
667247cac8 refactor: (arm) reorganize __init__() and remove run() 2026-03-08 00:10:33 -06:00
David Sharpe
0929cc9503 feat: (arm) add JointJog for manual arm input 2026-03-07 23:55:33 -06:00
Riley M.
3f68052144 Merge pull request #30 from SHC-ASTRA/update-python
-> python 3.13
2026-02-09 21:49:08 -05:00
ryleu
c72f72dc32 -> python 3.13 2026-02-08 17:23:53 -06:00
Riley M.
61f5f1fc3e Merge pull request #29 from SHC-ASTRA/qos-disable
Qos disable
2026-02-07 18:20:04 -06:00
David
d9355f16e9 fix: EF gripper works again ._. 2026-01-31 18:32:39 -06:00
David
9fc120b09e fix: make QoS compatible with basestation-game 2026-01-31 17:21:52 -06:00
4 changed files with 403 additions and 201 deletions

View File

@@ -25,7 +25,7 @@
name = "ASTRA Anchor"; name = "ASTRA Anchor";
packages = with pkgs; [ packages = with pkgs; [
colcon colcon
(python312.withPackages ( (python313.withPackages (
p: with p; [ p: with p; [
pyserial pyserial
pygame pygame

View File

@@ -1,92 +1,39 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
import serial
import sys import sys
import threading import threading
import glob
import time
import atexit
import signal import signal
from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
import math import math
# control_qos = qos.QoSProfile( import rclpy
# history=qos.QoSHistoryPolicy.KEEP_LAST, from rclpy.node import Node
# depth=1, from rclpy.executors import ExternalShutdownException
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, from rclpy import qos
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000, from std_msgs.msg import String, Header
# lifespan=500, from sensor_msgs.msg import JointState
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, from control_msgs.msg import JointJog
# liveliness_lease_duration=5000 from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
# ) from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
thread = None thread = None
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
class ArmNode(Node): class ArmNode(Node):
def __init__(self): """Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output # Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
################################################## all_joint_names = [
# Topics
# Anchor topics
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
# Manual: who tf knows. Maybe JointJog?
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
)
# Feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
self.arm_feedback_new = ArmFeedback()
# IK: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
self.saved_joint_state = JointState()
self.saved_joint_state.name = [
"axis_0_joint", "axis_0_joint",
"axis_1_joint", "axis_1_joint",
"axis_2_joint", "axis_2_joint",
@@ -95,14 +42,43 @@ class ArmNode(Node):
"wrist_roll_joint", "wrist_roll_joint",
"ef_gripper_left_joint", "ef_gripper_left_joint",
] ]
self.saved_joint_state.position = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
self.saved_joint_state.velocity = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
# Old # Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
53: 4,
54: 4,
55: 4,
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
def __init__(self):
super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
##################################################
# Parameters
self.declare_parameter("use_old_topics", True)
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value
##################################################
# Old topics
if self.use_old_topics:
# Anchor topics
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Create publishers # Create publishers
self.socket_pub = self.create_publisher( self.socket_pub = self.create_publisher(
@@ -118,15 +94,105 @@ class ArmNode(Node):
ArmManual, "/arm/control/manual", self.send_manual, 10 ArmManual, "/arm/control/manual", self.send_manual, 10
) )
def run(self): ###################################################
global thread # New topics
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
try: # Anchor topics
thread.join()
except KeyboardInterrupt: # from_vic
pass self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
# to_vic
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
# Control
# Manual: /arm/manual_new is published by Servo or Basestation
self.jointjog_pub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos
)
# Feedback
# Combined Socket and Digit feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
# IK arm pose: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Saved state
# Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback()
# IK Arm pose
self.saved_joint_state = JointState()
self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
def jointjog_callback(self, msg: JointJog):
if (
len(msg.joint_names) == 0
or len(msg.velocities) == 0
or len(msg.joint_names) != len(msg.velocities)
):
return # Malformed message
# Grab velocities from message
velocities = [
(
msg.velocities[msg.joint_names.index(joint_name)]
if joint_name in msg.joint_names
else 0.0
)
for joint_name in self.all_joint_names
]
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
)
)
# TODO: use msg.duration
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7: if len(msg.position) < 7 and len(msg.velocity) < 7:
@@ -153,6 +219,7 @@ class ArmNode(Node):
self.anchor_tovic_pub_.publish(digit_cmd) self.anchor_tovic_pub_.publish(digit_cmd)
def send_manual(self, msg: ArmManual): def send_manual(self, msg: ArmManual):
"""TODO: Old"""
axis0 = msg.axis0 axis0 = msg.axis0
axis1 = -1 * msg.axis1 axis1 = -1 * msg.axis1
axis2 = msg.axis2 axis2 = msg.axis2
@@ -166,7 +233,7 @@ class ArmNode(Node):
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n" command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,28,{msg.laser}\n" command += f"can_relay_tovic,digit,28,{msg.laser}\n"
@@ -177,10 +244,12 @@ class ArmNode(Node):
return return
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
"""TODO: Old"""
output = String(data=msg) output = String(data=msg)
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
"""TODO: Old"""
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
self.updateAngleFeedback(output) self.updateAngleFeedback(output)
@@ -217,8 +286,8 @@ class ArmNode(Node):
return return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in viccan_socket_msg_len_dict: if msg.command_id in self.viccan_socket_msg_len_dict:
expected_len = viccan_socket_msg_len_dict[msg.command_id] expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len: if len(msg.data) != expected_len:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -308,8 +377,8 @@ class ArmNode(Node):
return return
# Check message len to prevent crashing on bad data # Check message len to prevent crashing on bad data
if msg.command_id in viccan_digit_msg_len_dict: if msg.command_id in self.viccan_digit_msg_len_dict:
expected_len = viccan_digit_msg_len_dict[msg.command_id] expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len: if len(msg.data) != expected_len:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -330,10 +399,12 @@ class ArmNode(Node):
) # Wrist yaw ) # Wrist yaw
def publish_feedback(self): def publish_feedback(self):
"""TODO: Old"""
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: str): def updateAngleFeedback(self, msg: str):
"""TODO: Old"""
# Angle feedbacks, # Angle feedbacks,
# split the msg.data by commas # split the msg.data by commas
parts = msg.split(",") parts = msg.split(",")
@@ -352,6 +423,7 @@ class ArmNode(Node):
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: str): def updateBusVoltage(self, msg: str):
"""TODO: Old"""
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.split(",")
if len(parts) >= 7: if len(parts) >= 7:
@@ -366,6 +438,7 @@ class ArmNode(Node):
self.get_logger().info("Invalid voltage feedback input format") self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg: str): def updateMotorFeedback(self, msg: str):
"""TODO: Old"""
parts = str(msg.strip()).split(",") parts = str(msg.strip()).split(",")
motorId = round(float(parts[3])) motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0 temp = float(parts[4]) / 10.0
@@ -393,7 +466,14 @@ def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
arm_node = ArmNode() arm_node = ArmNode()
arm_node.run() thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
thread.start()
try:
thread.join()
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown() rclpy.try_shutdown()

View File

@@ -33,12 +33,12 @@ CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile( control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST, history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2, depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
durability=qos.QoSDurabilityPolicy.VOLATILE, durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1), # deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
) )
# Used to verify the length of an incoming VicCAN feedback message # Used to verify the length of an incoming VicCAN feedback message

View File

@@ -41,31 +41,38 @@ control_qos = qos.QoSProfile(
depth=2, depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE, durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1), # deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms # lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, # liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5), # liveliness_lease_duration=Duration(seconds=5),
) )
arm_ik_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
)
CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05")) STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
class Headless(Node): class Headless(Node):
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
def __init__(self): def __init__(self):
# Initialize pygame first # Initialize pygame first
pygame.init() pygame.init()
pygame.joystick.init() pygame.joystick.init()
super().__init__("headless") super().__init__("headless")
##################################################
# Preamble
# Wait for anchor to start # Wait for anchor to start
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug") pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
while len(pub_info) == 0: while len(pub_info) == 0:
@@ -116,30 +123,23 @@ class Headless(Node):
break break
id += 1 id += 1
self.create_timer(0.1, self.send_controls) ##################################################
# Parameters
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2) self.declare_parameter("use_old_topics", True)
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2) self.use_old_topics = (
self.arm_ik_twist_publisher = self.create_publisher( self.get_parameter("use_old_topics").get_parameter_value().bool_value
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
) )
self.declare_parameter("arm_mode", "manual") self.declare_parameter("arm_mode", "manual")
self.arm_mode = self.get_parameter("arm_mode").value self.arm_mode = (
self.get_parameter("arm_mode").get_parameter_value().string_value
)
self.declare_parameter("arm_manual_scheme", "old") self.declare_parameter("arm_manual_scheme", "old")
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value self.arm_manual_scheme = (
self.get_parameter("arm_manual_scheme").get_parameter_value().string_value
)
# Check parameter validity # Check parameter validity
if self.arm_mode not in ["manual", "ik"]: if self.arm_mode not in ["manual", "ik"]:
@@ -151,6 +151,49 @@ class Headless(Node):
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)." f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
) )
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
##################################################
# Old Topics
if self.use_old_topics:
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(
ArmManual, "/arm/control/manual", 2
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
##################################################
# New Topics
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
)
##################################################
# Timers
self.create_timer(0.1, self.send_controls)
##################################################
# Services
# If using IK control, we have to "start" the servo node to enable it to accept commands # If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None self.servo_start_client = None
if self.arm_mode == "ik": if self.arm_mode == "ik":
@@ -170,13 +213,17 @@ class Headless(Node):
if self.servo_start_client.service_is_ready(): if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request()) self.servo_start_client.call_async(Trigger.Request())
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
def stop_all(self):
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
def send_controls(self): def send_controls(self):
"""Read the gamepad state and publish control messages""" """Read the gamepad state and publish control messages"""
for event in pygame.event.get(): for event in pygame.event.get():
@@ -187,10 +234,8 @@ class Headless(Node):
# Check if controller is still connected # Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads: if pygame.joystick.get_count() != self.num_gamepads:
print("Gamepad disconnected. Exiting...") print("Gamepad disconnected. Exiting...")
# Send one last zero control message # Stop the rover if controller disconnected
self.core_publisher.publish(CORE_STOP_MSG) self.stop_all()
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.get_logger().info("Final stop commands sent. Shutting down.") self.get_logger().info("Final stop commands sent. Shutting down.")
# Clean up # Clean up
pygame.quit() pygame.quit()
@@ -206,22 +251,41 @@ class Headless(Node):
new_ctrl_mode = "core" new_ctrl_mode = "core"
if new_ctrl_mode != self.ctrl_mode: if new_ctrl_mode != self.ctrl_mode:
self.core_publisher.publish(CORE_STOP_MSG) self.stop_all()
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.gamepad.rumble(0.6, 0.7, 75) self.gamepad.rumble(0.6, 0.7, 75)
self.ctrl_mode = new_ctrl_mode self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
# CORE # Actually send the controls
if self.ctrl_mode == "core" and CORE_MODE == "duty": if self.ctrl_mode == "core":
input = CoreControl() self.send_core()
input.max_speed = 90 if self.use_old_topics:
self.arm_publisher.publish(ARM_STOP_MSG)
else:
self.send_arm()
# self.send_bio()
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
def send_core(self):
# Collect controller state # Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1)) left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4)) right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5)) right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
input = CoreControl()
input.max_speed = 90
# Right wheels # Right wheels
input.right_stick = float(round(-1 * right_stick_y, 2)) input.right_stick = float(round(-1 * right_stick_y, 2))
@@ -238,16 +302,9 @@ class Headless(Node):
self.core_publisher.publish(input) self.core_publisher.publish(input)
elif self.ctrl_mode == "core" and CORE_MODE == "twist": else: # New topics
input = Twist() input = Twist()
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_x = deadzone(self.gamepad.get_axis(3))
button_a = self.gamepad.get_button(0)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
# Forward/back and Turn # Forward/back and Turn
input.linear.x = -1.0 * left_stick_y input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign( input.angular.z = -1.0 * copysign(
@@ -280,15 +337,13 @@ class Headless(Node):
state_msg = CoreCtrlState() state_msg = CoreCtrlState()
state_msg.brake_mode = bool(self.core_brake_mode) state_msg.brake_mode = bool(self.core_brake_mode)
state_msg.max_duty = float(self.core_max_duty) state_msg.max_duty = float(self.core_max_duty)
self.core_state_pub_.publish(state_msg) self.core_state_pub_.publish(state_msg)
self.get_logger().info( self.get_logger().info(
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}" f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
) )
# ARM and BIO def send_arm(self):
if self.ctrl_mode == "arm" and self.arm_mode == "manual":
arm_input = ArmManual()
# Collect controller state # Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0)) left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1)) left_stick_y = deadzone(self.gamepad.get_axis(1))
@@ -304,6 +359,11 @@ class Headless(Node):
right_bumper = self.gamepad.get_button(5) right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0) dpad_input = self.gamepad.get_hat(0)
# OLD MANUAL
if self.arm_mode == "manual" and self.use_old_topics:
arm_input = ArmManual()
# OLD ARM CONTROL SCHEME
if self.arm_manual_scheme == "old": if self.arm_manual_scheme == "old":
# EF Grippers # EF Grippers
if left_trigger > 0 and right_trigger > 0: if left_trigger > 0 and right_trigger > 0:
@@ -347,6 +407,7 @@ class Headless(Node):
if abs(right_stick_y) > 0.15: if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y) arm_input.axis3 = -1 * round(right_stick_y)
# NEW ARM CONTROL SCHEME
if self.arm_manual_scheme == "new": if self.arm_manual_scheme == "new":
# Right stick: EF yaw and axis 3 # Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2 # Left stick: axis 1 and 2
@@ -409,22 +470,70 @@ class Headless(Node):
else: else:
arm_input.linear_actuator = 0 arm_input.linear_actuator = 0
# BIO
bio_input = BioControl(
bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100),
)
# Drill motor (FAERIE)
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int(
30 * (right_trigger - left_trigger)
) # Max duty cycle 30%
self.arm_publisher.publish(arm_input) self.arm_publisher.publish(arm_input)
if self.ctrl_mode == "arm" and self.arm_mode == "ik": # NEW MANUAL
elif self.arm_mode == "manual" and not self.use_old_topics:
arm_input = JointJog()
arm_input.header.frame_id = "base_link"
arm_input.header.stamp = self.get_clock().now().to_msg()
arm_input.joint_names = self.all_joint_names
arm_input.velocities = [0.0] * len(self.all_joint_names)
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: linear actuator in
# X: _
# Y: linear actuator out
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = (
float(copysign(1, right_stick_x)) if abs(right_stick_x) >= ARM_THRESHOLD else 0.0
)
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = (
float(-1 * copysign(1, right_stick_y)) if abs(right_stick_y) >= ARM_THRESHOLD else 0.0
)
# Left stick: axis 1 and 2
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = (
float(copysign(1, left_stick_x)) if abs(left_stick_x) >= ARM_THRESHOLD else 0.0
)
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = (
float(-1 * copysign(1, left_stick_y)) if abs(left_stick_y) >= ARM_THRESHOLD else 0.0
)
# D-pad: axis 0 and _
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = (
float(copysign(1, dpad_input[0])) if dpad_input[0] != 0 else 0.0
)
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 0.0
elif left_trigger > 0:
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = -1.0
elif right_trigger > 0:
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 1.0
# Bumpers: EF roll
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = right_bumper - left_bumper
# A: brake
# TODO: Brake mode
# Y: linear actuator
# TODO: linear actuator
self.arm_manual_pub_.publish(arm_input)
# IK
elif self.arm_mode == "ik":
arm_twist = TwistStamped() arm_twist = TwistStamped()
arm_twist.header.frame_id = "base_link" arm_twist.header.frame_id = "base_link"
arm_twist.header.stamp = self.get_clock().now().to_msg() arm_twist.header.stamp = self.get_clock().now().to_msg()
@@ -432,21 +541,6 @@ class Headless(Node):
arm_jointjog.header.frame_id = "base_link" arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg() arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# Right stick: linear y and linear x # Right stick: linear y and linear x
# Left stick: angular z and linear z # Left stick: angular z and linear z
# D-pad: angular y and _ # D-pad: angular y and _
@@ -473,9 +567,7 @@ class Headless(Node):
# Triggers: EF Grippers # Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0: if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append( arm_jointjog.joint_names.append("ef_gripper_left_joint")
"ef_gripper_left_joint"
)
arm_jointjog.velocities.append(float(right_trigger - left_trigger)) arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x # Bumpers: angular x
@@ -489,6 +581,36 @@ class Headless(Node):
self.arm_ik_twist_publisher.publish(arm_twist) self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out # self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def send_bio(self):
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
bio_input = BioControl(
bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100),
)
# Drill motor (FAERIE)
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int(
30 * (right_trigger - left_trigger)
) # Max duty cycle 30%
self.bio_publisher.publish(bio_input)
def deadzone(value: float, threshold=STICK_DEADZONE) -> float: def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry""" """Apply a deadzone to a joystick input so the motors don't sound angry"""