mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-04-20 20:01:15 -05:00
Compare commits
13 Commits
169ab85607
...
62fd1b110d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
62fd1b110d | ||
|
|
aaf40124fa | ||
|
|
294ae393de | ||
|
|
9c9d3d675e | ||
|
|
6fa47021fc | ||
|
|
f23d8c62ff | ||
|
|
667247cac8 | ||
|
|
0929cc9503 | ||
|
|
3f68052144 | ||
|
|
c72f72dc32 | ||
|
|
61f5f1fc3e | ||
|
|
d9355f16e9 | ||
|
|
9fc120b09e |
@@ -25,7 +25,7 @@
|
|||||||
name = "ASTRA Anchor";
|
name = "ASTRA Anchor";
|
||||||
packages = with pkgs; [
|
packages = with pkgs; [
|
||||||
colcon
|
colcon
|
||||||
(python312.withPackages (
|
(python313.withPackages (
|
||||||
p: with p; [
|
p: with p; [
|
||||||
pyserial
|
pyserial
|
||||||
pygame
|
pygame
|
||||||
|
|||||||
@@ -1,132 +1,198 @@
|
|||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy import qos
|
|
||||||
import serial
|
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import glob
|
|
||||||
import time
|
|
||||||
import atexit
|
|
||||||
import signal
|
import signal
|
||||||
from std_msgs.msg import String, Header
|
|
||||||
from sensor_msgs.msg import JointState
|
|
||||||
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
|
|
||||||
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
|
|
||||||
import math
|
import math
|
||||||
|
|
||||||
# control_qos = qos.QoSProfile(
|
import rclpy
|
||||||
# history=qos.QoSHistoryPolicy.KEEP_LAST,
|
from rclpy.node import Node
|
||||||
# depth=1,
|
from rclpy.executors import ExternalShutdownException
|
||||||
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
from rclpy import qos
|
||||||
# durability=qos.QoSDurabilityPolicy.VOLATILE,
|
|
||||||
# deadline=1000,
|
from std_msgs.msg import String, Header
|
||||||
# lifespan=500,
|
from sensor_msgs.msg import JointState
|
||||||
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
from control_msgs.msg import JointJog
|
||||||
# liveliness_lease_duration=5000
|
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
|
||||||
# )
|
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
|
||||||
|
|
||||||
|
control_qos = qos.QoSProfile(
|
||||||
|
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||||
|
depth=2,
|
||||||
|
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
|
||||||
|
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||||
|
# deadline=Duration(seconds=1),
|
||||||
|
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||||
|
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||||
|
# liveliness_lease_duration=Duration(seconds=5),
|
||||||
|
)
|
||||||
|
|
||||||
thread = None
|
thread = None
|
||||||
|
|
||||||
# Used to verify the length of an incoming VicCAN feedback message
|
|
||||||
# Key is VicCAN command_id, value is expected length of data list
|
|
||||||
viccan_socket_msg_len_dict = {
|
|
||||||
53: 4,
|
|
||||||
54: 4,
|
|
||||||
55: 4,
|
|
||||||
58: 4,
|
|
||||||
59: 4,
|
|
||||||
}
|
|
||||||
viccan_digit_msg_len_dict = {
|
|
||||||
54: 4,
|
|
||||||
55: 2,
|
|
||||||
59: 2,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class ArmNode(Node):
|
class ArmNode(Node):
|
||||||
|
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
|
||||||
|
|
||||||
|
# Every non-fixed joint defined in Arm's URDF
|
||||||
|
# Used for JointState and JointJog messsages
|
||||||
|
all_joint_names = [
|
||||||
|
"axis_0_joint",
|
||||||
|
"axis_1_joint",
|
||||||
|
"axis_2_joint",
|
||||||
|
"axis_3_joint",
|
||||||
|
"wrist_yaw_joint",
|
||||||
|
"wrist_roll_joint",
|
||||||
|
"ef_gripper_left_joint",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Used to verify the length of an incoming VicCAN feedback message
|
||||||
|
# Key is VicCAN command_id, value is expected length of data list
|
||||||
|
viccan_socket_msg_len_dict = {
|
||||||
|
53: 4,
|
||||||
|
54: 4,
|
||||||
|
55: 4,
|
||||||
|
58: 4,
|
||||||
|
59: 4,
|
||||||
|
}
|
||||||
|
|
||||||
|
viccan_digit_msg_len_dict = {
|
||||||
|
54: 4,
|
||||||
|
55: 2,
|
||||||
|
59: 2,
|
||||||
|
}
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__("arm_node")
|
super().__init__("arm_node")
|
||||||
|
|
||||||
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
|
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
|
||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Topics
|
# Parameters
|
||||||
|
|
||||||
|
self.declare_parameter("use_old_topics", True)
|
||||||
|
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Old topics
|
||||||
|
|
||||||
|
if self.use_old_topics:
|
||||||
|
# Anchor topics
|
||||||
|
self.anchor_sub = self.create_subscription(
|
||||||
|
String, "/anchor/arm/feedback", self.anchor_feedback, 10
|
||||||
|
)
|
||||||
|
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
|
||||||
|
|
||||||
|
# Create publishers
|
||||||
|
self.socket_pub = self.create_publisher(
|
||||||
|
SocketFeedback, "/arm/feedback/socket", 10
|
||||||
|
)
|
||||||
|
self.arm_feedback = SocketFeedback()
|
||||||
|
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
|
||||||
|
self.digit_feedback = DigitFeedback()
|
||||||
|
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
||||||
|
|
||||||
|
# Create subscribers
|
||||||
|
self.man_sub = self.create_subscription(
|
||||||
|
ArmManual, "/arm/control/manual", self.send_manual, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
###################################################
|
||||||
|
# New topics
|
||||||
|
|
||||||
# Anchor topics
|
# Anchor topics
|
||||||
|
|
||||||
|
# from_vic
|
||||||
self.anchor_fromvic_sub_ = self.create_subscription(
|
self.anchor_fromvic_sub_ = self.create_subscription(
|
||||||
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
|
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
|
||||||
)
|
)
|
||||||
|
# to_vic
|
||||||
self.anchor_tovic_pub_ = self.create_publisher(
|
self.anchor_tovic_pub_ = self.create_publisher(
|
||||||
VicCAN, "/anchor/to_vic/relay", 20
|
VicCAN, "/anchor/to_vic/relay", 20
|
||||||
)
|
)
|
||||||
|
|
||||||
self.anchor_sub = self.create_subscription(
|
|
||||||
String, "/anchor/arm/feedback", self.anchor_feedback, 10
|
|
||||||
)
|
|
||||||
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
|
|
||||||
|
|
||||||
# Control
|
# Control
|
||||||
|
|
||||||
# Manual: who tf knows. Maybe JointJog?
|
# Manual: /arm/manual_new is published by Servo or Basestation
|
||||||
|
self.jointjog_pub_ = self.create_subscription(
|
||||||
|
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos
|
||||||
|
)
|
||||||
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
|
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
|
||||||
self.joint_command_sub_ = self.create_subscription(
|
self.joint_command_sub_ = self.create_subscription(
|
||||||
JointState, "/joint_commands", self.joint_command_callback, 1
|
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos
|
||||||
)
|
)
|
||||||
|
|
||||||
# Feedback
|
# Feedback
|
||||||
|
|
||||||
|
# Combined Socket and Digit feedback
|
||||||
self.arm_feedback_pub_ = self.create_publisher(
|
self.arm_feedback_pub_ = self.create_publisher(
|
||||||
ArmFeedback,
|
ArmFeedback,
|
||||||
"/arm/feedback/new_feedback",
|
"/arm/feedback/new_feedback",
|
||||||
qos_profile=qos.qos_profile_sensor_data,
|
qos_profile=qos.qos_profile_sensor_data,
|
||||||
)
|
)
|
||||||
self.arm_feedback_new = ArmFeedback()
|
# IK arm pose: /joint_states is published from here to topic_based_control
|
||||||
# IK: /joint_states is published from here to topic_based_control
|
|
||||||
self.joint_state_pub_ = self.create_publisher(
|
self.joint_state_pub_ = self.create_publisher(
|
||||||
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
|
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
|
||||||
)
|
)
|
||||||
|
|
||||||
|
###################################################
|
||||||
|
# Saved state
|
||||||
|
|
||||||
|
# Combined Socket and Digit feedback
|
||||||
|
self.arm_feedback_new = ArmFeedback()
|
||||||
|
|
||||||
|
# IK Arm pose
|
||||||
self.saved_joint_state = JointState()
|
self.saved_joint_state = JointState()
|
||||||
self.saved_joint_state.name = [
|
self.saved_joint_state.name = self.all_joint_names
|
||||||
"axis_0_joint",
|
# ... initialize with zeros
|
||||||
"axis_1_joint",
|
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
|
||||||
"axis_2_joint",
|
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
|
||||||
"axis_3_joint",
|
|
||||||
"wrist_yaw_joint",
|
def jointjog_callback(self, msg: JointJog):
|
||||||
"wrist_roll_joint",
|
if (
|
||||||
"ef_gripper_left_joint",
|
len(msg.joint_names) == 0
|
||||||
|
or len(msg.velocities) == 0
|
||||||
|
or len(msg.joint_names) != len(msg.velocities)
|
||||||
|
):
|
||||||
|
return # Malformed message
|
||||||
|
|
||||||
|
# Grab velocities from message
|
||||||
|
velocities = [
|
||||||
|
(
|
||||||
|
msg.velocities[msg.joint_names.index(joint_name)]
|
||||||
|
if joint_name in msg.joint_names
|
||||||
|
else 0.0
|
||||||
|
)
|
||||||
|
for joint_name in self.all_joint_names
|
||||||
]
|
]
|
||||||
self.saved_joint_state.position = [0.0] * len(
|
# Deadzone
|
||||||
self.saved_joint_state.name
|
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
|
||||||
) # Initialize with zeros
|
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
|
||||||
self.saved_joint_state.velocity = [0.0] * len(
|
velocities = [
|
||||||
self.saved_joint_state.name
|
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
|
||||||
) # Initialize with zeros
|
]
|
||||||
|
# Axis 2 & 3 URDF direction is inverted
|
||||||
|
velocities[2] = -velocities[2]
|
||||||
|
velocities[3] = -velocities[3]
|
||||||
|
|
||||||
# Old
|
# Send Axis 0-3
|
||||||
|
self.anchor_tovic_pub_.publish(
|
||||||
# Create publishers
|
VicCAN(
|
||||||
self.socket_pub = self.create_publisher(
|
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
|
||||||
SocketFeedback, "/arm/feedback/socket", 10
|
)
|
||||||
)
|
)
|
||||||
self.arm_feedback = SocketFeedback()
|
# Send Wrist yaw and roll
|
||||||
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
|
# TODO: Verify embedded
|
||||||
self.digit_feedback = DigitFeedback()
|
self.anchor_tovic_pub_.publish(
|
||||||
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
VicCAN(
|
||||||
|
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
|
||||||
# Create subscribers
|
)
|
||||||
self.man_sub = self.create_subscription(
|
|
||||||
ArmManual, "/arm/control/manual", self.send_manual, 10
|
|
||||||
)
|
)
|
||||||
|
# Send End Effector Gripper
|
||||||
def run(self):
|
# TODO: Verify m/s received correctly by embedded
|
||||||
global thread
|
self.anchor_tovic_pub_.publish(
|
||||||
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
|
VicCAN(
|
||||||
thread.start()
|
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
|
||||||
|
)
|
||||||
try:
|
)
|
||||||
thread.join()
|
# TODO: use msg.duration
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def joint_command_callback(self, msg: JointState):
|
def joint_command_callback(self, msg: JointState):
|
||||||
if len(msg.position) < 7 and len(msg.velocity) < 7:
|
if len(msg.position) < 7 and len(msg.velocity) < 7:
|
||||||
@@ -153,6 +219,7 @@ class ArmNode(Node):
|
|||||||
self.anchor_tovic_pub_.publish(digit_cmd)
|
self.anchor_tovic_pub_.publish(digit_cmd)
|
||||||
|
|
||||||
def send_manual(self, msg: ArmManual):
|
def send_manual(self, msg: ArmManual):
|
||||||
|
"""TODO: Old"""
|
||||||
axis0 = msg.axis0
|
axis0 = msg.axis0
|
||||||
axis1 = -1 * msg.axis1
|
axis1 = -1 * msg.axis1
|
||||||
axis2 = msg.axis2
|
axis2 = msg.axis2
|
||||||
@@ -166,7 +233,7 @@ class ArmNode(Node):
|
|||||||
|
|
||||||
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
|
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
|
||||||
|
|
||||||
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
|
command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
|
||||||
|
|
||||||
command += f"can_relay_tovic,digit,28,{msg.laser}\n"
|
command += f"can_relay_tovic,digit,28,{msg.laser}\n"
|
||||||
|
|
||||||
@@ -177,10 +244,12 @@ class ArmNode(Node):
|
|||||||
return
|
return
|
||||||
|
|
||||||
def send_cmd(self, msg: str):
|
def send_cmd(self, msg: str):
|
||||||
|
"""TODO: Old"""
|
||||||
output = String(data=msg)
|
output = String(data=msg)
|
||||||
self.anchor_pub.publish(output)
|
self.anchor_pub.publish(output)
|
||||||
|
|
||||||
def anchor_feedback(self, msg: String):
|
def anchor_feedback(self, msg: String):
|
||||||
|
"""TODO: Old"""
|
||||||
output = msg.data
|
output = msg.data
|
||||||
if output.startswith("can_relay_fromvic,arm,55"):
|
if output.startswith("can_relay_fromvic,arm,55"):
|
||||||
self.updateAngleFeedback(output)
|
self.updateAngleFeedback(output)
|
||||||
@@ -217,8 +286,8 @@ class ArmNode(Node):
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Check message len to prevent crashing on bad data
|
# Check message len to prevent crashing on bad data
|
||||||
if msg.command_id in viccan_socket_msg_len_dict:
|
if msg.command_id in self.viccan_socket_msg_len_dict:
|
||||||
expected_len = viccan_socket_msg_len_dict[msg.command_id]
|
expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
|
||||||
if len(msg.data) != expected_len:
|
if len(msg.data) != expected_len:
|
||||||
self.get_logger().warning(
|
self.get_logger().warning(
|
||||||
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
|
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
|
||||||
@@ -308,8 +377,8 @@ class ArmNode(Node):
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Check message len to prevent crashing on bad data
|
# Check message len to prevent crashing on bad data
|
||||||
if msg.command_id in viccan_digit_msg_len_dict:
|
if msg.command_id in self.viccan_digit_msg_len_dict:
|
||||||
expected_len = viccan_digit_msg_len_dict[msg.command_id]
|
expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
|
||||||
if len(msg.data) != expected_len:
|
if len(msg.data) != expected_len:
|
||||||
self.get_logger().warning(
|
self.get_logger().warning(
|
||||||
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
|
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
|
||||||
@@ -330,10 +399,12 @@ class ArmNode(Node):
|
|||||||
) # Wrist yaw
|
) # Wrist yaw
|
||||||
|
|
||||||
def publish_feedback(self):
|
def publish_feedback(self):
|
||||||
|
"""TODO: Old"""
|
||||||
self.socket_pub.publish(self.arm_feedback)
|
self.socket_pub.publish(self.arm_feedback)
|
||||||
self.digit_pub.publish(self.digit_feedback)
|
self.digit_pub.publish(self.digit_feedback)
|
||||||
|
|
||||||
def updateAngleFeedback(self, msg: str):
|
def updateAngleFeedback(self, msg: str):
|
||||||
|
"""TODO: Old"""
|
||||||
# Angle feedbacks,
|
# Angle feedbacks,
|
||||||
# split the msg.data by commas
|
# split the msg.data by commas
|
||||||
parts = msg.split(",")
|
parts = msg.split(",")
|
||||||
@@ -352,6 +423,7 @@ class ArmNode(Node):
|
|||||||
self.get_logger().info("Invalid angle feedback input format")
|
self.get_logger().info("Invalid angle feedback input format")
|
||||||
|
|
||||||
def updateBusVoltage(self, msg: str):
|
def updateBusVoltage(self, msg: str):
|
||||||
|
"""TODO: Old"""
|
||||||
# Bus Voltage feedbacks
|
# Bus Voltage feedbacks
|
||||||
parts = msg.split(",")
|
parts = msg.split(",")
|
||||||
if len(parts) >= 7:
|
if len(parts) >= 7:
|
||||||
@@ -366,6 +438,7 @@ class ArmNode(Node):
|
|||||||
self.get_logger().info("Invalid voltage feedback input format")
|
self.get_logger().info("Invalid voltage feedback input format")
|
||||||
|
|
||||||
def updateMotorFeedback(self, msg: str):
|
def updateMotorFeedback(self, msg: str):
|
||||||
|
"""TODO: Old"""
|
||||||
parts = str(msg.strip()).split(",")
|
parts = str(msg.strip()).split(",")
|
||||||
motorId = round(float(parts[3]))
|
motorId = round(float(parts[3]))
|
||||||
temp = float(parts[4]) / 10.0
|
temp = float(parts[4]) / 10.0
|
||||||
@@ -393,8 +466,15 @@ def main(args=None):
|
|||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
|
|
||||||
arm_node = ArmNode()
|
arm_node = ArmNode()
|
||||||
arm_node.run()
|
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
|
||||||
rclpy.try_shutdown()
|
thread.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
thread.join()
|
||||||
|
except (KeyboardInterrupt, ExternalShutdownException):
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -33,12 +33,12 @@ CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
|
|||||||
control_qos = qos.QoSProfile(
|
control_qos = qos.QoSProfile(
|
||||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||||
depth=2,
|
depth=2,
|
||||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
|
||||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||||
deadline=Duration(seconds=1),
|
# deadline=Duration(seconds=1),
|
||||||
lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||||
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||||
liveliness_lease_duration=Duration(seconds=5),
|
# liveliness_lease_duration=Duration(seconds=5),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Used to verify the length of an incoming VicCAN feedback message
|
# Used to verify the length of an incoming VicCAN feedback message
|
||||||
|
|||||||
@@ -41,31 +41,38 @@ control_qos = qos.QoSProfile(
|
|||||||
depth=2,
|
depth=2,
|
||||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
||||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||||
deadline=Duration(seconds=1),
|
# deadline=Duration(seconds=1),
|
||||||
lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||||
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||||
liveliness_lease_duration=Duration(seconds=5),
|
# liveliness_lease_duration=Duration(seconds=5),
|
||||||
)
|
)
|
||||||
|
|
||||||
arm_ik_qos = qos.QoSProfile(
|
|
||||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
|
||||||
depth=1,
|
|
||||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
|
||||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
|
||||||
)
|
|
||||||
|
|
||||||
CORE_MODE = "twist" # "twist" or "duty"
|
|
||||||
|
|
||||||
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
|
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
|
||||||
|
|
||||||
|
|
||||||
class Headless(Node):
|
class Headless(Node):
|
||||||
|
# Every non-fixed joint defined in Arm's URDF
|
||||||
|
# Used for JointState and JointJog messsages
|
||||||
|
all_joint_names = [
|
||||||
|
"axis_0_joint",
|
||||||
|
"axis_1_joint",
|
||||||
|
"axis_2_joint",
|
||||||
|
"axis_3_joint",
|
||||||
|
"wrist_yaw_joint",
|
||||||
|
"wrist_roll_joint",
|
||||||
|
"ef_gripper_left_joint",
|
||||||
|
]
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# Initialize pygame first
|
# Initialize pygame first
|
||||||
pygame.init()
|
pygame.init()
|
||||||
pygame.joystick.init()
|
pygame.joystick.init()
|
||||||
super().__init__("headless")
|
super().__init__("headless")
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Preamble
|
||||||
|
|
||||||
# Wait for anchor to start
|
# Wait for anchor to start
|
||||||
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
|
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
|
||||||
while len(pub_info) == 0:
|
while len(pub_info) == 0:
|
||||||
@@ -116,30 +123,23 @@ class Headless(Node):
|
|||||||
break
|
break
|
||||||
id += 1
|
id += 1
|
||||||
|
|
||||||
self.create_timer(0.1, self.send_controls)
|
##################################################
|
||||||
|
# Parameters
|
||||||
|
|
||||||
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
|
self.declare_parameter("use_old_topics", True)
|
||||||
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
|
self.use_old_topics = (
|
||||||
self.arm_ik_twist_publisher = self.create_publisher(
|
self.get_parameter("use_old_topics").get_parameter_value().bool_value
|
||||||
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
|
|
||||||
)
|
|
||||||
self.arm_ik_jointjog_publisher = self.create_publisher(
|
|
||||||
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
|
|
||||||
)
|
|
||||||
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
|
|
||||||
|
|
||||||
self.core_twist_pub_ = self.create_publisher(
|
|
||||||
Twist, "/core/twist", qos_profile=control_qos
|
|
||||||
)
|
|
||||||
self.core_state_pub_ = self.create_publisher(
|
|
||||||
CoreCtrlState, "/core/control/state", qos_profile=control_qos
|
|
||||||
)
|
)
|
||||||
|
|
||||||
self.declare_parameter("arm_mode", "manual")
|
self.declare_parameter("arm_mode", "manual")
|
||||||
self.arm_mode = self.get_parameter("arm_mode").value
|
self.arm_mode = (
|
||||||
|
self.get_parameter("arm_mode").get_parameter_value().string_value
|
||||||
|
)
|
||||||
|
|
||||||
self.declare_parameter("arm_manual_scheme", "old")
|
self.declare_parameter("arm_manual_scheme", "old")
|
||||||
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value
|
self.arm_manual_scheme = (
|
||||||
|
self.get_parameter("arm_manual_scheme").get_parameter_value().string_value
|
||||||
|
)
|
||||||
|
|
||||||
# Check parameter validity
|
# Check parameter validity
|
||||||
if self.arm_mode not in ["manual", "ik"]:
|
if self.arm_mode not in ["manual", "ik"]:
|
||||||
@@ -151,6 +151,49 @@ class Headless(Node):
|
|||||||
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
|
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.ctrl_mode = "core" # Start in core mode
|
||||||
|
self.core_brake_mode = False
|
||||||
|
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Old Topics
|
||||||
|
|
||||||
|
if self.use_old_topics:
|
||||||
|
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
|
||||||
|
self.arm_publisher = self.create_publisher(
|
||||||
|
ArmManual, "/arm/control/manual", 2
|
||||||
|
)
|
||||||
|
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# New Topics
|
||||||
|
|
||||||
|
self.core_twist_pub_ = self.create_publisher(
|
||||||
|
Twist, "/core/twist", qos_profile=control_qos
|
||||||
|
)
|
||||||
|
self.core_state_pub_ = self.create_publisher(
|
||||||
|
CoreCtrlState, "/core/control/state", qos_profile=control_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
self.arm_manual_pub_ = self.create_publisher(
|
||||||
|
JointJog, "/arm/manual_new", qos_profile=control_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
self.arm_ik_twist_publisher = self.create_publisher(
|
||||||
|
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
|
||||||
|
)
|
||||||
|
self.arm_ik_jointjog_publisher = self.create_publisher(
|
||||||
|
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Timers
|
||||||
|
|
||||||
|
self.create_timer(0.1, self.send_controls)
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Services
|
||||||
|
|
||||||
# If using IK control, we have to "start" the servo node to enable it to accept commands
|
# If using IK control, we have to "start" the servo node to enable it to accept commands
|
||||||
self.servo_start_client = None
|
self.servo_start_client = None
|
||||||
if self.arm_mode == "ik":
|
if self.arm_mode == "ik":
|
||||||
@@ -170,13 +213,17 @@ class Headless(Node):
|
|||||||
if self.servo_start_client.service_is_ready():
|
if self.servo_start_client.service_is_ready():
|
||||||
self.servo_start_client.call_async(Trigger.Request())
|
self.servo_start_client.call_async(Trigger.Request())
|
||||||
|
|
||||||
self.ctrl_mode = "core" # Start in core mode
|
|
||||||
self.core_brake_mode = False
|
|
||||||
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
|
|
||||||
|
|
||||||
# Rumble when node is ready (returns False if rumble not supported)
|
# Rumble when node is ready (returns False if rumble not supported)
|
||||||
self.gamepad.rumble(0.7, 0.8, 150)
|
self.gamepad.rumble(0.7, 0.8, 150)
|
||||||
|
|
||||||
|
def stop_all(self):
|
||||||
|
if self.use_old_topics:
|
||||||
|
self.core_publisher.publish(CORE_STOP_MSG)
|
||||||
|
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||||
|
self.bio_publisher.publish(BIO_STOP_MSG)
|
||||||
|
else:
|
||||||
|
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
|
||||||
|
|
||||||
def send_controls(self):
|
def send_controls(self):
|
||||||
"""Read the gamepad state and publish control messages"""
|
"""Read the gamepad state and publish control messages"""
|
||||||
for event in pygame.event.get():
|
for event in pygame.event.get():
|
||||||
@@ -187,10 +234,8 @@ class Headless(Node):
|
|||||||
# Check if controller is still connected
|
# Check if controller is still connected
|
||||||
if pygame.joystick.get_count() != self.num_gamepads:
|
if pygame.joystick.get_count() != self.num_gamepads:
|
||||||
print("Gamepad disconnected. Exiting...")
|
print("Gamepad disconnected. Exiting...")
|
||||||
# Send one last zero control message
|
# Stop the rover if controller disconnected
|
||||||
self.core_publisher.publish(CORE_STOP_MSG)
|
self.stop_all()
|
||||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
|
||||||
self.bio_publisher.publish(BIO_STOP_MSG)
|
|
||||||
self.get_logger().info("Final stop commands sent. Shutting down.")
|
self.get_logger().info("Final stop commands sent. Shutting down.")
|
||||||
# Clean up
|
# Clean up
|
||||||
pygame.quit()
|
pygame.quit()
|
||||||
@@ -206,23 +251,42 @@ class Headless(Node):
|
|||||||
new_ctrl_mode = "core"
|
new_ctrl_mode = "core"
|
||||||
|
|
||||||
if new_ctrl_mode != self.ctrl_mode:
|
if new_ctrl_mode != self.ctrl_mode:
|
||||||
self.core_publisher.publish(CORE_STOP_MSG)
|
self.stop_all()
|
||||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
|
||||||
self.bio_publisher.publish(BIO_STOP_MSG)
|
|
||||||
self.gamepad.rumble(0.6, 0.7, 75)
|
self.gamepad.rumble(0.6, 0.7, 75)
|
||||||
self.ctrl_mode = new_ctrl_mode
|
self.ctrl_mode = new_ctrl_mode
|
||||||
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
|
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
|
||||||
|
|
||||||
# CORE
|
# Actually send the controls
|
||||||
if self.ctrl_mode == "core" and CORE_MODE == "duty":
|
if self.ctrl_mode == "core":
|
||||||
|
self.send_core()
|
||||||
|
if self.use_old_topics:
|
||||||
|
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||||
|
else:
|
||||||
|
self.send_arm()
|
||||||
|
# self.send_bio()
|
||||||
|
if self.use_old_topics:
|
||||||
|
self.core_publisher.publish(CORE_STOP_MSG)
|
||||||
|
|
||||||
|
def send_core(self):
|
||||||
|
# Collect controller state
|
||||||
|
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||||
|
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||||
|
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||||
|
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||||
|
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||||
|
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||||
|
button_a = self.gamepad.get_button(0)
|
||||||
|
button_b = self.gamepad.get_button(1)
|
||||||
|
button_x = self.gamepad.get_button(2)
|
||||||
|
button_y = self.gamepad.get_button(3)
|
||||||
|
left_bumper = self.gamepad.get_button(4)
|
||||||
|
right_bumper = self.gamepad.get_button(5)
|
||||||
|
dpad_input = self.gamepad.get_hat(0)
|
||||||
|
|
||||||
|
if self.use_old_topics:
|
||||||
input = CoreControl()
|
input = CoreControl()
|
||||||
input.max_speed = 90
|
input.max_speed = 90
|
||||||
|
|
||||||
# Collect controller state
|
|
||||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
|
||||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
|
||||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
|
||||||
|
|
||||||
# Right wheels
|
# Right wheels
|
||||||
input.right_stick = float(round(-1 * right_stick_y, 2))
|
input.right_stick = float(round(-1 * right_stick_y, 2))
|
||||||
|
|
||||||
@@ -238,16 +302,9 @@ class Headless(Node):
|
|||||||
|
|
||||||
self.core_publisher.publish(input)
|
self.core_publisher.publish(input)
|
||||||
|
|
||||||
elif self.ctrl_mode == "core" and CORE_MODE == "twist":
|
else: # New topics
|
||||||
input = Twist()
|
input = Twist()
|
||||||
|
|
||||||
# Collect controller state
|
|
||||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
|
||||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
|
||||||
button_a = self.gamepad.get_button(0)
|
|
||||||
left_bumper = self.gamepad.get_button(4)
|
|
||||||
right_bumper = self.gamepad.get_button(5)
|
|
||||||
|
|
||||||
# Forward/back and Turn
|
# Forward/back and Turn
|
||||||
input.linear.x = -1.0 * left_stick_y
|
input.linear.x = -1.0 * left_stick_y
|
||||||
input.angular.z = -1.0 * copysign(
|
input.angular.z = -1.0 * copysign(
|
||||||
@@ -280,30 +337,33 @@ class Headless(Node):
|
|||||||
state_msg = CoreCtrlState()
|
state_msg = CoreCtrlState()
|
||||||
state_msg.brake_mode = bool(self.core_brake_mode)
|
state_msg.brake_mode = bool(self.core_brake_mode)
|
||||||
state_msg.max_duty = float(self.core_max_duty)
|
state_msg.max_duty = float(self.core_max_duty)
|
||||||
|
|
||||||
self.core_state_pub_.publish(state_msg)
|
self.core_state_pub_.publish(state_msg)
|
||||||
self.get_logger().info(
|
self.get_logger().info(
|
||||||
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
|
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# ARM and BIO
|
def send_arm(self):
|
||||||
if self.ctrl_mode == "arm" and self.arm_mode == "manual":
|
# Collect controller state
|
||||||
|
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||||
|
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||||
|
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||||
|
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||||
|
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||||
|
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||||
|
button_a = self.gamepad.get_button(0)
|
||||||
|
button_b = self.gamepad.get_button(1)
|
||||||
|
button_x = self.gamepad.get_button(2)
|
||||||
|
button_y = self.gamepad.get_button(3)
|
||||||
|
left_bumper = self.gamepad.get_button(4)
|
||||||
|
right_bumper = self.gamepad.get_button(5)
|
||||||
|
dpad_input = self.gamepad.get_hat(0)
|
||||||
|
|
||||||
|
# OLD MANUAL
|
||||||
|
if self.arm_mode == "manual" and self.use_old_topics:
|
||||||
arm_input = ArmManual()
|
arm_input = ArmManual()
|
||||||
|
|
||||||
# Collect controller state
|
# OLD ARM CONTROL SCHEME
|
||||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
|
||||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
|
||||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
|
||||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
|
||||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
|
||||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
|
||||||
button_a = self.gamepad.get_button(0)
|
|
||||||
button_b = self.gamepad.get_button(1)
|
|
||||||
button_x = self.gamepad.get_button(2)
|
|
||||||
button_y = self.gamepad.get_button(3)
|
|
||||||
left_bumper = self.gamepad.get_button(4)
|
|
||||||
right_bumper = self.gamepad.get_button(5)
|
|
||||||
dpad_input = self.gamepad.get_hat(0)
|
|
||||||
|
|
||||||
if self.arm_manual_scheme == "old":
|
if self.arm_manual_scheme == "old":
|
||||||
# EF Grippers
|
# EF Grippers
|
||||||
if left_trigger > 0 and right_trigger > 0:
|
if left_trigger > 0 and right_trigger > 0:
|
||||||
@@ -347,6 +407,7 @@ class Headless(Node):
|
|||||||
if abs(right_stick_y) > 0.15:
|
if abs(right_stick_y) > 0.15:
|
||||||
arm_input.axis3 = -1 * round(right_stick_y)
|
arm_input.axis3 = -1 * round(right_stick_y)
|
||||||
|
|
||||||
|
# NEW ARM CONTROL SCHEME
|
||||||
if self.arm_manual_scheme == "new":
|
if self.arm_manual_scheme == "new":
|
||||||
# Right stick: EF yaw and axis 3
|
# Right stick: EF yaw and axis 3
|
||||||
# Left stick: axis 1 and 2
|
# Left stick: axis 1 and 2
|
||||||
@@ -409,22 +470,70 @@ class Headless(Node):
|
|||||||
else:
|
else:
|
||||||
arm_input.linear_actuator = 0
|
arm_input.linear_actuator = 0
|
||||||
|
|
||||||
|
|
||||||
# BIO
|
|
||||||
bio_input = BioControl(
|
|
||||||
bio_arm=int(left_stick_y * -100),
|
|
||||||
drill_arm=int(round(right_stick_y) * -100),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Drill motor (FAERIE)
|
|
||||||
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
|
|
||||||
bio_input.drill = int(
|
|
||||||
30 * (right_trigger - left_trigger)
|
|
||||||
) # Max duty cycle 30%
|
|
||||||
|
|
||||||
self.arm_publisher.publish(arm_input)
|
self.arm_publisher.publish(arm_input)
|
||||||
|
|
||||||
if self.ctrl_mode == "arm" and self.arm_mode == "ik":
|
# NEW MANUAL
|
||||||
|
elif self.arm_mode == "manual" and not self.use_old_topics:
|
||||||
|
arm_input = JointJog()
|
||||||
|
arm_input.header.frame_id = "base_link"
|
||||||
|
arm_input.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
arm_input.joint_names = self.all_joint_names
|
||||||
|
arm_input.velocities = [0.0] * len(self.all_joint_names)
|
||||||
|
|
||||||
|
# Right stick: EF yaw and axis 3
|
||||||
|
# Left stick: axis 1 and 2
|
||||||
|
# D-pad: axis 0 and _
|
||||||
|
# Triggers: EF grippers
|
||||||
|
# Bumpers: EF roll
|
||||||
|
# A: brake
|
||||||
|
# B: linear actuator in
|
||||||
|
# X: _
|
||||||
|
# Y: linear actuator out
|
||||||
|
|
||||||
|
ARM_THRESHOLD = 0.2
|
||||||
|
|
||||||
|
# Right stick: EF yaw and axis 3
|
||||||
|
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = (
|
||||||
|
float(copysign(1, right_stick_x)) if abs(right_stick_x) >= ARM_THRESHOLD else 0.0
|
||||||
|
)
|
||||||
|
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = (
|
||||||
|
float(-1 * copysign(1, right_stick_y)) if abs(right_stick_y) >= ARM_THRESHOLD else 0.0
|
||||||
|
)
|
||||||
|
|
||||||
|
# Left stick: axis 1 and 2
|
||||||
|
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = (
|
||||||
|
float(copysign(1, left_stick_x)) if abs(left_stick_x) >= ARM_THRESHOLD else 0.0
|
||||||
|
)
|
||||||
|
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = (
|
||||||
|
float(-1 * copysign(1, left_stick_y)) if abs(left_stick_y) >= ARM_THRESHOLD else 0.0
|
||||||
|
)
|
||||||
|
|
||||||
|
# D-pad: axis 0 and _
|
||||||
|
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = (
|
||||||
|
float(copysign(1, dpad_input[0])) if dpad_input[0] != 0 else 0.0
|
||||||
|
)
|
||||||
|
|
||||||
|
# Triggers: EF Grippers
|
||||||
|
if left_trigger > 0 and right_trigger > 0:
|
||||||
|
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 0.0
|
||||||
|
elif left_trigger > 0:
|
||||||
|
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = -1.0
|
||||||
|
elif right_trigger > 0:
|
||||||
|
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 1.0
|
||||||
|
|
||||||
|
# Bumpers: EF roll
|
||||||
|
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = right_bumper - left_bumper
|
||||||
|
|
||||||
|
# A: brake
|
||||||
|
# TODO: Brake mode
|
||||||
|
|
||||||
|
# Y: linear actuator
|
||||||
|
# TODO: linear actuator
|
||||||
|
|
||||||
|
self.arm_manual_pub_.publish(arm_input)
|
||||||
|
|
||||||
|
# IK
|
||||||
|
elif self.arm_mode == "ik":
|
||||||
arm_twist = TwistStamped()
|
arm_twist = TwistStamped()
|
||||||
arm_twist.header.frame_id = "base_link"
|
arm_twist.header.frame_id = "base_link"
|
||||||
arm_twist.header.stamp = self.get_clock().now().to_msg()
|
arm_twist.header.stamp = self.get_clock().now().to_msg()
|
||||||
@@ -432,21 +541,6 @@ class Headless(Node):
|
|||||||
arm_jointjog.header.frame_id = "base_link"
|
arm_jointjog.header.frame_id = "base_link"
|
||||||
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
|
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
|
||||||
# Collect controller state
|
|
||||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
|
||||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
|
||||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
|
||||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
|
||||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
|
||||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
|
||||||
button_a = self.gamepad.get_button(0)
|
|
||||||
button_b = self.gamepad.get_button(1)
|
|
||||||
button_x = self.gamepad.get_button(2)
|
|
||||||
button_y = self.gamepad.get_button(3)
|
|
||||||
left_bumper = self.gamepad.get_button(4)
|
|
||||||
right_bumper = self.gamepad.get_button(5)
|
|
||||||
dpad_input = self.gamepad.get_hat(0)
|
|
||||||
|
|
||||||
# Right stick: linear y and linear x
|
# Right stick: linear y and linear x
|
||||||
# Left stick: angular z and linear z
|
# Left stick: angular z and linear z
|
||||||
# D-pad: angular y and _
|
# D-pad: angular y and _
|
||||||
@@ -473,9 +567,7 @@ class Headless(Node):
|
|||||||
|
|
||||||
# Triggers: EF Grippers
|
# Triggers: EF Grippers
|
||||||
if left_trigger > 0 or right_trigger > 0:
|
if left_trigger > 0 or right_trigger > 0:
|
||||||
arm_jointjog.joint_names.append(
|
arm_jointjog.joint_names.append("ef_gripper_left_joint")
|
||||||
"ef_gripper_left_joint"
|
|
||||||
)
|
|
||||||
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
|
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
|
||||||
|
|
||||||
# Bumpers: angular x
|
# Bumpers: angular x
|
||||||
@@ -489,6 +581,36 @@ class Headless(Node):
|
|||||||
self.arm_ik_twist_publisher.publish(arm_twist)
|
self.arm_ik_twist_publisher.publish(arm_twist)
|
||||||
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
|
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
|
||||||
|
|
||||||
|
def send_bio(self):
|
||||||
|
# Collect controller state
|
||||||
|
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||||
|
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||||
|
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||||
|
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||||
|
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||||
|
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||||
|
button_a = self.gamepad.get_button(0)
|
||||||
|
button_b = self.gamepad.get_button(1)
|
||||||
|
button_x = self.gamepad.get_button(2)
|
||||||
|
button_y = self.gamepad.get_button(3)
|
||||||
|
left_bumper = self.gamepad.get_button(4)
|
||||||
|
right_bumper = self.gamepad.get_button(5)
|
||||||
|
dpad_input = self.gamepad.get_hat(0)
|
||||||
|
|
||||||
|
if self.use_old_topics:
|
||||||
|
bio_input = BioControl(
|
||||||
|
bio_arm=int(left_stick_y * -100),
|
||||||
|
drill_arm=int(round(right_stick_y) * -100),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Drill motor (FAERIE)
|
||||||
|
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
|
||||||
|
bio_input.drill = int(
|
||||||
|
30 * (right_trigger - left_trigger)
|
||||||
|
) # Max duty cycle 30%
|
||||||
|
|
||||||
|
self.bio_publisher.publish(bio_input)
|
||||||
|
|
||||||
|
|
||||||
def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
||||||
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
|
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
|
||||||
|
|||||||
Reference in New Issue
Block a user