mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-04-20 11:51:16 -05:00
Compare commits
13 Commits
169ab85607
...
62fd1b110d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
62fd1b110d | ||
|
|
aaf40124fa | ||
|
|
294ae393de | ||
|
|
9c9d3d675e | ||
|
|
6fa47021fc | ||
|
|
f23d8c62ff | ||
|
|
667247cac8 | ||
|
|
0929cc9503 | ||
|
|
3f68052144 | ||
|
|
c72f72dc32 | ||
|
|
61f5f1fc3e | ||
|
|
d9355f16e9 | ||
|
|
9fc120b09e |
@@ -25,7 +25,7 @@
|
||||
name = "ASTRA Anchor";
|
||||
packages = with pkgs; [
|
||||
colcon
|
||||
(python312.withPackages (
|
||||
(python313.withPackages (
|
||||
p: with p; [
|
||||
pyserial
|
||||
pygame
|
||||
|
||||
@@ -1,132 +1,198 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy import qos
|
||||
import serial
|
||||
import sys
|
||||
import threading
|
||||
import glob
|
||||
import time
|
||||
import atexit
|
||||
import signal
|
||||
from std_msgs.msg import String, Header
|
||||
from sensor_msgs.msg import JointState
|
||||
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
|
||||
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
|
||||
import math
|
||||
|
||||
# control_qos = qos.QoSProfile(
|
||||
# history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
# depth=1,
|
||||
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
||||
# durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||
# deadline=1000,
|
||||
# lifespan=500,
|
||||
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||
# liveliness_lease_duration=5000
|
||||
# )
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.executors import ExternalShutdownException
|
||||
from rclpy import qos
|
||||
|
||||
from std_msgs.msg import String, Header
|
||||
from sensor_msgs.msg import JointState
|
||||
from control_msgs.msg import JointJog
|
||||
from astra_msgs.msg import SocketFeedback, DigitFeedback, ArmManual
|
||||
from astra_msgs.msg import ArmFeedback, VicCAN, RevMotorState
|
||||
|
||||
control_qos = qos.QoSProfile(
|
||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=2,
|
||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
|
||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||
# deadline=Duration(seconds=1),
|
||||
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||
# liveliness_lease_duration=Duration(seconds=5),
|
||||
)
|
||||
|
||||
thread = None
|
||||
|
||||
# Used to verify the length of an incoming VicCAN feedback message
|
||||
# Key is VicCAN command_id, value is expected length of data list
|
||||
viccan_socket_msg_len_dict = {
|
||||
53: 4,
|
||||
54: 4,
|
||||
55: 4,
|
||||
58: 4,
|
||||
59: 4,
|
||||
}
|
||||
viccan_digit_msg_len_dict = {
|
||||
54: 4,
|
||||
55: 2,
|
||||
59: 2,
|
||||
}
|
||||
|
||||
|
||||
class ArmNode(Node):
|
||||
"""Relay between Anchor and Basestation/Headless/Moveit2 for Arm related topics."""
|
||||
|
||||
# Every non-fixed joint defined in Arm's URDF
|
||||
# Used for JointState and JointJog messsages
|
||||
all_joint_names = [
|
||||
"axis_0_joint",
|
||||
"axis_1_joint",
|
||||
"axis_2_joint",
|
||||
"axis_3_joint",
|
||||
"wrist_yaw_joint",
|
||||
"wrist_roll_joint",
|
||||
"ef_gripper_left_joint",
|
||||
]
|
||||
|
||||
# Used to verify the length of an incoming VicCAN feedback message
|
||||
# Key is VicCAN command_id, value is expected length of data list
|
||||
viccan_socket_msg_len_dict = {
|
||||
53: 4,
|
||||
54: 4,
|
||||
55: 4,
|
||||
58: 4,
|
||||
59: 4,
|
||||
}
|
||||
|
||||
viccan_digit_msg_len_dict = {
|
||||
54: 4,
|
||||
55: 2,
|
||||
59: 2,
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("arm_node")
|
||||
|
||||
self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
|
||||
|
||||
##################################################
|
||||
# Topics
|
||||
# Parameters
|
||||
|
||||
self.declare_parameter("use_old_topics", True)
|
||||
self.use_old_topics = self.get_parameter("use_old_topics").get_parameter_value().bool_value
|
||||
|
||||
##################################################
|
||||
# Old topics
|
||||
|
||||
if self.use_old_topics:
|
||||
# Anchor topics
|
||||
self.anchor_sub = self.create_subscription(
|
||||
String, "/anchor/arm/feedback", self.anchor_feedback, 10
|
||||
)
|
||||
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
|
||||
|
||||
# Create publishers
|
||||
self.socket_pub = self.create_publisher(
|
||||
SocketFeedback, "/arm/feedback/socket", 10
|
||||
)
|
||||
self.arm_feedback = SocketFeedback()
|
||||
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
|
||||
self.digit_feedback = DigitFeedback()
|
||||
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
||||
|
||||
# Create subscribers
|
||||
self.man_sub = self.create_subscription(
|
||||
ArmManual, "/arm/control/manual", self.send_manual, 10
|
||||
)
|
||||
|
||||
###################################################
|
||||
# New topics
|
||||
|
||||
# Anchor topics
|
||||
|
||||
# from_vic
|
||||
self.anchor_fromvic_sub_ = self.create_subscription(
|
||||
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
|
||||
)
|
||||
# to_vic
|
||||
self.anchor_tovic_pub_ = self.create_publisher(
|
||||
VicCAN, "/anchor/to_vic/relay", 20
|
||||
)
|
||||
|
||||
self.anchor_sub = self.create_subscription(
|
||||
String, "/anchor/arm/feedback", self.anchor_feedback, 10
|
||||
)
|
||||
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
|
||||
|
||||
# Control
|
||||
|
||||
# Manual: who tf knows. Maybe JointJog?
|
||||
# Manual: /arm/manual_new is published by Servo or Basestation
|
||||
self.jointjog_pub_ = self.create_subscription(
|
||||
JointJog, "/arm/manual_new", self.jointjog_callback, qos_profile=control_qos
|
||||
)
|
||||
# IK: /joint_commands is published by JointTrajectoryController via topic_based_control
|
||||
self.joint_command_sub_ = self.create_subscription(
|
||||
JointState, "/joint_commands", self.joint_command_callback, 1
|
||||
JointState, "/joint_commands", self.joint_command_callback, qos_profile=control_qos
|
||||
)
|
||||
|
||||
# Feedback
|
||||
|
||||
# Combined Socket and Digit feedback
|
||||
self.arm_feedback_pub_ = self.create_publisher(
|
||||
ArmFeedback,
|
||||
"/arm/feedback/new_feedback",
|
||||
qos_profile=qos.qos_profile_sensor_data,
|
||||
)
|
||||
self.arm_feedback_new = ArmFeedback()
|
||||
# IK: /joint_states is published from here to topic_based_control
|
||||
# IK arm pose: /joint_states is published from here to topic_based_control
|
||||
self.joint_state_pub_ = self.create_publisher(
|
||||
JointState, "joint_states", qos_profile=qos.qos_profile_sensor_data
|
||||
)
|
||||
|
||||
###################################################
|
||||
# Saved state
|
||||
|
||||
# Combined Socket and Digit feedback
|
||||
self.arm_feedback_new = ArmFeedback()
|
||||
|
||||
# IK Arm pose
|
||||
self.saved_joint_state = JointState()
|
||||
self.saved_joint_state.name = [
|
||||
"axis_0_joint",
|
||||
"axis_1_joint",
|
||||
"axis_2_joint",
|
||||
"axis_3_joint",
|
||||
"wrist_yaw_joint",
|
||||
"wrist_roll_joint",
|
||||
"ef_gripper_left_joint",
|
||||
self.saved_joint_state.name = self.all_joint_names
|
||||
# ... initialize with zeros
|
||||
self.saved_joint_state.position = [0.0] * len(self.saved_joint_state.name)
|
||||
self.saved_joint_state.velocity = [0.0] * len(self.saved_joint_state.name)
|
||||
|
||||
def jointjog_callback(self, msg: JointJog):
|
||||
if (
|
||||
len(msg.joint_names) == 0
|
||||
or len(msg.velocities) == 0
|
||||
or len(msg.joint_names) != len(msg.velocities)
|
||||
):
|
||||
return # Malformed message
|
||||
|
||||
# Grab velocities from message
|
||||
velocities = [
|
||||
(
|
||||
msg.velocities[msg.joint_names.index(joint_name)]
|
||||
if joint_name in msg.joint_names
|
||||
else 0.0
|
||||
)
|
||||
for joint_name in self.all_joint_names
|
||||
]
|
||||
self.saved_joint_state.position = [0.0] * len(
|
||||
self.saved_joint_state.name
|
||||
) # Initialize with zeros
|
||||
self.saved_joint_state.velocity = [0.0] * len(
|
||||
self.saved_joint_state.name
|
||||
) # Initialize with zeros
|
||||
# Deadzone
|
||||
velocities = [vel if abs(vel) > 0.05 else 0.0 for vel in velocities]
|
||||
# ROS2's rad/s to VicCAN's deg/s*10; don't convert gripper's m/s
|
||||
velocities = [
|
||||
math.degrees(vel) * 10 if i < 6 else vel for i, vel in enumerate(velocities)
|
||||
]
|
||||
# Axis 2 & 3 URDF direction is inverted
|
||||
velocities[2] = -velocities[2]
|
||||
velocities[3] = -velocities[3]
|
||||
|
||||
# Old
|
||||
|
||||
# Create publishers
|
||||
self.socket_pub = self.create_publisher(
|
||||
SocketFeedback, "/arm/feedback/socket", 10
|
||||
# Send Axis 0-3
|
||||
self.anchor_tovic_pub_.publish(
|
||||
VicCAN(
|
||||
mcu_name="arm", command_id=43, data=velocities[0:3], header=msg.header
|
||||
)
|
||||
)
|
||||
self.arm_feedback = SocketFeedback()
|
||||
self.digit_pub = self.create_publisher(DigitFeedback, "/arm/feedback/digit", 10)
|
||||
self.digit_feedback = DigitFeedback()
|
||||
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
||||
|
||||
# Create subscribers
|
||||
self.man_sub = self.create_subscription(
|
||||
ArmManual, "/arm/control/manual", self.send_manual, 10
|
||||
# Send Wrist yaw and roll
|
||||
# TODO: Verify embedded
|
||||
self.anchor_tovic_pub_.publish(
|
||||
VicCAN(
|
||||
mcu_name="digit", command_id=43, data=velocities[4:5], header=msg.header
|
||||
)
|
||||
)
|
||||
|
||||
def run(self):
|
||||
global thread
|
||||
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
try:
|
||||
thread.join()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
# Send End Effector Gripper
|
||||
# TODO: Verify m/s received correctly by embedded
|
||||
self.anchor_tovic_pub_.publish(
|
||||
VicCAN(
|
||||
mcu_name="digit", command_id=26, data=[velocities[6]], header=msg.header
|
||||
)
|
||||
)
|
||||
# TODO: use msg.duration
|
||||
|
||||
def joint_command_callback(self, msg: JointState):
|
||||
if len(msg.position) < 7 and len(msg.velocity) < 7:
|
||||
@@ -153,6 +219,7 @@ class ArmNode(Node):
|
||||
self.anchor_tovic_pub_.publish(digit_cmd)
|
||||
|
||||
def send_manual(self, msg: ArmManual):
|
||||
"""TODO: Old"""
|
||||
axis0 = msg.axis0
|
||||
axis1 = -1 * msg.axis1
|
||||
axis2 = msg.axis2
|
||||
@@ -166,7 +233,7 @@ class ArmNode(Node):
|
||||
|
||||
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
|
||||
|
||||
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
|
||||
command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
|
||||
|
||||
command += f"can_relay_tovic,digit,28,{msg.laser}\n"
|
||||
|
||||
@@ -177,10 +244,12 @@ class ArmNode(Node):
|
||||
return
|
||||
|
||||
def send_cmd(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
output = String(data=msg)
|
||||
self.anchor_pub.publish(output)
|
||||
|
||||
def anchor_feedback(self, msg: String):
|
||||
"""TODO: Old"""
|
||||
output = msg.data
|
||||
if output.startswith("can_relay_fromvic,arm,55"):
|
||||
self.updateAngleFeedback(output)
|
||||
@@ -217,8 +286,8 @@ class ArmNode(Node):
|
||||
return
|
||||
|
||||
# Check message len to prevent crashing on bad data
|
||||
if msg.command_id in viccan_socket_msg_len_dict:
|
||||
expected_len = viccan_socket_msg_len_dict[msg.command_id]
|
||||
if msg.command_id in self.viccan_socket_msg_len_dict:
|
||||
expected_len = self.viccan_socket_msg_len_dict[msg.command_id]
|
||||
if len(msg.data) != expected_len:
|
||||
self.get_logger().warning(
|
||||
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
|
||||
@@ -308,8 +377,8 @@ class ArmNode(Node):
|
||||
return
|
||||
|
||||
# Check message len to prevent crashing on bad data
|
||||
if msg.command_id in viccan_digit_msg_len_dict:
|
||||
expected_len = viccan_digit_msg_len_dict[msg.command_id]
|
||||
if msg.command_id in self.viccan_digit_msg_len_dict:
|
||||
expected_len = self.viccan_digit_msg_len_dict[msg.command_id]
|
||||
if len(msg.data) != expected_len:
|
||||
self.get_logger().warning(
|
||||
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
|
||||
@@ -330,10 +399,12 @@ class ArmNode(Node):
|
||||
) # Wrist yaw
|
||||
|
||||
def publish_feedback(self):
|
||||
"""TODO: Old"""
|
||||
self.socket_pub.publish(self.arm_feedback)
|
||||
self.digit_pub.publish(self.digit_feedback)
|
||||
|
||||
def updateAngleFeedback(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
# Angle feedbacks,
|
||||
# split the msg.data by commas
|
||||
parts = msg.split(",")
|
||||
@@ -352,6 +423,7 @@ class ArmNode(Node):
|
||||
self.get_logger().info("Invalid angle feedback input format")
|
||||
|
||||
def updateBusVoltage(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
# Bus Voltage feedbacks
|
||||
parts = msg.split(",")
|
||||
if len(parts) >= 7:
|
||||
@@ -366,6 +438,7 @@ class ArmNode(Node):
|
||||
self.get_logger().info("Invalid voltage feedback input format")
|
||||
|
||||
def updateMotorFeedback(self, msg: str):
|
||||
"""TODO: Old"""
|
||||
parts = str(msg.strip()).split(",")
|
||||
motorId = round(float(parts[3]))
|
||||
temp = float(parts[4]) / 10.0
|
||||
@@ -393,8 +466,15 @@ def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
|
||||
arm_node = ArmNode()
|
||||
arm_node.run()
|
||||
rclpy.try_shutdown()
|
||||
thread = threading.Thread(target=rclpy.spin, args=(arm_node,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
try:
|
||||
thread.join()
|
||||
except (KeyboardInterrupt, ExternalShutdownException):
|
||||
pass
|
||||
finally:
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -33,12 +33,12 @@ CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
|
||||
control_qos = qos.QoSProfile(
|
||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=2,
|
||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT, # Best Effort subscribers are still compatible with Reliable publishers
|
||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||
deadline=Duration(seconds=1),
|
||||
lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||
liveliness_lease_duration=Duration(seconds=5),
|
||||
# deadline=Duration(seconds=1),
|
||||
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||
# liveliness_lease_duration=Duration(seconds=5),
|
||||
)
|
||||
|
||||
# Used to verify the length of an incoming VicCAN feedback message
|
||||
|
||||
@@ -41,31 +41,38 @@ control_qos = qos.QoSProfile(
|
||||
depth=2,
|
||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||
deadline=Duration(seconds=1),
|
||||
lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||
liveliness_lease_duration=Duration(seconds=5),
|
||||
# deadline=Duration(seconds=1),
|
||||
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
|
||||
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||
# liveliness_lease_duration=Duration(seconds=5),
|
||||
)
|
||||
|
||||
arm_ik_qos = qos.QoSProfile(
|
||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=1,
|
||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||
)
|
||||
|
||||
CORE_MODE = "twist" # "twist" or "duty"
|
||||
|
||||
STICK_DEADZONE = float(os.getenv("STICK_DEADZONE", "0.05"))
|
||||
|
||||
|
||||
class Headless(Node):
|
||||
# Every non-fixed joint defined in Arm's URDF
|
||||
# Used for JointState and JointJog messsages
|
||||
all_joint_names = [
|
||||
"axis_0_joint",
|
||||
"axis_1_joint",
|
||||
"axis_2_joint",
|
||||
"axis_3_joint",
|
||||
"wrist_yaw_joint",
|
||||
"wrist_roll_joint",
|
||||
"ef_gripper_left_joint",
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
# Initialize pygame first
|
||||
pygame.init()
|
||||
pygame.joystick.init()
|
||||
super().__init__("headless")
|
||||
|
||||
##################################################
|
||||
# Preamble
|
||||
|
||||
# Wait for anchor to start
|
||||
pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
|
||||
while len(pub_info) == 0:
|
||||
@@ -116,30 +123,23 @@ class Headless(Node):
|
||||
break
|
||||
id += 1
|
||||
|
||||
self.create_timer(0.1, self.send_controls)
|
||||
##################################################
|
||||
# Parameters
|
||||
|
||||
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
|
||||
self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
|
||||
self.arm_ik_twist_publisher = self.create_publisher(
|
||||
TwistStamped, "/servo_node/delta_twist_cmds", arm_ik_qos
|
||||
)
|
||||
self.arm_ik_jointjog_publisher = self.create_publisher(
|
||||
JointJog, "/servo_node/delta_joint_cmds", arm_ik_qos
|
||||
)
|
||||
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
|
||||
|
||||
self.core_twist_pub_ = self.create_publisher(
|
||||
Twist, "/core/twist", qos_profile=control_qos
|
||||
)
|
||||
self.core_state_pub_ = self.create_publisher(
|
||||
CoreCtrlState, "/core/control/state", qos_profile=control_qos
|
||||
self.declare_parameter("use_old_topics", True)
|
||||
self.use_old_topics = (
|
||||
self.get_parameter("use_old_topics").get_parameter_value().bool_value
|
||||
)
|
||||
|
||||
self.declare_parameter("arm_mode", "manual")
|
||||
self.arm_mode = self.get_parameter("arm_mode").value
|
||||
self.arm_mode = (
|
||||
self.get_parameter("arm_mode").get_parameter_value().string_value
|
||||
)
|
||||
|
||||
self.declare_parameter("arm_manual_scheme", "old")
|
||||
self.arm_manual_scheme = self.get_parameter("arm_manual_scheme").value
|
||||
self.arm_manual_scheme = (
|
||||
self.get_parameter("arm_manual_scheme").get_parameter_value().string_value
|
||||
)
|
||||
|
||||
# Check parameter validity
|
||||
if self.arm_mode not in ["manual", "ik"]:
|
||||
@@ -151,6 +151,49 @@ class Headless(Node):
|
||||
f"Invalid value '{self.arm_manual_scheme}' for arm_manual_scheme parameter. Defaulting to 'old' ('24 and '25 controls)."
|
||||
)
|
||||
|
||||
self.ctrl_mode = "core" # Start in core mode
|
||||
self.core_brake_mode = False
|
||||
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
|
||||
|
||||
##################################################
|
||||
# Old Topics
|
||||
|
||||
if self.use_old_topics:
|
||||
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
|
||||
self.arm_publisher = self.create_publisher(
|
||||
ArmManual, "/arm/control/manual", 2
|
||||
)
|
||||
self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
|
||||
|
||||
##################################################
|
||||
# New Topics
|
||||
|
||||
self.core_twist_pub_ = self.create_publisher(
|
||||
Twist, "/core/twist", qos_profile=control_qos
|
||||
)
|
||||
self.core_state_pub_ = self.create_publisher(
|
||||
CoreCtrlState, "/core/control/state", qos_profile=control_qos
|
||||
)
|
||||
|
||||
self.arm_manual_pub_ = self.create_publisher(
|
||||
JointJog, "/arm/manual_new", qos_profile=control_qos
|
||||
)
|
||||
|
||||
self.arm_ik_twist_publisher = self.create_publisher(
|
||||
TwistStamped, "/servo_node/delta_twist_cmds", qos_profile=control_qos
|
||||
)
|
||||
self.arm_ik_jointjog_publisher = self.create_publisher(
|
||||
JointJog, "/servo_node/delta_joint_cmds", qos_profile=control_qos
|
||||
)
|
||||
|
||||
##################################################
|
||||
# Timers
|
||||
|
||||
self.create_timer(0.1, self.send_controls)
|
||||
|
||||
##################################################
|
||||
# Services
|
||||
|
||||
# If using IK control, we have to "start" the servo node to enable it to accept commands
|
||||
self.servo_start_client = None
|
||||
if self.arm_mode == "ik":
|
||||
@@ -170,13 +213,17 @@ class Headless(Node):
|
||||
if self.servo_start_client.service_is_ready():
|
||||
self.servo_start_client.call_async(Trigger.Request())
|
||||
|
||||
self.ctrl_mode = "core" # Start in core mode
|
||||
self.core_brake_mode = False
|
||||
self.core_max_duty = 0.5 # Default max duty cycle (walking speed)
|
||||
|
||||
# Rumble when node is ready (returns False if rumble not supported)
|
||||
self.gamepad.rumble(0.7, 0.8, 150)
|
||||
|
||||
def stop_all(self):
|
||||
if self.use_old_topics:
|
||||
self.core_publisher.publish(CORE_STOP_MSG)
|
||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||
self.bio_publisher.publish(BIO_STOP_MSG)
|
||||
else:
|
||||
self.core_twist_pub_.publish(CORE_STOP_TWIST_MSG)
|
||||
|
||||
def send_controls(self):
|
||||
"""Read the gamepad state and publish control messages"""
|
||||
for event in pygame.event.get():
|
||||
@@ -187,10 +234,8 @@ class Headless(Node):
|
||||
# Check if controller is still connected
|
||||
if pygame.joystick.get_count() != self.num_gamepads:
|
||||
print("Gamepad disconnected. Exiting...")
|
||||
# Send one last zero control message
|
||||
self.core_publisher.publish(CORE_STOP_MSG)
|
||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||
self.bio_publisher.publish(BIO_STOP_MSG)
|
||||
# Stop the rover if controller disconnected
|
||||
self.stop_all()
|
||||
self.get_logger().info("Final stop commands sent. Shutting down.")
|
||||
# Clean up
|
||||
pygame.quit()
|
||||
@@ -206,23 +251,42 @@ class Headless(Node):
|
||||
new_ctrl_mode = "core"
|
||||
|
||||
if new_ctrl_mode != self.ctrl_mode:
|
||||
self.core_publisher.publish(CORE_STOP_MSG)
|
||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||
self.bio_publisher.publish(BIO_STOP_MSG)
|
||||
self.stop_all()
|
||||
self.gamepad.rumble(0.6, 0.7, 75)
|
||||
self.ctrl_mode = new_ctrl_mode
|
||||
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
|
||||
|
||||
# CORE
|
||||
if self.ctrl_mode == "core" and CORE_MODE == "duty":
|
||||
# Actually send the controls
|
||||
if self.ctrl_mode == "core":
|
||||
self.send_core()
|
||||
if self.use_old_topics:
|
||||
self.arm_publisher.publish(ARM_STOP_MSG)
|
||||
else:
|
||||
self.send_arm()
|
||||
# self.send_bio()
|
||||
if self.use_old_topics:
|
||||
self.core_publisher.publish(CORE_STOP_MSG)
|
||||
|
||||
def send_core(self):
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
button_y = self.gamepad.get_button(3)
|
||||
left_bumper = self.gamepad.get_button(4)
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
dpad_input = self.gamepad.get_hat(0)
|
||||
|
||||
if self.use_old_topics:
|
||||
input = CoreControl()
|
||||
input.max_speed = 90
|
||||
|
||||
# Collect controller state
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
|
||||
# Right wheels
|
||||
input.right_stick = float(round(-1 * right_stick_y, 2))
|
||||
|
||||
@@ -238,16 +302,9 @@ class Headless(Node):
|
||||
|
||||
self.core_publisher.publish(input)
|
||||
|
||||
elif self.ctrl_mode == "core" and CORE_MODE == "twist":
|
||||
else: # New topics
|
||||
input = Twist()
|
||||
|
||||
# Collect controller state
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
left_bumper = self.gamepad.get_button(4)
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
|
||||
# Forward/back and Turn
|
||||
input.linear.x = -1.0 * left_stick_y
|
||||
input.angular.z = -1.0 * copysign(
|
||||
@@ -280,30 +337,33 @@ class Headless(Node):
|
||||
state_msg = CoreCtrlState()
|
||||
state_msg.brake_mode = bool(self.core_brake_mode)
|
||||
state_msg.max_duty = float(self.core_max_duty)
|
||||
|
||||
self.core_state_pub_.publish(state_msg)
|
||||
self.get_logger().info(
|
||||
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
|
||||
)
|
||||
|
||||
# ARM and BIO
|
||||
if self.ctrl_mode == "arm" and self.arm_mode == "manual":
|
||||
def send_arm(self):
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
button_y = self.gamepad.get_button(3)
|
||||
left_bumper = self.gamepad.get_button(4)
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
dpad_input = self.gamepad.get_hat(0)
|
||||
|
||||
# OLD MANUAL
|
||||
if self.arm_mode == "manual" and self.use_old_topics:
|
||||
arm_input = ArmManual()
|
||||
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
button_y = self.gamepad.get_button(3)
|
||||
left_bumper = self.gamepad.get_button(4)
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
dpad_input = self.gamepad.get_hat(0)
|
||||
|
||||
# OLD ARM CONTROL SCHEME
|
||||
if self.arm_manual_scheme == "old":
|
||||
# EF Grippers
|
||||
if left_trigger > 0 and right_trigger > 0:
|
||||
@@ -347,6 +407,7 @@ class Headless(Node):
|
||||
if abs(right_stick_y) > 0.15:
|
||||
arm_input.axis3 = -1 * round(right_stick_y)
|
||||
|
||||
# NEW ARM CONTROL SCHEME
|
||||
if self.arm_manual_scheme == "new":
|
||||
# Right stick: EF yaw and axis 3
|
||||
# Left stick: axis 1 and 2
|
||||
@@ -408,23 +469,71 @@ class Headless(Node):
|
||||
arm_input.linear_actuator = -1
|
||||
else:
|
||||
arm_input.linear_actuator = 0
|
||||
|
||||
|
||||
# BIO
|
||||
bio_input = BioControl(
|
||||
bio_arm=int(left_stick_y * -100),
|
||||
drill_arm=int(round(right_stick_y) * -100),
|
||||
)
|
||||
|
||||
# Drill motor (FAERIE)
|
||||
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
|
||||
bio_input.drill = int(
|
||||
30 * (right_trigger - left_trigger)
|
||||
) # Max duty cycle 30%
|
||||
|
||||
self.arm_publisher.publish(arm_input)
|
||||
|
||||
if self.ctrl_mode == "arm" and self.arm_mode == "ik":
|
||||
# NEW MANUAL
|
||||
elif self.arm_mode == "manual" and not self.use_old_topics:
|
||||
arm_input = JointJog()
|
||||
arm_input.header.frame_id = "base_link"
|
||||
arm_input.header.stamp = self.get_clock().now().to_msg()
|
||||
arm_input.joint_names = self.all_joint_names
|
||||
arm_input.velocities = [0.0] * len(self.all_joint_names)
|
||||
|
||||
# Right stick: EF yaw and axis 3
|
||||
# Left stick: axis 1 and 2
|
||||
# D-pad: axis 0 and _
|
||||
# Triggers: EF grippers
|
||||
# Bumpers: EF roll
|
||||
# A: brake
|
||||
# B: linear actuator in
|
||||
# X: _
|
||||
# Y: linear actuator out
|
||||
|
||||
ARM_THRESHOLD = 0.2
|
||||
|
||||
# Right stick: EF yaw and axis 3
|
||||
arm_input.velocities[self.all_joint_names.index("wrist_yaw_joint")] = (
|
||||
float(copysign(1, right_stick_x)) if abs(right_stick_x) >= ARM_THRESHOLD else 0.0
|
||||
)
|
||||
arm_input.velocities[self.all_joint_names.index("axis_3_joint")] = (
|
||||
float(-1 * copysign(1, right_stick_y)) if abs(right_stick_y) >= ARM_THRESHOLD else 0.0
|
||||
)
|
||||
|
||||
# Left stick: axis 1 and 2
|
||||
arm_input.velocities[self.all_joint_names.index("axis_1_joint")] = (
|
||||
float(copysign(1, left_stick_x)) if abs(left_stick_x) >= ARM_THRESHOLD else 0.0
|
||||
)
|
||||
arm_input.velocities[self.all_joint_names.index("axis_2_joint")] = (
|
||||
float(-1 * copysign(1, left_stick_y)) if abs(left_stick_y) >= ARM_THRESHOLD else 0.0
|
||||
)
|
||||
|
||||
# D-pad: axis 0 and _
|
||||
arm_input.velocities[self.all_joint_names.index("axis_0_joint")] = (
|
||||
float(copysign(1, dpad_input[0])) if dpad_input[0] != 0 else 0.0
|
||||
)
|
||||
|
||||
# Triggers: EF Grippers
|
||||
if left_trigger > 0 and right_trigger > 0:
|
||||
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 0.0
|
||||
elif left_trigger > 0:
|
||||
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = -1.0
|
||||
elif right_trigger > 0:
|
||||
arm_input.velocities[self.all_joint_names.index("ef_gripper_left_joint")] = 1.0
|
||||
|
||||
# Bumpers: EF roll
|
||||
arm_input.velocities[self.all_joint_names.index("wrist_roll_joint")] = right_bumper - left_bumper
|
||||
|
||||
# A: brake
|
||||
# TODO: Brake mode
|
||||
|
||||
# Y: linear actuator
|
||||
# TODO: linear actuator
|
||||
|
||||
self.arm_manual_pub_.publish(arm_input)
|
||||
|
||||
# IK
|
||||
elif self.arm_mode == "ik":
|
||||
arm_twist = TwistStamped()
|
||||
arm_twist.header.frame_id = "base_link"
|
||||
arm_twist.header.stamp = self.get_clock().now().to_msg()
|
||||
@@ -432,21 +541,6 @@ class Headless(Node):
|
||||
arm_jointjog.header.frame_id = "base_link"
|
||||
arm_jointjog.header.stamp = self.get_clock().now().to_msg()
|
||||
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
button_y = self.gamepad.get_button(3)
|
||||
left_bumper = self.gamepad.get_button(4)
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
dpad_input = self.gamepad.get_hat(0)
|
||||
|
||||
# Right stick: linear y and linear x
|
||||
# Left stick: angular z and linear z
|
||||
# D-pad: angular y and _
|
||||
@@ -473,9 +567,7 @@ class Headless(Node):
|
||||
|
||||
# Triggers: EF Grippers
|
||||
if left_trigger > 0 or right_trigger > 0:
|
||||
arm_jointjog.joint_names.append(
|
||||
"ef_gripper_left_joint"
|
||||
)
|
||||
arm_jointjog.joint_names.append("ef_gripper_left_joint")
|
||||
arm_jointjog.velocities.append(float(right_trigger - left_trigger))
|
||||
|
||||
# Bumpers: angular x
|
||||
@@ -489,6 +581,36 @@ class Headless(Node):
|
||||
self.arm_ik_twist_publisher.publish(arm_twist)
|
||||
# self.arm_ik_jointjog_publisher.publish(arm_jointjog) # TODO: Figure this shit out
|
||||
|
||||
def send_bio(self):
|
||||
# Collect controller state
|
||||
left_stick_x = deadzone(self.gamepad.get_axis(0))
|
||||
left_stick_y = deadzone(self.gamepad.get_axis(1))
|
||||
left_trigger = deadzone(self.gamepad.get_axis(2))
|
||||
right_stick_x = deadzone(self.gamepad.get_axis(3))
|
||||
right_stick_y = deadzone(self.gamepad.get_axis(4))
|
||||
right_trigger = deadzone(self.gamepad.get_axis(5))
|
||||
button_a = self.gamepad.get_button(0)
|
||||
button_b = self.gamepad.get_button(1)
|
||||
button_x = self.gamepad.get_button(2)
|
||||
button_y = self.gamepad.get_button(3)
|
||||
left_bumper = self.gamepad.get_button(4)
|
||||
right_bumper = self.gamepad.get_button(5)
|
||||
dpad_input = self.gamepad.get_hat(0)
|
||||
|
||||
if self.use_old_topics:
|
||||
bio_input = BioControl(
|
||||
bio_arm=int(left_stick_y * -100),
|
||||
drill_arm=int(round(right_stick_y) * -100),
|
||||
)
|
||||
|
||||
# Drill motor (FAERIE)
|
||||
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
|
||||
bio_input.drill = int(
|
||||
30 * (right_trigger - left_trigger)
|
||||
) # Max duty cycle 30%
|
||||
|
||||
self.bio_publisher.publish(bio_input)
|
||||
|
||||
|
||||
def deadzone(value: float, threshold=STICK_DEADZONE) -> float:
|
||||
"""Apply a deadzone to a joystick input so the motors don't sound angry"""
|
||||
|
||||
Reference in New Issue
Block a user