2 Commits

Author SHA1 Message Date
Tristan McGinnis
0d1fe4431d try new zoom cmd id 2025-05-27 19:09:29 -06:00
Tristan McGinnis
2294743cc0 zoom testing 2025-05-27 19:05:54 -06:00
13 changed files with 255 additions and 986 deletions

View File

@@ -7,7 +7,6 @@ import time
import atexit import atexit
import serial import serial
import os
import sys import sys
import threading import threading
import glob import glob
@@ -46,7 +45,6 @@ class SerialRelay(Node):
#(f"Checking port {port}...") #(f"Checking port {port}...")
ser.write(b"ping\n") ser.write(b"ping\n")
response = ser.read_until("\n") response = ser.read_until("\n")
ser.write(b"can_relay_mode,on\n")
# if pong is in response, then we are talking with the MCU # if pong is in response, then we are talking with the MCU
if b"pong" in response: if b"pong" in response:
@@ -90,12 +88,11 @@ class SerialRelay(Node):
#self.get_logger().info(f"[MCU] {output}") #self.get_logger().info(f"[MCU] {output}")
msg = String() msg = String()
msg.data = output msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"): if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg) self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
self.arm_pub.publish(msg) self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor elif output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor
self.bio_pub.publish(msg) self.bio_pub.publish(msg)
# msg = String() # msg = String()
# msg.data = output # msg.data = output
@@ -106,19 +103,19 @@ class SerialRelay(Node):
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
exit(1) self.exit(1)
except TypeError as e: except TypeError as e:
print(f"TypeError: {e}") print(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
exit(1) self.exit(1)
except Exception as e: except Exception as e:
print(f"Exception: {e}") print(f"Exception: {e}")
# print("Closing serial port.") print("Closing serial port.")
# if self.ser.is_open: if self.ser.is_open:
# self.ser.close() self.ser.close()
# exit(1) self.exit(1)
def send_cmd(self, msg): def send_cmd(self, msg):
message = msg.data message = msg.data
@@ -150,6 +147,6 @@ def main(args=None):
serial_pub.run() serial_pub.run()
if __name__ == '__main__': if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main() main()

View File

@@ -42,7 +42,7 @@ class Headless(Node):
# Depricated, kept temporarily for reference # Depricated, kept temporarily for reference
#self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10) #self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
#self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.laser_status = 0 self.laser_status = 0
@@ -81,14 +81,14 @@ class Headless(Node):
while rclpy.ok(): while rclpy.ok():
#Check the pico for updates #Check the pico for updates
#self.read_feedback() self.read_feedback()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}") print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0: while pygame.joystick.get_count() == 0:
#self.send_controls() #depricated, kept for reference temporarily #self.send_controls() #depricated, kept for reference temporarily
self.send_manual() self.send_manual()
#self.read_feedback() self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0) self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}") print(f"Gamepad reconnected: {self.gamepad.get_name()}")
@@ -142,7 +142,7 @@ class Headless(Node):
input.axis0 = -1 input.axis0 = -1
if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15: if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15:
input.axis1 = round(self.gamepad.get_axis(0)) input.axis1 = -1 * round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15: if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1)) input.axis2 = -1 * round(self.gamepad.get_axis(1))
@@ -268,6 +268,22 @@ class Headless(Node):
# pass # pass
def read_feedback(self, msg):
# Create a string message object
#msg = String()
# Set message data
#msg.data = output
# Publish data
#self.publisher.publish(msg.data)
print(f"[MCU] {msg.data}", end="")
#print(f"[Pico] Publishing: {msg}")

View File

@@ -1,6 +1,5 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos
import serial import serial
import sys import sys
import threading import threading
@@ -14,34 +13,9 @@ from ros2_interfaces_pkg.msg import ArmIK
from ros2_interfaces_pkg.msg import SocketFeedback from ros2_interfaces_pkg.msg import SocketFeedback
from ros2_interfaces_pkg.msg import DigitFeedback from ros2_interfaces_pkg.msg import DigitFeedback
# IK-Related imports
import numpy as np
import time, math, os
from math import sin, cos, pi
from ament_index_python.packages import get_package_share_directory
# from ikpy.chain import Chain
# from ikpy.link import OriginLink, URDFLink
# #import pygame as pyg
# from scipy.spatial.transform import Rotation as R
from . import astra_arm
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
serial_pub = None serial_pub = None
thread = None thread = None
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node # Initialize node
@@ -56,20 +30,17 @@ class SerialRelay(Node):
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10) self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10) self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10) self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10)
self.feedback_timer = self.create_timer(0.25, self.publish_feedback) self.feedback_timer = self.create_timer(1.0, self.publish_feedback)
# Create subscribers # Create subscribers
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10) self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10) self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
# Topics used in anchor mode # Topics used in anchor mode
if self.launch_mode == 'anchor': if self.launch_mode == 'anchor':
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
self.arm = astra_arm.Arm('arm12.urdf')
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_feedback = DigitFeedback() self.digit_feedback = DigitFeedback()
@@ -85,7 +56,7 @@ class SerialRelay(Node):
ser = serial.Serial(port, 115200, timeout=1) ser = serial.Serial(port, 115200, timeout=1)
#print(f"Checking port {port}...") #print(f"Checking port {port}...")
ser.write(b"ping\n") ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU # if pong is in response, then we are talking with the MCU
if b"pong" in response: if b"pong" in response:
@@ -153,117 +124,57 @@ class SerialRelay(Node):
pass pass
def send_ik(self, msg): def send_ik(self, msg):
# Convert Vector3 to a NumPy array pass
input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
# decrease input vector by 90%
input_raw = input_raw * 0.2
if input_raw[0] == 0.0 and input_raw[1] == 0.0 and input_raw[2] == 0.0: def send_manual(self, msg):
self.get_logger().info("No input, stopping arm.")
command = "can_relay_tovic,arm,39,0,0,0,0\n"
self.send_cmd(command)
return
# Debug output
tempMsg = String()
tempMsg.data = "From IK Control Got Vector: " + str(input_raw)
#self.debug_pub.publish(tempMsg)
# Target position is current position + input vector
current_position = self.arm.get_position_vector()
target_position = current_position + input_raw
#Print for IK DEBUG
tempMsg = String()
# tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles)
tempMsg.data = "Current Angles: " + str(math.degrees(self.arm.current_angles[2])) + ", " + str(math.degrees(self.arm.current_angles[4])) + ", " + str(math.degrees(self.arm.current_angles[6]))
self.ik_debug.publish(tempMsg)
self.get_logger().info(f"[IK] {tempMsg.data}")
# Debug output for current position
#tempMsg.data = "Current Position: " + str(current_position)
#self.debug_pub.publish(tempMsg)
# Debug output for target position
#tempMsg.data = "Target Position: " + str(target_position)
#self.debug_pub.publish(tempMsg)
# Perform IK with the target position
if self.arm.perform_ik(target_position, self.get_logger()):
# Send command to control
#command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n"
#self.send_cmd(command)
self.get_logger().info(f"IK Success: {target_position}")
self.get_logger().info(f"IK Angles: [{str(round(math.degrees(self.arm.ik_angles[2]), 2))}, {str(round(math.degrees(self.arm.ik_angles[4]), 2))}, {str(round(math.degrees(self.arm.ik_angles[6]), 2))}]")
# tempMsg = String()
# tempMsg.data = "IK Success: " + str(target_position)
# #self.debug_pub.publish(tempMsg)
# tempMsg.data = "Sending: " + str(command)
#self.debug_pub.publish(tempMsg)
# Send the IK angles to the MCU
command = "can_relay_tovic,arm,32," + f"{math.degrees(self.arm.ik_angles[0])*10},{math.degrees(self.arm.ik_angles[2])*10},{math.degrees(self.arm.ik_angles[4])*10},{math.degrees(self.arm.ik_angles[6])*10}" + "\n"
# self.send_cmd(command)
# Manual control for Wrist/Effector
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
else:
self.get_logger().info("IK Fail")
self.get_logger().info(f"IK Angles: [{str(math.degrees(self.arm.ik_angles[2]))}, {str(math.degrees(self.arm.ik_angles[4]))}, {str(math.degrees(self.arm.ik_angles[6]))}]")
# tempMsg = String()
# tempMsg.data = "IK Fail"
#self.debug_pub.publish(tempMsg)
def send_manual(self, msg: ArmManual):
axis0 = msg.axis0 axis0 = msg.axis0
axis1 = -1 * msg.axis1 axis1 = -1 * msg.axis1
axis2 = msg.axis2 axis2 = msg.axis2
axis3 = msg.axis3 axis3 = msg.axis3
#Send controls for arm #Send controls for arm
command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n" command = "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n" #self.send_cmd(command)
#Send controls for end effector #Send controls for end effector
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n" command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n" command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n" command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n" command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n"
self.send_cmd(command) self.send_cmd(command)
#print(f"[Wrote] {command}", end="")
#Not yet finished, needs embedded implementation for new commands
# ef_roll = msg.effector_roll
# ef_yaw = msg.effector_yaw
# gripper = msg.gripper
# actuator = msg.linear_actuator
# laser = msg.laser
# #Send controls for digit
# command = "can_relay_tovic,digit," + str(ef_roll) + "," + str(ef_yaw) + "," + str(gripper) + "," + str(actuator) + "," + str(laser) + "\n"
return return
def send_cmd(self, msg: str): def send_cmd(self, msg):
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
output = String() output = String()
output.data = msg output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg.data}") self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg):
output = msg.data output = msg.data
if output.startswith("can_relay_fromvic,arm,55"): if output.startswith("can_relay_fromvic,arm,55"):
#pass #pass
@@ -272,6 +183,7 @@ class SerialRelay(Node):
#pass #pass
self.updateBusVoltage(output) self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
#pass
self.updateMotorFeedback(output) self.updateMotorFeedback(output)
elif output.startswith("can_relay_fromvic,digit,54"): elif output.startswith("can_relay_fromvic,digit,54"):
parts = msg.data.split(",") parts = msg.data.split(",")
@@ -293,17 +205,17 @@ class SerialRelay(Node):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback) self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: str): def updateAngleFeedback(self, msg: String):
# Angle feedbacks, # Angle feedbacks,
# split the msg.data by commas #split the msg.data by commas
parts = msg.split(",") parts = msg.data.split(",")
if len(parts) >= 7: if len(parts) >= 7:
# Extract the angles from the string # Extract the angles from the string
angles_in = parts[3:7] angles_in = parts[3:7]
# Convert the angles to floats divide by 10.0 # Convert the angles to floats divide by 10.0
angles = [float(angle) / 10.0 for angle in angles_in] angles = [float(angle) / 10.0 for angle in angles_in]
# angles[0] = 0.0 #override axis0 to zero #angles[0] = 0.0 #override axis0 to zero
# #
# #
#THIS NEEDS TO BE REMOVED LATER #THIS NEEDS TO BE REMOVED LATER
@@ -315,7 +227,7 @@ class SerialRelay(Node):
# #
# #
# # Update the arm's current angles # # Update the arm's current angles
self.arm.update_angles(angles) #self.arm.update_angles(angles)
self.arm_feedback.axis0_angle = angles[0] self.arm_feedback.axis0_angle = angles[0]
self.arm_feedback.axis1_angle = angles[1] self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2] self.arm_feedback.axis2_angle = angles[2]
@@ -328,9 +240,10 @@ class SerialRelay(Node):
else: else:
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: str):
def updateBusVoltage(self, msg: String):
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.data.split(",")
if len(parts) >= 7: if len(parts) >= 7:
# Extract the voltage from the string # Extract the voltage from the string
voltages_in = parts[3:7] voltages_in = parts[3:7]
@@ -341,29 +254,25 @@ class SerialRelay(Node):
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0 self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
else: else:
self.get_logger().info("Invalid voltage feedback input format") self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg: str):
parts = str(msg.strip()).split(",") def updateMotorFeedback(self, msg):
motorId = round(float(parts[3])) # Motor voltage/current/temperature feedback
temp = float(parts[4]) / 10.0 return
voltage = float(parts[5]) / 10.0 # parts = msg.data.split(",")
current = float(parts[6]) / 10.0 # if len(parts) >= 7:
if motorId == 1: # # Extract the voltage/current/temperature from the string
self.arm_feedback.axis1_temp = temp # values_in = parts[3:7]
self.arm_feedback.axis1_voltage = voltage # # Convert the voltages to floats
self.arm_feedback.axis1_current = current # for i in range(4):
elif motorId == 2: # #update arm_feedback's axisX_temp for each axis0_temp, axis1_temp, etc...
self.arm_feedback.axis2_temp = temp # pass
self.arm_feedback.axis2_voltage = voltage
self.arm_feedback.axis2_current = current # # self.arm_feedback.updateJointVoltages(i, float(values_in[i]) / 10.0)
elif motorId == 3: # # self.arm_feedback.updateJointCurrents(i, float(values_in[i]) / 10.0)
self.arm_feedback.axis3_temp = temp # # self.arm_feedback.updateJointTemperatures(i, float(values_in[i]) / 10.0)
self.arm_feedback.axis3_voltage = voltage # else:
self.arm_feedback.axis3_current = current # self.get_logger().info("Invalid motor feedback input format")
elif motorId == 4:
self.arm_feedback.axis0_temp = temp
self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current
@staticmethod @staticmethod
@@ -373,11 +282,8 @@ class SerialRelay(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up...") print("Cleaning up...")
try: if self.ser.is_open:
if self.ser.is_open: self.ser.close()
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
@@ -393,6 +299,6 @@ def main(args=None):
serial_pub.run() serial_pub.run()
if __name__ == '__main__': if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main() main()

View File

@@ -1,145 +0,0 @@
import rclpy
from rclpy.node import Node
import numpy as np
import time, math, os
from math import sin, cos, pi
from ament_index_python.packages import get_package_share_directory
from ikpy.chain import Chain
from ikpy.link import OriginLink, URDFLink
#import pygame as pyg
from scipy.spatial.transform import Rotation as R
from geometry_msgs.msg import Vector3
# Misc
degree = pi / 180.0
def convert_angles(angles):
# Converts angles to the format used for the urdf (contains some dummy joints)
return [0.0, math.radians(angles[0]), math.radians(angles[1]), 0.0, math.radians(angles[2]), 0.0, math.radians(angles[3]), 0.0, math.radians(angles[4]), math.radians(angles[5]), 0.0]
class Arm:
def __init__(self, urdf_name):
self.ik_tolerance = 1e-1 #Tolerance (in meters) to determine if solution is valid
# URDF file path
self.urdf = os.path.join(get_package_share_directory('arm_pkg'), 'urdf', urdf_name)
# IKpy Chain
self.chain = Chain.from_urdf_file(self.urdf)
# Arrays for joint states
# Some links in the URDF are static (non-joints), these will remain zero for IK
# Indexes: Fixed_base, Ax_0, Ax_1, seg1, Ax_2, seg2, ax_3, seg3, continuous, wrist, Effector
self.zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.current_angles = self.zero_angles
self.last_angles = self.zero_angles
self.ik_angles = self.zero_angles
self.current_position: list[float] = []
self.target_position = [0.0, 0.0, 0.0]
self.target_orientation: list = [] # Effector orientation desired at target position.
# Generally orientation for the effector is modified manually by the operator.
# Might not need, copied over from state_publisher.py in ik_test
#self.step = 0.03 # Max movement increment
def perform_ik(self, target_position, logger):
self.target_position = target_position
# Update the target orientation to the current orientation
self.update_orientation()
# print(f"[IK FOR] Target Position: {self.target_position}")
try:
# print(f"[TRY] Current Angles: {self.current_angles}")
# print(f"[TRY] Target Position: {self.target_position}")
# print(f"[TRY] Target Orientation: {self.target_orientation}")
self.ik_angles = self.chain.inverse_kinematics(
target_position=self.target_position,
target_orientation=self.target_orientation,
initial_position=self.current_angles,
orientation_mode="all"
)
# Check if the solution is within the tolerance
fk_matrix = self.chain.forward_kinematics(self.ik_angles)
fk_position = fk_matrix[:3, 3] # type: ignore
# print(f"[TRY] FK Position for Solution: {fk_position}")
error = np.linalg.norm(target_position - fk_position)
if error > self.ik_tolerance:
logger.info(f"No VALID IK Solution within tolerance. Error: {error}")
return False
else:
logger.info(f"IK Solution Found. Error: {error}")
return True
except Exception as e:
logger.info(f"IK failed for exception: {e}")
return False
# # Given the FK_Matix for the arm's current pose, update the orientation array
# def update_orientation(self, fk_matrix):
# self.target_orientation = fk_matrix[:3, :3]
# return
# def update_joints(self, ax_0, ax_1, ax_2, ax_3, wrist):
# self.current_angles = [0.0, 0.0, ax_0, ax_1, ax_2, ax_3, wrist, 0.0]
# return
# Get current orientation of the end effector and update target_orientation
def update_orientation(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Update target_orientation to the effector's current orientation
self.target_orientation = fk_matrix[:3, :3] # type: ignore
# Update current angles to those provided
# Resetting last_angles to the new angles
#
# Use: First call, or when angles are changed manually.
def reset_angles(self, angles):
# Update angles to the new angles
self.current_angles = convert_angles(angles)
self.last_angles = self.current_angles
# Update current angles to those provided
# Maintain previous angles in last_angles
#
# Use: Repeated calls during IK operation
def update_angles(self, angles):
# Update angles to the new angles
self.last_angles = self.current_angles
self.current_angles = convert_angles(angles)
# Get current X,Y,Z position of end effector
def get_position(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] # type: ignore
return position
# Get current X,Y,Z position of end effector
def get_position_vector(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] # type: ignore
# Return position as a NumPy array
return np.array(position)
def update_position(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix and update current pos
self.current_position = fk_matrix[:3, 3] # type: ignore

View File

@@ -1,6 +1,4 @@
from setuptools import find_packages, setup from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'arm_pkg' package_name = 'arm_pkg'
@@ -12,8 +10,6 @@ setup(
('share/ament_index/resource_index/packages', ('share/ament_index/resource_index/packages',
['resource/' + package_name]), ['resource/' + package_name]),
('share/' + package_name, ['package.xml']), ('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*')),
(os.path.join('share', package_name, 'urdf'), glob('urdf/*')),
], ],
install_requires=['setuptools'], install_requires=['setuptools'],
zip_safe=True, zip_safe=True,

View File

@@ -1,239 +0,0 @@
<?xml version="1.0" ?>
<!-- =================================================================================== -->
<!-- | This document was autogenerated by xacro from omni_description/robots/omnipointer_arm_only.urdf.xacro | -->
<!-- | EDITING THIS FILE BY HAND IS NOT RECOMMENDED | -->
<!-- =================================================================================== -->
<robot name="omnipointer" xmlns:xacro="http://ros.org/wiki/xacro">
<!--MX64trntbl + MX106: ? + 0.153 -->
<!-- singleaxismount + 2.5girder + singleaxismount + base + MX106: 0.007370876 + 0.0226796 + 0.007370876 + ? + 0.153 -->
<!-- frame106 + singleaxismount + adjustgirder + singleaxismount + base + MX28: 0.0907185 + 0.00737088 + 0.110563 + 0.00737088 + ? + 0.077 -->
<!-- frame28 + singleaxismount + 5girder + singleaxismount + base + MX28: 0.0907185 + 0.00737088 + 0.0368544 + 0.00737088 + ? + 0.077 -->
<!-- frame28 + singleaxismount + 2.5girder + singleaxismount + tip: 0.0907185 + 0.00737088 + 0.0226796 + 0.00737088 + ? -->
<material name="omni/Blue">
<color rgba="0 0 0.8 1"/>
</material>
<material name="omni/Red">
<color rgba="1 0 0 1"/>
</material>
<material name="omni/Green">
<color rgba="0 1 0 1"/>
</material>
<material name="omni/Yellow">
<color rgba="1 1 0 1"/>
</material>
<material name="omni/LightGrey">
<color rgba="0.6 0.6 0.6 1"/>
</material>
<material name="omni/DarkGrey">
<color rgba="0.4 0.4 0.4 1"/>
</material>
<!-- Now we can start using the macros xacro:included above to define the actual omnipointer -->
<!-- The first use of a macro. This one was defined in youbot_base/base.urdf.xacro above.
A macro like this will expand to a set of link and joint definitions, and to additional
Gazebo-related extensions (sensor plugins, etc). The macro takes an argument, name,
that equals "base", and uses it to generate names for its component links and joints
(e.g., base_link). The xacro:included origin block is also an argument to the macro. By convention,
the origin block defines where the component is w.r.t its parent (in this case the parent
is the world frame). For more, see http://www.ros.org/wiki/xacro -->
<!-- foot for arm-->
<link name="base_link">
<inertial>
<origin rpy="0 0 0" xyz="0 0 0"/>
<mass value="10.0"/>
<inertia ixx="0.1" ixy="0" ixz="0" iyy="0.1" iyz="0" izz="0.1"/>
</inertial>
</link>
<!-- joint between base_link and arm_0_link -->
<joint name="arm_joint_0" type="fixed">
<origin rpy="0 0 0" xyz="0 0 0"/>
<parent link="base_link"/>
<child link="arm_link_0"/>
</joint>
<link name="arm_link_0">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<geometry>
<box size="0.1143 0.1143 0.0545"/>
</geometry>
<material name="omni/LightGrey"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<geometry>
<box size="0.1143 0.1143 0.0545"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<mass value="0.2"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000267245666667" ixy="0" ixz="0" iyy="0.000267245666667" iyz="0" izz="0.000435483"/>
</inertial>
</link>
<joint name="arm_joint_1" type="revolute">
<parent link="arm_link_0"/>
<child link="arm_link_1"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-3.1415926535" upper="3.1415926535" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.0545"/>
<axis xyz="0 0 1"/>
</joint>
<link name="arm_link_1">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<geometry>
<box size="0.0402 0.05 0.15"/>
</geometry>
<material name="omni/Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<geometry>
<box size="0.0402 0.05 0.15"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<mass value="0.190421352"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000279744834534" ixy="0" ixz="0" iyy="0.000265717763008" iyz="0" izz="6.53151584738e-05"/>
</inertial>
</link>
<joint name="arm_joint_2" type="revolute">
<parent link="arm_link_1"/>
<child link="arm_link_2"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.15"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_2">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<geometry>
<box size="0.0356 0.05 0.471"/>
</geometry>
<material name="omni/Red"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<geometry>
<box size="0.0356 0.05 0.471"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<mass value="0.29302326"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00251484771035" ixy="0" ixz="0" iyy="0.00248474836108" iyz="0" izz="9.19936757328e-05"/>
</inertial>
</link>
<joint name="arm_joint_3" type="revolute">
<parent link="arm_link_2"/>
<child link="arm_link_3"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.471"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_3">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<geometry>
<box size="0.0356 0.05 0.377"/>
</geometry>
<material name="omni/Yellow"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<geometry>
<box size="0.0356 0.05 0.377"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<mass value="0.21931466"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000791433503053" ixy="0" ixz="0" iyy="0.000768905501178" iyz="0" izz="6.88531064581e-05"/>
</inertial>
</link>
<joint name="arm_joint_4" type="revolute">
<parent link="arm_link_3"/>
<child link="arm_link_4"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.377"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_4">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<geometry>
<box size="0.0356 0.05 0.132"/>
</geometry>
<material name="omni/Green"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<geometry>
<box size="0.0356 0.05 0.132"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<mass value="0.15813986"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00037242266488" ixy="0" ixz="0" iyy="0.000356178538461" iyz="0" izz="4.96474819141e-05"/>
</inertial>
</link>
<joint name="arm_joint_5" type="revolute">
<parent link="arm_link_4"/>
<child link="arm_link_5"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.132"/>
<axis xyz="1 0 0"/>
</joint>
<link name="arm_link_5">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<geometry>
<box size="0.0356 0.05 0.248"/>
</geometry>
<material name="omni/Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<geometry>
<box size="0.0356 0.05 0.248"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<mass value="0.15813986"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00037242266488" ixy="0" ixz="0" iyy="0.000356178538461" iyz="0" izz="4.96474819141e-05"/>
</inertial>
</link>
<joint name="arm_joint_6" type="fixed">
<parent link="arm_link_5"/>
<child link="arm_link_6"/>
<origin rpy="0 0 0" xyz="0 0 0.248"/>
</joint>
<link name="arm_link_6">
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0"/>
<mass value="1e-12"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="1e-12" ixy="0" ixz="0" iyy="1e-12" iyz="0" izz="1e-12"/>
</inertial>
</link>
<!-- END OF ARM LINKS/JOINTS -->
</robot>

View File

@@ -1,307 +0,0 @@
<robot name="robot">
<link name="base_footprint"></link>
<joint name="base_joint" type="fixed">
<parent link="base_footprint" />
<child link="base_link" />
<origin xyz="0 0 0.0004048057655643422" rpy="0 0 0" />
</joint>
<link name="base_link">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.5 0.5 0.01" />
</geometry>
<material name="base_link-material">
<color rgba="0 0.6038273388475408 1 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.5 0.5 0.01" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.17" ixy="0" ixz="0" iyy="0.17" iyz="0" izz="0.05" />
</inertial>
</link>
<joint name="Axis_0_Joint" type="revolute">
<parent link="base_link" />
<child link="Axis_0" />
<origin xyz="0 0 0.035" rpy="0 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-2.5" upper="2.5" velocity="0.5"/> </joint>
<link name="Axis_0">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.15" length="0.059" />
</geometry>
<material name="Axis_0-material">
<color rgba="0.3515325994898463 0.4735314961384573 0.9301108583738498 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.15" length="0.059" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_1_Joint" type="revolute">
<parent link="Axis_0" />
<child link="Axis_1" />
<origin xyz="0 0 0.11189588647115647" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.7" upper="1.7" velocity="0.5"/> </joint>
<link name="Axis_1">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.082" length="0.1" />
</geometry>
<material name="Axis_1-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.082" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_1_to_Segment_1" type="fixed">
<parent link="Axis_1" />
<child link="Segment_1" />
<origin xyz="0 0.2350831500270899 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_1">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.473" />
</geometry>
<material name="Segment_1-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.473" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_2_Joint" type="revolute">
<parent link="Segment_1" />
<child link="Axis_2" />
<origin xyz="0 -5.219894517229704e-17 0.2637568842473722" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-2.7" upper="2.7" velocity="0.5"/> </joint>
<link name="Axis_2">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.055" length="0.1" />
</geometry>
<material name="Axis_2-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.055" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_2_to_Segment_2" type="fixed">
<parent link="Axis_2" />
<child link="Segment_2" />
<origin xyz="0 0.19535682173790003 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_2">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.393" />
</geometry>
<material name="Segment_2-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.393" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_3_Joint" type="revolute">
<parent link="Segment_2" />
<child link="Axis_3" />
<origin xyz="0 -4.337792830220178e-17 0.199625776257357" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.6" upper="1.6" velocity="0.5"/> </joint>
<link name="Axis_3">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.05" length="0.1" />
</geometry>
<material name="Axis_3-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.05" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_3_to_Segment_3" type="fixed">
<parent link="Axis_3" />
<child link="Segment_3" />
<origin xyz="0 0.06725724726912972 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_3">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.135" />
</geometry>
<material name="Segment_3-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.135" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Wrist_Joint" type="revolute">
<parent link="Segment_3" />
<child link="Axis_4" />
<origin xyz="0 0 0.0655808825338593" rpy="0 1.5707963267948966 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.6" upper="1.6" velocity="0.5"/> </joint>
<link name="Axis_4">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.03" length="0.075" />
</geometry>
<material name="Axis_4-material">
<color rgba="0.23455058215026167 0.9301108583738498 0.21952619971859377 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.03" length="0.075" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Continuous_Joint" type="revolute">
<parent link="Axis_4" />
<child link="Axis_4_C" />
<origin xyz="0.0009533507860803557 0 0" rpy="0 -1.5707963267948966 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-3.14" upper="3.14" velocity="0.5"/>
</joint>
<link name="Axis_4_C">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.02" length="0.01" />
</geometry>
<material name="Axis_4_C-material">
<color rgba="0.006048833020386069 0.407240211891531 0.15592646369776456 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.02" length="0.01" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_4_C_to_Effector" type="fixed">
<parent link="Axis_4_C" />
<child link="Effector" />
<origin xyz="0 0 0.06478774571448076" rpy="0 0 0" />
</joint>
<link name="Effector">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.05 0.01 0.135" />
</geometry>
<material name="Effector-material">
<color rgba="0.2746773120495699 0.01680737574872402 0.5711248294565854 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.01 0.01 0.135" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.16666666666666666" ixy="0" ixz="0" iyy="0.16666666666666666" iyz="0" izz="0.16666666666666666" />
</inertial>
</link>
</robot>

View File

@@ -3,7 +3,6 @@ from rclpy.node import Node
import serial import serial
import sys import sys
import threading import threading
import os
import glob import glob
import time import time
import atexit import atexit
@@ -145,11 +144,10 @@ class SerialRelay(Node):
# LSS (SCYTHE) # LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n" command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
#self.send_cmd(command) self.send_cmd(command)
# Vibration Motor # Vibration Motor
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n" command = "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
#self.send_cmd(command) self.send_cmd(command)
# FAERIE Control Commands # FAERIE Control Commands
@@ -158,15 +156,16 @@ class SerialRelay(Node):
# To be reviewed before use# # To be reviewed before use#
# Laser # Laser
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n" command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
#self.send_cmd(command) self.send_cmd(command)
# Drill (SCABBARD) # Drill (SCABBARD)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n" command = f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
#self.send_cmd(command) print(msg.drill)
self.send_cmd(command)
# Bio linear actuator # Bio linear actuator
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n" command = "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command) self.send_cmd(command)
@@ -183,7 +182,7 @@ class SerialRelay(Node):
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
#self.get_logger().info(f"[Bio Anchor] {msg.data}") self.get_logger().info(f"[Bio Anchor] {msg.data}")
if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100 if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0 self.bio_feedback.bat_voltage = float(parts[3]) / 100.0
@@ -203,11 +202,8 @@ class SerialRelay(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up...") print("Cleaning up...")
try: if self.ser.is_open:
if self.ser.is_open: self.ser.close()
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
@@ -223,6 +219,6 @@ def main(args=None):
serial_pub.run() serial_pub.run()
if __name__ == '__main__': if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main() main()

View File

@@ -20,86 +20,113 @@ os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init()
max_speed = 90 #Max speed as a duty cycle percentage (1-100) max_speed = 75 #Max speed as a duty cycle percentage (1-100)
class Headless(Node): class Headless(Node):
def __init__(self): def __init__(self):
# Initialize pygame first # Initalize node with name
super().__init__("core_headless")
self.create_timer(0.20, self.send_controls)
# Create a publisher to publish any output the pico sends
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
pygame.init() pygame.init()
# Initialize the gamepad module
pygame.joystick.init() pygame.joystick.init()
# Wait for a gamepad to be connected # Check if any gamepad is connected
self.gamepad = None if pygame.joystick.get_count() == 0:
print("Waiting for gamepad connection...") print("No gamepad found.")
while pygame.joystick.get_count() == 0: pygame.quit()
# Process any pygame events to keep it responsive exit()
for event in pygame.event.get(): for event in pygame.event.get():
if event.type == pygame.QUIT: if event.type == pygame.QUIT:
pygame.quit() pygame.quit()
sys.exit(0) exit()
time.sleep(1.0) # Check every second
print("No gamepad found. Waiting...") # Initialize the first gamepad, print name to terminal
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0) self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}') print(f'Gamepad Found: {self.gamepad.get_name()}')
#
# Now initialize the ROS2 node #
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() # Used to ignore sending controls repeatedly when they do not change
def run(self): def run(self):
# This thread makes all the update processes run in the background # This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start() thread.start()
try: try:
while rclpy.ok(): while rclpy.ok():
#Check the pico for updates
self.send_controls() self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
self.send_controls()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(0) sys.exit(0)
def send_controls(self): def send_controls(self):
for event in pygame.event.get(): for event in pygame.event.get():
if event.type == pygame.QUIT: if event.type == pygame.QUIT:
pygame.quit() pygame.quit()
sys.exit(0) exit()
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
input = CoreControl()
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl() input = CoreControl()
input.max_speed = max_speed input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
input.right_stick = round(self.gamepad.get_axis(4),2)#right y-axis
if self.gamepad.get_axis(5) > 0: if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick input.left_stick = input.right_stick
else: else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis input.left_stick = round(self.gamepad.get_axis(1),2)#lext y-axis
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}'
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input) if pygame.joystick.get_count() != 0:
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}' #stop the rover if there is no controller connected
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
else:
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.get_logger().info(f"[Ctrl] Stopping. No Gamepad Connected. ")
self.publisher.publish(input)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
node = Headless() node = Headless()
rclpy.spin(node) rclpy.spin(node)
rclpy.shutdown() rclpy.shutdown()
#tb_bs = BaseStation()
#node.run()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -1,6 +1,5 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy import qos
from std_srvs.srv import Empty from std_srvs.srv import Empty
import signal import signal
@@ -8,7 +7,6 @@ import time
import atexit import atexit
import serial import serial
import os
import sys import sys
import threading import threading
import glob import glob
@@ -20,17 +18,6 @@ from ros2_interfaces_pkg.msg import CoreControl
serial_pub = None serial_pub = None
thread = None thread = None
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initalize node with name # Initalize node with name
@@ -182,7 +169,7 @@ class SerialRelay(Node):
# Convert the 0-1 range into a value in the right range. # Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan)) return str(rightMin + (valueScaled * rightSpan))
def send_controls(self, msg: CoreControl): def send_controls(self, msg):
#can_relay_tovic,core,19, left_stick, right_stick #can_relay_tovic,core,19, left_stick, right_stick
if(msg.turn_to_enable): if(msg.turn_to_enable):
command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n' command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n'
@@ -195,14 +182,14 @@ class SerialRelay(Node):
self.send_cmd(command) self.send_cmd(command)
#print(f"[Sys] Relaying: {command}") #print(f"[Sys] Relaying: {command}")
def send_cmd(self, msg: str): def send_cmd(self, msg):
if self.launch_mode == 'anchor': if self.launch_mode == 'anchor':
#self.get_logger().info(f"[Core to Anchor Relay] {msg}") #self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String()#Convert to std_msg string output = String()#Convert to std_msg string
output.data = msg output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == 'core': elif self.launch_mode == 'core':
self.get_logger().info(f"[Core to MCU] {msg.data}") self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
@@ -215,46 +202,25 @@ class SerialRelay(Node):
self.core_feedback.gps_long = float(parts[3]) self.core_feedback.gps_long = float(parts[3])
elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count
self.core_feedback.gps_sats = round(float(parts[3])) self.core_feedback.gps_sats = round(float(parts[3]))
#if parts length is at least 5 then we should have altitude, this is a temporary check until fully implemented elif output.startswith("can_relay_fromvic,core,51"):#Gyro x,y,z
if len(parts) >= 5:
self.core_feedback.gps_alt = round(float(parts[4]), 2)
elif output.startswith("can_relay_fromvic,core,51"): #Gyro x,y,z
self.core_feedback.bno_gyro.x = float(parts[3]) self.core_feedback.bno_gyro.x = float(parts[3])
self.core_feedback.bno_gyro.y = float(parts[4]) self.core_feedback.bno_gyro.y = float(parts[4])
self.core_feedback.bno_gyro.z = float(parts[5]) self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.imu_calib = round(float(parts[6])) self.core_feedback.imu_calib = round(float(parts[6]))
elif output.startswith("can_relay_fromvic,core,52"): #Accel x,y,z, heading elif output.startswith("can_relay_fromvic,core,52"):#Accel x,y,z, heading *10
self.core_feedback.bno_accel.x = float(parts[3]) self.core_feedback.bno_accel.x = float(parts[3])
self.core_feedback.bno_accel.y = float(parts[4]) self.core_feedback.bno_accel.y = float(parts[4])
self.core_feedback.bno_accel.z = float(parts[5]) self.core_feedback.bno_accel.z = float(parts[5])
self.core_feedback.orientation = float(parts[6]) self.core_feedback.orientation = float(parts[6]) / 10.0
elif output.startswith("can_relay_fromvic,core,53"): #Rev motor feedback elif output.startswith("can_relay_fromvic,core,53"):#Rev motor feedback
motorId = round(float(parts[3])) pass
temp = float(parts[4]) / 10.0 #self.updateMotorFeedback(output)
voltage = float(parts[5]) / 10.0 elif output.startswith("can_relay_fromvic,core,54"):#bat, 12, 5, 3, Voltage readings * 100
current = float(parts[6]) / 10.0
if motorId == 1:
self.core_feedback.fl_temp = temp
self.core_feedback.fl_voltage = voltage
self.core_feedback.fl_current = current
elif motorId == 2:
self.core_feedback.bl_temp = temp
self.core_feedback.bl_voltage = voltage
self.core_feedback.bl_current = current
elif motorId == 3:
self.core_feedback.fr_temp = temp
self.core_feedback.fr_voltage = voltage
self.core_feedback.fr_current = current
elif motorId == 4:
self.core_feedback.br_temp = temp
self.core_feedback.br_voltage = voltage
self.core_feedback.br_current = current
elif output.startswith("can_relay_fromvic,core,54"): #bat, 12, 5, 3, Voltage readings * 100
self.core_feedback.bat_voltage = float(parts[3]) / 100.0 self.core_feedback.bat_voltage = float(parts[3]) / 100.0
self.core_feedback.voltage_12 = float(parts[4]) / 100.0 self.core_feedback.voltage_12 = float(parts[4]) / 100.0
self.core_feedback.voltage_5 = float(parts[5]) / 100.0 self.core_feedback.voltage_5 = float(parts[5]) / 100.0
self.core_feedback.voltage_3 = float(parts[6]) / 100.0 self.core_feedback.voltage_3 = float(parts[6]) / 100.0
elif output.startswith("can_relay_fromvic,core,56"): #BMP Temp, Altitude, Pressure elif output.startswith("can_relay_fromvic,core,56"):#BMP Temp, Altitude, Pressure
self.core_feedback.bmp_temp = float(parts[3]) self.core_feedback.bmp_temp = float(parts[3])
self.core_feedback.bmp_alt = float(parts[4]) self.core_feedback.bmp_alt = float(parts[4])
self.core_feedback.bmp_pres = float(parts[5]) self.core_feedback.bmp_pres = float(parts[5])
@@ -276,11 +242,8 @@ class SerialRelay(Node):
def cleanup(self): def cleanup(self):
print("Cleaning up before terminating...") print("Cleaning up before terminating...")
try: if self.ser.is_open:
if self.ser.is_open: self.ser.close()
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
@@ -299,7 +262,7 @@ def main(args=None):
serial_pub.run() serial_pub.run()
if __name__ == '__main__': if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main() main()

View File

@@ -42,6 +42,13 @@ class PtzNode(Node):
self.loop = None self.loop = None
self.thread_pool = None self.thread_pool = None
# Track current zoom level and zoom state
self.current_zoom_level = 1.0
self.max_zoom_level = 6.0 # A8 mini max zoom
self.zoom_step = 0.2 # Zoom step per command
self.zooming = 0 # Current zoom direction: -1=out, 0=stop, 1=in
self.zoom_timer = None
# Create publishers # Create publishers
self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10) self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10)
self.debug_pub = self.create_publisher(String, '/ptz/debug', 10) self.debug_pub = self.create_publisher(String, '/ptz/debug', 10)
@@ -130,7 +137,6 @@ class PtzNode(Node):
debug_str += str(data) debug_str += str(data)
self.get_logger().debug(debug_str) self.get_logger().debug(debug_str)
def check_camera_connection(self): def check_camera_connection(self):
"""Periodically check camera connection and attempt to reconnect if needed.""" """Periodically check camera connection and attempt to reconnect if needed."""
if not self.camera_connected and not self.shutdown_requested: if not self.camera_connected and not self.shutdown_requested:
@@ -181,6 +187,10 @@ class PtzNode(Node):
if msg.reset: if msg.reset:
self.get_logger().info("Attempting to reset camera to center position") self.get_logger().info("Attempting to reset camera to center position")
await self.camera.send_attitude_angles_command(0.0, 0.0) await self.camera.send_attitude_angles_command(0.0, 0.0)
# Also reset zoom to 1x when resetting camera
self.current_zoom_level = 1.0
await self.camera.send_absolute_zoom_command(1.0)
return return
if msg.control_mode == 0: if msg.control_mode == 0:
@@ -201,10 +211,21 @@ class PtzNode(Node):
self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}") self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}")
await self.camera.send_single_axis_attitude_command(angle, axis) await self.camera.send_single_axis_attitude_command(angle, axis)
elif msg.control_mode == 3: elif msg.control_mode == 3:
zoom_level = msg.zoom_level # Instead of absolute zoom, interpret zoom_level as zoom direction
self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x") zoom_direction = int(msg.zoom_level) if abs(msg.zoom_level) >= 0.5 else 0
await self.camera.send_absolute_zoom_command(zoom_level) zoom_direction = max(-1, min(1, zoom_direction)) # Restrict to -1, 0, 1
if zoom_direction != self.zooming:
self.zooming = zoom_direction
self.get_logger().debug(f"Zoom direction changed to {zoom_direction}")
if zoom_direction == 0:
# Stop zooming
self.get_logger().debug(f"Stopping zoom at level {self.current_zoom_level:.1f}x")
else:
# Start zooming in the specified direction
await self.perform_zoom(zoom_direction)
if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'): if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'):
if msg.stream_type > 0 and msg.stream_freq >= 0: if msg.stream_type > 0 and msg.stream_freq >= 0:
@@ -221,13 +242,52 @@ class PtzNode(Node):
except RuntimeError as e: # Catch SDK's "not connected" errors except RuntimeError as e: # Catch SDK's "not connected" errors
self.get_logger().warning(f"SDK command failed (likely not connected): {e}") self.get_logger().warning(f"SDK command failed (likely not connected): {e}")
# self.camera_connected will be updated by health/connection checks # self.camera_connected will be updated by health/connection checks
# self.feedback_msg.error_msg = f"Command failed: {str(e)}" # Already set by health check
# self.feedback_pub.publish(self.feedback_msg)
except Exception as e: except Exception as e:
self.get_logger().error(f"Error processing control command: {e}") self.get_logger().error(f"Error processing control command: {e}")
self.feedback_msg.error_msg = f"Control error: {str(e)}" self.feedback_msg.error_msg = f"Control error: {str(e)}"
self.feedback_pub.publish(self.feedback_msg) # Publish for other errors self.feedback_pub.publish(self.feedback_msg) # Publish for other errors
async def perform_zoom(self, direction):
"""Perform a zoom operation in the specified direction."""
if not self.camera or not self.camera.is_connected:
return
if direction == 0:
return
# Calculate new zoom level
new_zoom_level = self.current_zoom_level + (direction * self.zoom_step)
# Clamp to valid range
new_zoom_level = max(1.0, min(self.max_zoom_level, new_zoom_level))
# Skip if no change (already at min/max)
if new_zoom_level == self.current_zoom_level:
if direction > 0:
self.get_logger().debug(f"Already at maximum zoom level ({self.max_zoom_level:.1f}x)")
else:
self.get_logger().debug(f"Already at minimum zoom level (1.0x)")
return
# Set the new zoom level
self.get_logger().debug(f"Zooming from {self.current_zoom_level:.1f}x to {new_zoom_level:.1f}x")
self.current_zoom_level = new_zoom_level
try:
await self.camera.send_absolute_zoom_command(new_zoom_level)
# If still zooming in same direction, schedule another zoom step
if self.zooming == direction:
# Schedule a new zoom step after a short delay if we're not at the limits
if (direction > 0 and new_zoom_level < self.max_zoom_level) or \
(direction < 0 and new_zoom_level > 1.0):
# Use create_timer for a non-blocking delay
await asyncio.sleep(0.2) # 200ms delay between zoom steps
if self.zooming == direction: # Check if direction hasn't changed
await self.perform_zoom(direction)
except Exception as e:
self.get_logger().error(f"Error during zoom operation: {e}")
def publish_debug(self, message_text): def publish_debug(self, message_text):
"""Publish debug message.""" """Publish debug message."""
msg = String() msg = String()
@@ -250,7 +310,6 @@ class PtzNode(Node):
self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.") self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.")
return None return None
async def shutdown_node_async(self): async def shutdown_node_async(self):
"""Perform clean shutdown of camera connection.""" """Perform clean shutdown of camera connection."""
self.shutdown_requested = True self.shutdown_requested = True

View File

@@ -64,7 +64,7 @@ class CommandID(Enum):
""" """
ROTATION_CONTROL = 0x07 ROTATION_CONTROL = 0x07
ABSOLUTE_ZOOM = 0x08 ABSOLUTE_ZOOM = 0x0C
ATTITUDE_ANGLES = 0x0E ATTITUDE_ANGLES = 0x0E
SINGLE_AXIS_CONTROL = 0x41 SINGLE_AXIS_CONTROL = 0x41
DATA_STREAM_REQUEST = 0x25 DATA_STREAM_REQUEST = 0x25