13 Commits

Author SHA1 Message Date
David Sharpe
62fd1b110d refactor: remedy QoS profiles 2026-03-08 03:57:28 -05:00
David Sharpe
aaf40124fa Merge branch 'main' into arm-topic-refactor 2026-03-08 03:56:25 -05:00
David Sharpe
294ae393de feat: (headless) add new arm manual (JointJog) 2026-03-08 03:29:20 -05:00
David Sharpe
9c9d3d675e refactor: (headless) reorganize send_controls() into different functions 2026-03-08 01:08:17 -06:00
David Sharpe
6fa47021fc refactor: (headless) reorganize __init__(), add use_old_topics parameter 2026-03-08 00:42:41 -06:00
David Sharpe
f23d8c62ff feat: (arm) add use_old_topics parameter 2026-03-08 00:19:06 -06:00
David Sharpe
667247cac8 refactor: (arm) reorganize __init__() and remove run() 2026-03-08 00:10:33 -06:00
David Sharpe
0929cc9503 feat: (arm) add JointJog for manual arm input 2026-03-07 23:55:33 -06:00
Riley M.
3f68052144 Merge pull request #30 from SHC-ASTRA/update-python
-> python 3.13
2026-02-09 21:49:08 -05:00
ryleu
c72f72dc32 -> python 3.13 2026-02-08 17:23:53 -06:00
Riley M.
61f5f1fc3e Merge pull request #29 from SHC-ASTRA/qos-disable
Qos disable
2026-02-07 18:20:04 -06:00
David
d9355f16e9 fix: EF gripper works again ._. 2026-01-31 18:32:39 -06:00
David
9fc120b09e fix: make QoS compatible with basestation-game 2026-01-31 17:21:52 -06:00
4 changed files with 403 additions and 201 deletions

View File

@@ -25,7 +25,7 @@
name = "ASTRA Anchor";
packages = with pkgs; [
colcon
(python312.withPackages (
(python313.withPackages (
p: with p; [
pyserial
pygame

View File

@@ -1,32 +1,48 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
import serial
import sys
import threading
import glob
import time
import atexit
import signal
from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
import math
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from rclpy import qos
from std_msgs.msg import String, Header
from sensor_msgs.msg import JointState
from control_msgs.msg import JointJog
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
# liveliness_lease_duration=Duration(seconds=5),
)
thread = None
class ArmNode(Node):
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
# Used to verify the length of an incoming VicCAN feedback message
# Key is VicCAN command_id, value is expected length of data list
viccan_socket_msg_len_dict = {
@@ -36,74 +52,34 @@ viccan_socket_msg_len_dict = {
58: 4,
59: 4,
}
viccan_digit_msg_len_dict = {
54: 4,
55: 2,
59: 2,
}
class ArmNode(Node):
def __init__(self):
super().__init__("arm_node")
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
##################################################
# Topics
# Parameters
self.declare_parameter("use_old_topics", True)
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value
##################################################
# Old topics
if self.use_old_topics:
# Anchor topics
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription(
String, "/anchor/arm/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control
# Manual: who tf knows. Maybe JointJog?
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 1
)
# Feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
self.arm_feedback_new = ArmFeedback()
# IK: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
self.saved_joint_state = JointState()
self.saved_joint_state.name = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
self.saved_joint_state.position = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
self.saved_joint_state.velocity = [0.0] * len(
self.saved_joint_state.name
) # Initialize with zeros
# Old
# Create publishers
self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10
@@ -118,15 +94,105 @@ class ArmNode(Node):
ArmManual, "/arm/control/manual", self.send_manual, 10
)
def run(self):
global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
###################################################
# New topics
try:
thread.join()
except KeyboardInterrupt:
pass
# Anchor topics
# from_vic
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
)
# to_vic
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
# Control
# Manual: /arm/manual_new is published by Servo or Basestation
self.jointjog_pub_ = self.create_subscription(
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos
)
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
self.joint_command_sub_ = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos
)
# Feedback
# Combined Socket and Digit feedback
self.arm_feedback_pub_ = self.create_publisher(
ArmFeedback,
"/arm/feedback/new_feedback",
qos_profile=qos.qos_profile_sensor_data,
)
# IK arm pose: /joint_states is published from here to topic_based_control
self.joint_state_pub_ = self.create_publisher(
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
)
###################################################
# Saved state
# Combined Socket and Digit feedback
self.arm_feedback_new = ArmFeedback()
# IK Arm pose
self.saved_joint_state = JointState()
self.saved_joint_state.name = self.all_joint_names
# ... initialize with zeros
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
def jointjog_callback(self, msg: JointJog):
if (
len(msg.joint_names) == 0
or len(msg.velocities) == 0
or len(msg.joint_names) != len(msg.velocities)
):
return # Malformed message
# Grab velocities from message
velocities = [
(
msg.velocities[msg.joint_names.index(joint_name)]
if joint_name in msg.joint_names
else 0.0
)
for joint_name in self.all_joint_names
]
# Deadzone
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
velocities = [
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
]
# Axis 2 & 3 URDF direction is inverted
velocities[2] = -velocities[2]
velocities[3] = -velocities[3]
# Send Axis 0-3
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
)
)
# Send Wrist yaw and roll
# TODO: Verify embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
)
)
# Send End Effector Gripper
# TODO: Verify m/s received correctly by embedded
self.anchor_tovic_pub_.publish(
VicCAN(
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
)
)
# TODO: use msg.duration
def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7:
@@ -153,6 +219,7 @@ class ArmNode(Node):
self.anchor_tovic_pub_.publish(digit_cmd)
def send_manual(self, msg: ArmManual):
"""TODO: Old"""
axis0 = msg.axis0
axis1 = -1 * msg.axis1
axis2 = msg.axis2
@@ -166,7 +233,7 @@ class ArmNode(Node):
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,28,{msg.laser}\n"
@@ -177,10 +244,12 @@ class ArmNode(Node):
return
def send_cmd(self, msg: str):
"""TODO: Old"""
output = String(data=msg)
self.anchor_pub.publish(output)
def anchor_feedback(self, msg: String):
"""TODO: Old"""
output = msg.data
if output.startswith("can_relay_fromvic,arm,55"):
self.updateAngleFeedback(output)
@@ -217,8 +286,8 @@ class ArmNode(Node):
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_socket_msg_len_dict:
expected_len = viccan_socket_msg_len_dict[msg.command_id]
if msg.command_id in self.viccan_socket_msg_len_dict:
expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -308,8 +377,8 @@ class ArmNode(Node):
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_digit_msg_len_dict:
expected_len = viccan_digit_msg_len_dict[msg.command_id]
if msg.command_id in self.viccan_digit_msg_len_dict:
expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
@@ -330,10 +399,12 @@ class ArmNode(Node):
) # Wrist yaw
def publish_feedback(self):
"""TODO: Old"""
self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: str):
"""TODO: Old"""
# Angle feedbacks,
# split the msg.data by commas
parts = msg.split(",")
@@ -352,6 +423,7 @@ class ArmNode(Node):
self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: str):
"""TODO: Old"""
# Bus Voltage feedbacks
parts = msg.split(",")
if len(parts) >= 7:
@@ -366,6 +438,7 @@ class ArmNode(Node):
self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg: str):
"""TODO: Old"""
parts = str(msg.strip()).split(",")
motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0
@@ -393,7 +466,14 @@ def main(args=None):
rclpy.init(args=args)
arm_node = ArmNode()
arm_node.run()
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
thread.start()
try:
thread.join()
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.try_shutdown()

View File

@@ -33,12 +33,12 @@ CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
# Used to verify the length of an incoming VicCAN feedback message

View File

@@ -41,31 +41,38 @@ control_qos = qos.QoSProfile(
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
arm_ik_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
)
CORE_MODE = "twist" # "twist" or "duty"
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
class Headless(Node):
# Every non-fixed joint defined in Arm's URDF
# Used for JointState and JointJog messsages
all_joint_names = [
"axis_0_joint",
"axis_1_joint",
"axis_2_joint",
"axis_3_joint",
"wrist_yaw_joint",
"wrist_roll_joint",
"ef_gripper_left_joint",
]
def __init__(self):
# Initialize pygame first
pygame.init()
pygame.joystick.init()
super().__init__("headless")
##################################################
# Preamble
# Wait for anchor to start
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
while len(pub_info) == 0:
@@ -116,30 +123,23 @@ class Headless(Node):
break
id += 1
self.create_timer(0.1, self.send_controls)
##################################################
# Parameters
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
self.declare_parameter("use_old_topics", True)
self.use_old_topics = (
self.get_parameter("use_old_topics").get_parameter_value().bool_value
)
self.declare_parameter("arm_mode", "manual")
self.arm_mode = self.get_parameter("arm_mode").value
self.arm_mode = (
self.get_parameter("arm_mode").get_parameter_value().string_value
)
self.declare_parameter("arm_manual_scheme", "old")
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value
self.arm_manual_scheme = (
self.get_parameter("arm_manual_scheme").get_parameter_value().string_value
)
# Check parameter validity
if self.arm_mode not in ["manual", "ik"]:
@@ -151,6 +151,49 @@ class Headless(Node):
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
)
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
##################################################
# Old Topics
if self.use_old_topics:
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(
ArmManual, "/arm/control/manual", 2
)
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
##################################################
# New Topics
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.arm_manual_pub_ = self.create_publisher(
JointJog, "/arm/manual_new", qos_profile=control_qos
)
self.arm_ik_twist_publisher = self.create_publisher(
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
)
self.arm_ik_jointjog_publisher = self.create_publisher(
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
)
##################################################
# Timers
self.create_timer(0.1, self.send_controls)
##################################################
# Services
# If using IK control, we have to "start" the servo node to enable it to accept commands
self.servo_start_client = None
if self.arm_mode == "ik":
@@ -170,13 +213,17 @@ class Headless(Node):
if self.servo_start_client.service_is_ready():
self.servo_start_client.call_async(Trigger.Request())
self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
# Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150)
def stop_all(self):
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
else:
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
def send_controls(self):
"""Read the gamepad state and publish control messages"""
for event in pygame.event.get():
@@ -187,10 +234,8 @@ class Headless(Node):
# Check if controller is still connected
if pygame.joystick.get_count() != self.num_gamepads:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
# Stop the rover if controller disconnected
self.stop_all()
self.get_logger().info("Final stop commands sent. Shutting down.")
# Clean up
pygame.quit()
@@ -206,22 +251,41 @@ class Headless(Node):
new_ctrl_mode = "core"
if new_ctrl_mode != self.ctrl_mode:
self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(ARM_STOP_MSG)
self.bio_publisher.publish(BIO_STOP_MSG)
self.stop_all()
self.gamepad.rumble(0.6, 0.7, 75)
self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
# CORE
if self.ctrl_mode == "core" and CORE_MODE == "duty":
input = CoreControl()
input.max_speed = 90
# Actually send the controls
if self.ctrl_mode == "core":
self.send_core()
if self.use_old_topics:
self.arm_publisher.publish(ARM_STOP_MSG)
else:
self.send_arm()
# self.send_bio()
if self.use_old_topics:
self.core_publisher.publish(CORE_STOP_MSG)
def send_core(self):
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
input = CoreControl()
input.max_speed = 90
# Right wheels
input.right_stick = float(round(-1 * right_stick_y, 2))
@@ -238,16 +302,9 @@ class Headless(Node):
self.core_publisher.publish(input)
elif self.ctrl_mode == "core" and CORE_MODE == "twist":
else: # New topics
input = Twist()
# Collect controller state
left_stick_y = deadzone(self.gamepad.get_axis(1))
right_stick_x = deadzone(self.gamepad.get_axis(3))
button_a = self.gamepad.get_button(0)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
# Forward/back and Turn
input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign(
@@ -280,15 +337,13 @@ class Headless(Node):
state_msg = CoreCtrlState()
state_msg.brake_mode = bool(self.core_brake_mode)
state_msg.max_duty = float(self.core_max_duty)
self.core_state_pub_.publish(state_msg)
self.get_logger().info(
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
)
# ARM and BIO
if self.ctrl_mode == "arm" and self.arm_mode == "manual":
arm_input = ArmManual()
def send_arm(self):
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
@@ -304,6 +359,11 @@ class Headless(Node):
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# OLD MANUAL
if self.arm_mode == "manual" and self.use_old_topics:
arm_input = ArmManual()
# OLD ARM CONTROL SCHEME
if self.arm_manual_scheme == "old":
# EF Grippers
if left_trigger > 0 and right_trigger > 0:
@@ -347,6 +407,7 @@ class Headless(Node):
if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y)
# NEW ARM CONTROL SCHEME
if self.arm_manual_scheme == "new":
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
@@ -409,22 +470,70 @@ class Headless(Node):
else:
arm_input.linear_actuator = 0
# BIO
bio_input = BioControl(
bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100),
)
# Drill motor (FAERIE)
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int(
30 * (right_trigger - left_trigger)
) # Max duty cycle 30%
self.arm_publisher.publish(arm_input)
if self.ctrl_mode == "arm" and self.arm_mode == "ik":
# NEW MANUAL
elif self.arm_mode == "manual" and not self.use_old_topics:
arm_input = JointJog()
arm_input.header.frame_id = "base_link"
arm_input.header.stamp = self.get_clock().now().to_msg()
arm_input.joint_names = self.all_joint_names
arm_input.velocities = [0.0] * len(self.all_joint_names)
# Right stick: EF yaw and axis 3
# Left stick: axis 1 and 2
# D-pad: axis 0 and _
# Triggers: EF grippers
# Bumpers: EF roll
# A: brake
# B: linear actuator in
# X: _
# Y: linear actuator out
ARM_THRESHOLD = 0.2
# Right stick: EF yaw and axis 3
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = (
float(copysign(1, right_stick_x)) if abs(right_stick_x) >= ARM_THRESHOLD else 0.0
)
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = (
float(-1 * copysign(1, right_stick_y)) if abs(right_stick_y) >= ARM_THRESHOLD else 0.0
)
# Left stick: axis 1 and 2
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = (
float(copysign(1, left_stick_x)) if abs(left_stick_x) >= ARM_THRESHOLD else 0.0
)
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = (
float(-1 * copysign(1, left_stick_y)) if abs(left_stick_y) >= ARM_THRESHOLD else 0.0
)
# D-pad: axis 0 and _
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = (
float(copysign(1, dpad_input[0])) if dpad_input[0] != 0 else 0.0
)
# Triggers: EF Grippers
if left_trigger > 0 and right_trigger > 0:
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 0.0
elif left_trigger > 0:
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = -1.0
elif right_trigger > 0:
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 1.0
# Bumpers: EF roll
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = right_bumper - left_bumper
# A: brake
# TODO: Brake mode
# Y: linear actuator
# TODO: linear actuator
self.arm_manual_pub_.publish(arm_input)
# IK
elif self.arm_mode == "ik":
arm_twist = TwistStamped()
arm_twist.header.frame_id = "base_link"
arm_twist.header.stamp = self.get_clock().now().to_msg()
@@ -432,21 +541,6 @@ class Headless(Node):
arm_jointjog.header.frame_id = "base_link"
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
# Right stick: linear y and linear x
# Left stick: angular z and linear z
# D-pad: angular y and _
@@ -473,9 +567,7 @@ class Headless(Node):
# Triggers: EF Grippers
if left_trigger > 0 or right_trigger > 0:
arm_jointjog.joint_names.append(
"ef_gripper_left_joint"
)
arm_jointjog.joint_names.append("ef_gripper_left_joint")
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
# Bumpers: angular x
@@ -489,6 +581,36 @@ class Headless(Node):
self.arm_ik_twist_publisher.publish(arm_twist)
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
def send_bio(self):
# Collect controller state
left_stick_x = deadzone(self.gamepad.get_axis(0))
left_stick_y = deadzone(self.gamepad.get_axis(1))
left_trigger = deadzone(self.gamepad.get_axis(2))
right_stick_x = deadzone(self.gamepad.get_axis(3))
right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5))
button_a = self.gamepad.get_button(0)
button_b = self.gamepad.get_button(1)
button_x = self.gamepad.get_button(2)
button_y = self.gamepad.get_button(3)
left_bumper = self.gamepad.get_button(4)
right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0)
if self.use_old_topics:
bio_input = BioControl(
bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100),
)
# Drill motor (FAERIE)
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int(
30 * (right_trigger - left_trigger)
) # Max duty cycle 30%
self.bio_publisher.publish(bio_input)
def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
"""Apply a deadzone to a joystick input so the motors don't sound angry"""