3 Commits

Author SHA1 Message Date
Tristan McGinnis
0d239698fa one more attempt at manual zoom 2025-05-29 18:51:41 -06:00
Tristan McGinnis
998f3af33a attempting manual zoom functionality 2025-05-29 18:39:06 -06:00
Tristan McGinnis
b4ef261737 Merge pull request #14 from SHC-ASTRA/main
merge main into ptz-zoom
2025-05-29 18:30:07 -06:00
12 changed files with 149 additions and 916 deletions

View File

@@ -7,7 +7,6 @@ import time
import atexit
import serial
import os
import sys
import threading
import glob
@@ -46,7 +45,6 @@ class SerialRelay(Node):
#(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
ser.write(b"can_relay_mode,on\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
@@ -90,7 +88,6 @@ class SerialRelay(Node):
#self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
@@ -106,19 +103,19 @@ class SerialRelay(Node):
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
exit(1)
self.exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
exit(1)
self.exit(1)
except Exception as e:
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
def send_cmd(self, msg):
message = msg.data
@@ -150,6 +147,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -42,7 +42,7 @@ class Headless(Node):
# Depricated, kept temporarily for reference
#self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
#self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
self.laser_status = 0
@@ -81,14 +81,14 @@ class Headless(Node):
while rclpy.ok():
#Check the pico for updates
#self.read_feedback()
self.read_feedback()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
#self.send_controls() #depricated, kept for reference temporarily
self.send_manual()
#self.read_feedback()
self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
@@ -142,7 +142,7 @@ class Headless(Node):
input.axis0 = -1
if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15:
input.axis1 = round(self.gamepad.get_axis(0))
input.axis1 = -1 * round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1))
@@ -268,6 +268,22 @@ class Headless(Node):
# pass
def read_feedback(self, msg):
# Create a string message object
#msg = String()
# Set message data
#msg.data = output
# Publish data
#self.publisher.publish(msg.data)
print(f"[MCU] {msg.data}", end="")
#print(f"[Pico] Publishing: {msg}")

View File

@@ -1,6 +1,5 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
import serial
import sys
import threading
@@ -14,34 +13,9 @@ from ros2_interfaces_pkg.msg import ArmIK
from ros2_interfaces_pkg.msg import SocketFeedback
from ros2_interfaces_pkg.msg import DigitFeedback
# IK-Related imports
import numpy as np
import time, math, os
from math import sin, cos, pi
from ament_index_python.packages import get_package_share_directory
# from ikpy.chain import Chain
# from ikpy.link import OriginLink, URDFLink
# #import pygame as pyg
# from scipy.spatial.transform import Rotation as R
from . import astra_arm
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
serial_pub = None
thread = None
class SerialRelay(Node):
def __init__(self):
# Initialize node
@@ -56,20 +30,17 @@ class SerialRelay(Node):
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10)
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
self.feedback_timer = self.create_timer(1.0, self.publish_feedback)
# Create subscribers
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
# Topics used in anchor mode
if self.launch_mode == 'anchor':
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
self.arm = astra_arm.Arm('arm12.urdf')
self.arm_feedback = SocketFeedback()
self.digit_feedback = DigitFeedback()
@@ -85,7 +56,7 @@ class SerialRelay(Node):
ser = serial.Serial(port, 115200, timeout=1)
#print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
@@ -153,104 +124,44 @@ class SerialRelay(Node):
pass
def send_ik(self, msg):
# Convert Vector3 to a NumPy array
input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
# decrease input vector by 90%
input_raw = input_raw * 0.2
pass
if input_raw[0] == 0.0 and input_raw[1] == 0.0 and input_raw[2] == 0.0:
self.get_logger().info("No input, stopping arm.")
command = "can_relay_tovic,arm,39,0,0,0,0\n"
self.send_cmd(command)
return
# Debug output
tempMsg = String()
tempMsg.data = "From IK Control Got Vector: " + str(input_raw)
#self.debug_pub.publish(tempMsg)
# Target position is current position + input vector
current_position = self.arm.get_position_vector()
target_position = current_position + input_raw
#Print for IK DEBUG
tempMsg = String()
# tempMsg.data = "Current Position: " + str(current_position) + "\nInput Vector" + str(input_raw) + "\nTarget Position: " + str(target_position) + "\nAngles: " + str(self.arm.current_angles)
tempMsg.data = "Current Angles: " + str(math.degrees(self.arm.current_angles[2])) + ", " + str(math.degrees(self.arm.current_angles[4])) + ", " + str(math.degrees(self.arm.current_angles[6]))
self.ik_debug.publish(tempMsg)
self.get_logger().info(f"[IK] {tempMsg.data}")
# Debug output for current position
#tempMsg.data = "Current Position: " + str(current_position)
#self.debug_pub.publish(tempMsg)
# Debug output for target position
#tempMsg.data = "Target Position: " + str(target_position)
#self.debug_pub.publish(tempMsg)
# Perform IK with the target position
if self.arm.perform_ik(target_position, self.get_logger()):
# Send command to control
#command = "can_relay_tovic,arm,32," + ",".join(map(str, self.arm.ik_angles[:4])) + "\n"
#self.send_cmd(command)
self.get_logger().info(f"IK Success: {target_position}")
self.get_logger().info(f"IK Angles: [{str(round(math.degrees(self.arm.ik_angles[2]), 2))}, {str(round(math.degrees(self.arm.ik_angles[4]), 2))}, {str(round(math.degrees(self.arm.ik_angles[6]), 2))}]")
# tempMsg = String()
# tempMsg.data = "IK Success: " + str(target_position)
# #self.debug_pub.publish(tempMsg)
# tempMsg.data = "Sending: " + str(command)
#self.debug_pub.publish(tempMsg)
# Send the IK angles to the MCU
command = "can_relay_tovic,arm,32," + f"{math.degrees(self.arm.ik_angles[0])*10},{math.degrees(self.arm.ik_angles[2])*10},{math.degrees(self.arm.ik_angles[4])*10},{math.degrees(self.arm.ik_angles[6])*10}" + "\n"
# self.send_cmd(command)
# Manual control for Wrist/Effector
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
else:
self.get_logger().info("IK Fail")
self.get_logger().info(f"IK Angles: [{str(math.degrees(self.arm.ik_angles[2]))}, {str(math.degrees(self.arm.ik_angles[4]))}, {str(math.degrees(self.arm.ik_angles[6]))}]")
# tempMsg = String()
# tempMsg.data = "IK Fail"
#self.debug_pub.publish(tempMsg)
def send_manual(self, msg: ArmManual):
def send_manual(self, msg):
axis0 = msg.axis0
axis1 = -1 * msg.axis1
axis2 = msg.axis2
axis3 = msg.axis3
#Send controls for arm
command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n"
command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
command = "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
#self.send_cmd(command)
#Send controls for end effector
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
#self.send_cmd(command)
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n"
self.send_cmd(command)
#print(f"[Wrote] {command}", end="")
#Not yet finished, needs embedded implementation for new commands
# ef_roll = msg.effector_roll
# ef_yaw = msg.effector_yaw
# gripper = msg.gripper
# actuator = msg.linear_actuator
# laser = msg.laser
# #Send controls for digit
# command = "can_relay_tovic,digit," + str(ef_roll) + "," + str(ef_yaw) + "," + str(gripper) + "," + str(actuator) + "," + str(laser) + "\n"
return
@@ -260,7 +171,7 @@ class SerialRelay(Node):
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg.data}")
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String):
@@ -272,6 +183,7 @@ class SerialRelay(Node):
#pass
self.updateBusVoltage(output)
elif output.startswith("can_relay_fromvic,arm,53"):
#pass
self.updateMotorFeedback(output)
elif output.startswith("can_relay_fromvic,digit,54"):
parts = msg.data.split(",")
@@ -294,8 +206,8 @@ class SerialRelay(Node):
self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: str):
# Angle feedbacks,
# split the msg.data by commas
# Angle feedbacks,
#split the msg.data by commas
parts = msg.split(",")
if len(parts) >= 7:
@@ -303,7 +215,7 @@ class SerialRelay(Node):
angles_in = parts[3:7]
# Convert the angles to floats divide by 10.0
angles = [float(angle) / 10.0 for angle in angles_in]
# angles[0] = 0.0 #override axis0 to zero
#angles[0] = 0.0 #override axis0 to zero
#
#
#THIS NEEDS TO BE REMOVED LATER
@@ -315,7 +227,7 @@ class SerialRelay(Node):
#
#
# # Update the arm's current angles
self.arm.update_angles(angles)
#self.arm.update_angles(angles)
self.arm_feedback.axis0_angle = angles[0]
self.arm_feedback.axis1_angle = angles[1]
self.arm_feedback.axis2_angle = angles[2]
@@ -328,6 +240,7 @@ class SerialRelay(Node):
else:
self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks
parts = msg.split(",")
@@ -341,29 +254,25 @@ class SerialRelay(Node):
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
else:
self.get_logger().info("Invalid voltage feedback input format")
def updateMotorFeedback(self, msg: str):
parts = str(msg.strip()).split(",")
motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0
voltage = float(parts[5]) / 10.0
current = float(parts[6]) / 10.0
if motorId == 1:
self.arm_feedback.axis1_temp = temp
self.arm_feedback.axis1_voltage = voltage
self.arm_feedback.axis1_current = current
elif motorId == 2:
self.arm_feedback.axis2_temp = temp
self.arm_feedback.axis2_voltage = voltage
self.arm_feedback.axis2_current = current
elif motorId == 3:
self.arm_feedback.axis3_temp = temp
self.arm_feedback.axis3_voltage = voltage
self.arm_feedback.axis3_current = current
elif motorId == 4:
self.arm_feedback.axis0_temp = temp
self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current
def updateMotorFeedback(self, msg):
# Motor voltage/current/temperature feedback
return
# parts = msg.data.split(",")
# if len(parts) >= 7:
# # Extract the voltage/current/temperature from the string
# values_in = parts[3:7]
# # Convert the voltages to floats
# for i in range(4):
# #update arm_feedback's axisX_temp for each axis0_temp, axis1_temp, etc...
# pass
# # self.arm_feedback.updateJointVoltages(i, float(values_in[i]) / 10.0)
# # self.arm_feedback.updateJointCurrents(i, float(values_in[i]) / 10.0)
# # self.arm_feedback.updateJointTemperatures(i, float(values_in[i]) / 10.0)
# else:
# self.get_logger().info("Invalid motor feedback input format")
@staticmethod
@@ -373,11 +282,8 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -393,6 +299,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -1,145 +0,0 @@
import rclpy
from rclpy.node import Node
import numpy as np
import time, math, os
from math import sin, cos, pi
from ament_index_python.packages import get_package_share_directory
from ikpy.chain import Chain
from ikpy.link import OriginLink, URDFLink
#import pygame as pyg
from scipy.spatial.transform import Rotation as R
from geometry_msgs.msg import Vector3
# Misc
degree = pi / 180.0
def convert_angles(angles):
# Converts angles to the format used for the urdf (contains some dummy joints)
return [0.0, math.radians(angles[0]), math.radians(angles[1]), 0.0, math.radians(angles[2]), 0.0, math.radians(angles[3]), 0.0, math.radians(angles[4]), math.radians(angles[5]), 0.0]
class Arm:
def __init__(self, urdf_name):
self.ik_tolerance = 1e-1 #Tolerance (in meters) to determine if solution is valid
# URDF file path
self.urdf = os.path.join(get_package_share_directory('arm_pkg'), 'urdf', urdf_name)
# IKpy Chain
self.chain = Chain.from_urdf_file(self.urdf)
# Arrays for joint states
# Some links in the URDF are static (non-joints), these will remain zero for IK
# Indexes: Fixed_base, Ax_0, Ax_1, seg1, Ax_2, seg2, ax_3, seg3, continuous, wrist, Effector
self.zero_angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.current_angles = self.zero_angles
self.last_angles = self.zero_angles
self.ik_angles = self.zero_angles
self.current_position: list[float] = []
self.target_position = [0.0, 0.0, 0.0]
self.target_orientation: list = [] # Effector orientation desired at target position.
# Generally orientation for the effector is modified manually by the operator.
# Might not need, copied over from state_publisher.py in ik_test
#self.step = 0.03 # Max movement increment
def perform_ik(self, target_position, logger):
self.target_position = target_position
# Update the target orientation to the current orientation
self.update_orientation()
# print(f"[IK FOR] Target Position: {self.target_position}")
try:
# print(f"[TRY] Current Angles: {self.current_angles}")
# print(f"[TRY] Target Position: {self.target_position}")
# print(f"[TRY] Target Orientation: {self.target_orientation}")
self.ik_angles = self.chain.inverse_kinematics(
target_position=self.target_position,
target_orientation=self.target_orientation,
initial_position=self.current_angles,
orientation_mode="all"
)
# Check if the solution is within the tolerance
fk_matrix = self.chain.forward_kinematics(self.ik_angles)
fk_position = fk_matrix[:3, 3] # type: ignore
# print(f"[TRY] FK Position for Solution: {fk_position}")
error = np.linalg.norm(target_position - fk_position)
if error > self.ik_tolerance:
logger.info(f"No VALID IK Solution within tolerance. Error: {error}")
return False
else:
logger.info(f"IK Solution Found. Error: {error}")
return True
except Exception as e:
logger.info(f"IK failed for exception: {e}")
return False
# # Given the FK_Matix for the arm's current pose, update the orientation array
# def update_orientation(self, fk_matrix):
# self.target_orientation = fk_matrix[:3, :3]
# return
# def update_joints(self, ax_0, ax_1, ax_2, ax_3, wrist):
# self.current_angles = [0.0, 0.0, ax_0, ax_1, ax_2, ax_3, wrist, 0.0]
# return
# Get current orientation of the end effector and update target_orientation
def update_orientation(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Update target_orientation to the effector's current orientation
self.target_orientation = fk_matrix[:3, :3] # type: ignore
# Update current angles to those provided
# Resetting last_angles to the new angles
#
# Use: First call, or when angles are changed manually.
def reset_angles(self, angles):
# Update angles to the new angles
self.current_angles = convert_angles(angles)
self.last_angles = self.current_angles
# Update current angles to those provided
# Maintain previous angles in last_angles
#
# Use: Repeated calls during IK operation
def update_angles(self, angles):
# Update angles to the new angles
self.last_angles = self.current_angles
self.current_angles = convert_angles(angles)
# Get current X,Y,Z position of end effector
def get_position(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] # type: ignore
return position
# Get current X,Y,Z position of end effector
def get_position_vector(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix
position = fk_matrix[:3, 3] # type: ignore
# Return position as a NumPy array
return np.array(position)
def update_position(self):
# FK matrix for arm's current pose
fk_matrix = self.chain.forward_kinematics(self.current_angles)
# Get the position of the end effector from the FK matrix and update current pos
self.current_position = fk_matrix[:3, 3] # type: ignore

View File

@@ -1,6 +1,4 @@
from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'arm_pkg'
@@ -12,8 +10,6 @@ setup(
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*')),
(os.path.join('share', package_name, 'urdf'), glob('urdf/*')),
],
install_requires=['setuptools'],
zip_safe=True,

View File

@@ -1,239 +0,0 @@
<?xml version="1.0" ?>
<!-- =================================================================================== -->
<!-- | This document was autogenerated by xacro from omni_description/robots/omnipointer_arm_only.urdf.xacro | -->
<!-- | EDITING THIS FILE BY HAND IS NOT RECOMMENDED | -->
<!-- =================================================================================== -->
<robot name="omnipointer" xmlns:xacro="http://ros.org/wiki/xacro">
<!--MX64trntbl + MX106: ? + 0.153 -->
<!-- singleaxismount + 2.5girder + singleaxismount + base + MX106: 0.007370876 + 0.0226796 + 0.007370876 + ? + 0.153 -->
<!-- frame106 + singleaxismount + adjustgirder + singleaxismount + base + MX28: 0.0907185 + 0.00737088 + 0.110563 + 0.00737088 + ? + 0.077 -->
<!-- frame28 + singleaxismount + 5girder + singleaxismount + base + MX28: 0.0907185 + 0.00737088 + 0.0368544 + 0.00737088 + ? + 0.077 -->
<!-- frame28 + singleaxismount + 2.5girder + singleaxismount + tip: 0.0907185 + 0.00737088 + 0.0226796 + 0.00737088 + ? -->
<material name="omni/Blue">
<color rgba="0 0 0.8 1"/>
</material>
<material name="omni/Red">
<color rgba="1 0 0 1"/>
</material>
<material name="omni/Green">
<color rgba="0 1 0 1"/>
</material>
<material name="omni/Yellow">
<color rgba="1 1 0 1"/>
</material>
<material name="omni/LightGrey">
<color rgba="0.6 0.6 0.6 1"/>
</material>
<material name="omni/DarkGrey">
<color rgba="0.4 0.4 0.4 1"/>
</material>
<!-- Now we can start using the macros xacro:included above to define the actual omnipointer -->
<!-- The first use of a macro. This one was defined in youbot_base/base.urdf.xacro above.
A macro like this will expand to a set of link and joint definitions, and to additional
Gazebo-related extensions (sensor plugins, etc). The macro takes an argument, name,
that equals "base", and uses it to generate names for its component links and joints
(e.g., base_link). The xacro:included origin block is also an argument to the macro. By convention,
the origin block defines where the component is w.r.t its parent (in this case the parent
is the world frame). For more, see http://www.ros.org/wiki/xacro -->
<!-- foot for arm-->
<link name="base_link">
<inertial>
<origin rpy="0 0 0" xyz="0 0 0"/>
<mass value="10.0"/>
<inertia ixx="0.1" ixy="0" ixz="0" iyy="0.1" iyz="0" izz="0.1"/>
</inertial>
</link>
<!-- joint between base_link and arm_0_link -->
<joint name="arm_joint_0" type="fixed">
<origin rpy="0 0 0" xyz="0 0 0"/>
<parent link="base_link"/>
<child link="arm_link_0"/>
</joint>
<link name="arm_link_0">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<geometry>
<box size="0.1143 0.1143 0.0545"/>
</geometry>
<material name="omni/LightGrey"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<geometry>
<box size="0.1143 0.1143 0.0545"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.02725"/>
<mass value="0.2"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000267245666667" ixy="0" ixz="0" iyy="0.000267245666667" iyz="0" izz="0.000435483"/>
</inertial>
</link>
<joint name="arm_joint_1" type="revolute">
<parent link="arm_link_0"/>
<child link="arm_link_1"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-3.1415926535" upper="3.1415926535" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.0545"/>
<axis xyz="0 0 1"/>
</joint>
<link name="arm_link_1">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<geometry>
<box size="0.0402 0.05 0.15"/>
</geometry>
<material name="omni/Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<geometry>
<box size="0.0402 0.05 0.15"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.075"/>
<mass value="0.190421352"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000279744834534" ixy="0" ixz="0" iyy="0.000265717763008" iyz="0" izz="6.53151584738e-05"/>
</inertial>
</link>
<joint name="arm_joint_2" type="revolute">
<parent link="arm_link_1"/>
<child link="arm_link_2"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.15"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_2">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<geometry>
<box size="0.0356 0.05 0.471"/>
</geometry>
<material name="omni/Red"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<geometry>
<box size="0.0356 0.05 0.471"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.2355"/>
<mass value="0.29302326"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00251484771035" ixy="0" ixz="0" iyy="0.00248474836108" iyz="0" izz="9.19936757328e-05"/>
</inertial>
</link>
<joint name="arm_joint_3" type="revolute">
<parent link="arm_link_2"/>
<child link="arm_link_3"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.471"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_3">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<geometry>
<box size="0.0356 0.05 0.377"/>
</geometry>
<material name="omni/Yellow"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<geometry>
<box size="0.0356 0.05 0.377"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.1885"/>
<mass value="0.21931466"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.000791433503053" ixy="0" ixz="0" iyy="0.000768905501178" iyz="0" izz="6.88531064581e-05"/>
</inertial>
</link>
<joint name="arm_joint_4" type="revolute">
<parent link="arm_link_3"/>
<child link="arm_link_4"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.377"/>
<axis xyz="0 1 0"/>
</joint>
<link name="arm_link_4">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<geometry>
<box size="0.0356 0.05 0.132"/>
</geometry>
<material name="omni/Green"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<geometry>
<box size="0.0356 0.05 0.132"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.066"/>
<mass value="0.15813986"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00037242266488" ixy="0" ixz="0" iyy="0.000356178538461" iyz="0" izz="4.96474819141e-05"/>
</inertial>
</link>
<joint name="arm_joint_5" type="revolute">
<parent link="arm_link_4"/>
<child link="arm_link_5"/>
<dynamics damping="3.0" friction="0.3"/>
<limit effort="30.0" lower="-1.75079632679" upper="1.75079632679" velocity="5.0"/>
<origin rpy="0 0 0" xyz="0 0 0.132"/>
<axis xyz="1 0 0"/>
</joint>
<link name="arm_link_5">
<visual>
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<geometry>
<box size="0.0356 0.05 0.248"/>
</geometry>
<material name="omni/Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<geometry>
<box size="0.0356 0.05 0.248"/>
</geometry>
</collision>
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0.124"/>
<mass value="0.15813986"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="0.00037242266488" ixy="0" ixz="0" iyy="0.000356178538461" iyz="0" izz="4.96474819141e-05"/>
</inertial>
</link>
<joint name="arm_joint_6" type="fixed">
<parent link="arm_link_5"/>
<child link="arm_link_6"/>
<origin rpy="0 0 0" xyz="0 0 0.248"/>
</joint>
<link name="arm_link_6">
<inertial>
<!-- CENTER OF MASS -->
<origin rpy="0 0 0" xyz="0 0 0"/>
<mass value="1e-12"/>
<!-- box inertia: 1/12*m(y^2+z^2), ... -->
<inertia ixx="1e-12" ixy="0" ixz="0" iyy="1e-12" iyz="0" izz="1e-12"/>
</inertial>
</link>
<!-- END OF ARM LINKS/JOINTS -->
</robot>

View File

@@ -1,307 +0,0 @@
<robot name="robot">
<link name="base_footprint"></link>
<joint name="base_joint" type="fixed">
<parent link="base_footprint" />
<child link="base_link" />
<origin xyz="0 0 0.0004048057655643422" rpy="0 0 0" />
</joint>
<link name="base_link">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.5 0.5 0.01" />
</geometry>
<material name="base_link-material">
<color rgba="0 0.6038273388475408 1 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.5 0.5 0.01" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.17" ixy="0" ixz="0" iyy="0.17" iyz="0" izz="0.05" />
</inertial>
</link>
<joint name="Axis_0_Joint" type="revolute">
<parent link="base_link" />
<child link="Axis_0" />
<origin xyz="0 0 0.035" rpy="0 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-2.5" upper="2.5" velocity="0.5"/> </joint>
<link name="Axis_0">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.15" length="0.059" />
</geometry>
<material name="Axis_0-material">
<color rgba="0.3515325994898463 0.4735314961384573 0.9301108583738498 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.15" length="0.059" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_1_Joint" type="revolute">
<parent link="Axis_0" />
<child link="Axis_1" />
<origin xyz="0 0 0.11189588647115647" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.7" upper="1.7" velocity="0.5"/> </joint>
<link name="Axis_1">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.082" length="0.1" />
</geometry>
<material name="Axis_1-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.082" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_1_to_Segment_1" type="fixed">
<parent link="Axis_1" />
<child link="Segment_1" />
<origin xyz="0 0.2350831500270899 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_1">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.473" />
</geometry>
<material name="Segment_1-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.473" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_2_Joint" type="revolute">
<parent link="Segment_1" />
<child link="Axis_2" />
<origin xyz="0 -5.219894517229704e-17 0.2637568842473722" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-2.7" upper="2.7" velocity="0.5"/> </joint>
<link name="Axis_2">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.055" length="0.1" />
</geometry>
<material name="Axis_2-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.055" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_2_to_Segment_2" type="fixed">
<parent link="Axis_2" />
<child link="Segment_2" />
<origin xyz="0 0.19535682173790003 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_2">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.393" />
</geometry>
<material name="Segment_2-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.393" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_3_Joint" type="revolute">
<parent link="Segment_2" />
<child link="Axis_3" />
<origin xyz="0 -4.337792830220178e-17 0.199625776257357" rpy="1.5707963267948963 0 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.6" upper="1.6" velocity="0.5"/> </joint>
<link name="Axis_3">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.05" length="0.1" />
</geometry>
<material name="Axis_3-material">
<color rgba="0.14702726648767014 0.14126329113044458 0.7304607400847158 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.05" length="0.1" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_3_to_Segment_3" type="fixed">
<parent link="Axis_3" />
<child link="Segment_3" />
<origin xyz="0 0.06725724726912972 0" rpy="-1.5707963267948963 0 0" />
</joint>
<link name="Segment_3">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.135" />
</geometry>
<material name="Segment_3-material">
<color rgba="0.09084171117479915 0.3231432091022285 0.1844749944900301 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.025" length="0.135" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Wrist_Joint" type="revolute">
<parent link="Segment_3" />
<child link="Axis_4" />
<origin xyz="0 0 0.0655808825338593" rpy="0 1.5707963267948966 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-1.6" upper="1.6" velocity="0.5"/> </joint>
<link name="Axis_4">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.03" length="0.075" />
</geometry>
<material name="Axis_4-material">
<color rgba="0.23455058215026167 0.9301108583738498 0.21952619971859377 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.03" length="0.075" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Continuous_Joint" type="revolute">
<parent link="Axis_4" />
<child link="Axis_4_C" />
<origin xyz="0.0009533507860803557 0 0" rpy="0 -1.5707963267948966 0" />
<axis xyz="0 0 1"/>
<limit effort="1000.0" lower="-3.14" upper="3.14" velocity="0.5"/>
</joint>
<link name="Axis_4_C">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.02" length="0.01" />
</geometry>
<material name="Axis_4_C-material">
<color rgba="0.006048833020386069 0.407240211891531 0.15592646369776456 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<cylinder radius="0.02" length="0.01" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.3333333333333333" ixy="0" ixz="0" iyy="0.5" iyz="0" izz="0.3333333333333333" />
</inertial>
</link>
<joint name="Axis_4_C_to_Effector" type="fixed">
<parent link="Axis_4_C" />
<child link="Effector" />
<origin xyz="0 0 0.06478774571448076" rpy="0 0 0" />
</joint>
<link name="Effector">
<visual>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.05 0.01 0.135" />
</geometry>
<material name="Effector-material">
<color rgba="0.2746773120495699 0.01680737574872402 0.5711248294565854 1" />
</material>
</visual>
<collision>
<origin xyz="0 0 0" rpy="0 0 0" />
<geometry>
<box size="0.01 0.01 0.135" />
</geometry>
</collision>
<inertial>
<origin xyz="0 0 0" rpy="0 0 0" />
<mass value="1" />
<inertia ixx="0.16666666666666666" ixy="0" ixz="0" iyy="0.16666666666666666" iyz="0" izz="0.16666666666666666" />
</inertial>
</link>
</robot>

View File

@@ -3,7 +3,6 @@ from rclpy.node import Node
import serial
import sys
import threading
import os
import glob
import time
import atexit
@@ -203,11 +202,8 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -223,6 +219,6 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -1,6 +1,5 @@
import rclpy
from rclpy.node import Node
from rclpy import qos
from std_srvs.srv import Empty
import signal
@@ -8,7 +7,6 @@ import time
import atexit
import serial
import os
import sys
import threading
import glob
@@ -20,17 +18,6 @@ from ros2_interfaces_pkg.msg import CoreControl
serial_pub = None
thread = None
# control_qos = qos.QoSProfile(
# history=qos.QoSHistoryPolicy.KEEP_LAST,
# depth=1,
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=1000,
# lifespan=500,
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=5000
# )
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
@@ -182,7 +169,7 @@ class SerialRelay(Node):
# Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan))
def send_controls(self, msg: CoreControl):
def send_controls(self, msg):
#can_relay_tovic,core,19, left_stick, right_stick
if(msg.turn_to_enable):
command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n'
@@ -195,14 +182,14 @@ class SerialRelay(Node):
self.send_cmd(command)
#print(f"[Sys] Relaying: {command}")
def send_cmd(self, msg: str):
def send_cmd(self, msg):
if self.launch_mode == 'anchor':
#self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String()#Convert to std_msg string
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'core':
self.get_logger().info(f"[Core to MCU] {msg.data}")
self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String):
@@ -218,43 +205,25 @@ class SerialRelay(Node):
#if parts length is at least 5 then we should have altitude, this is a temporary check until fully implemented
if len(parts) >= 5:
self.core_feedback.gps_alt = round(float(parts[4]), 2)
elif output.startswith("can_relay_fromvic,core,51"): #Gyro x,y,z
elif output.startswith("can_relay_fromvic,core,51"):#Gyro x,y,z
self.core_feedback.bno_gyro.x = float(parts[3])
self.core_feedback.bno_gyro.y = float(parts[4])
self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.imu_calib = round(float(parts[6]))
elif output.startswith("can_relay_fromvic,core,52"): #Accel x,y,z, heading
elif output.startswith("can_relay_fromvic,core,52"):#Accel x,y,z, heading *10
self.core_feedback.bno_accel.x = float(parts[3])
self.core_feedback.bno_accel.y = float(parts[4])
self.core_feedback.bno_accel.z = float(parts[5])
self.core_feedback.orientation = float(parts[6])
elif output.startswith("can_relay_fromvic,core,53"): #Rev motor feedback
motorId = round(float(parts[3]))
temp = float(parts[4]) / 10.0
voltage = float(parts[5]) / 10.0
current = float(parts[6]) / 10.0
if motorId == 1:
self.core_feedback.fl_temp = temp
self.core_feedback.fl_voltage = voltage
self.core_feedback.fl_current = current
elif motorId == 2:
self.core_feedback.bl_temp = temp
self.core_feedback.bl_voltage = voltage
self.core_feedback.bl_current = current
elif motorId == 3:
self.core_feedback.fr_temp = temp
self.core_feedback.fr_voltage = voltage
self.core_feedback.fr_current = current
elif motorId == 4:
self.core_feedback.br_temp = temp
self.core_feedback.br_voltage = voltage
self.core_feedback.br_current = current
elif output.startswith("can_relay_fromvic,core,54"): #bat, 12, 5, 3, Voltage readings * 100
self.core_feedback.orientation = float(parts[6]) / 10.0
elif output.startswith("can_relay_fromvic,core,53"):#Rev motor feedback
pass
#self.updateMotorFeedback(output)
elif output.startswith("can_relay_fromvic,core,54"):#bat, 12, 5, 3, Voltage readings * 100
self.core_feedback.bat_voltage = float(parts[3]) / 100.0
self.core_feedback.voltage_12 = float(parts[4]) / 100.0
self.core_feedback.voltage_5 = float(parts[5]) / 100.0
self.core_feedback.voltage_3 = float(parts[6]) / 100.0
elif output.startswith("can_relay_fromvic,core,56"): #BMP Temp, Altitude, Pressure
elif output.startswith("can_relay_fromvic,core,56"):#BMP Temp, Altitude, Pressure
self.core_feedback.bmp_temp = float(parts[3])
self.core_feedback.bmp_alt = float(parts[4])
self.core_feedback.bmp_pres = float(parts[5])
@@ -276,11 +245,8 @@ class SerialRelay(Node):
def cleanup(self):
print("Cleaning up before terminating...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
@@ -299,7 +265,7 @@ def main(args=None):
serial_pub.run()
if __name__ == '__main__':
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -202,9 +202,14 @@ class PtzNode(Node):
await self.camera.send_single_axis_attitude_command(angle, axis)
elif msg.control_mode == 3:
zoom_level = msg.zoom_level
self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x")
await self.camera.send_absolute_zoom_command(zoom_level)
zoom_direction = 0
if msg.zoom_level > 0:
zoom_direction = 1 # Zoom in
elif msg.zoom_level < 0:
zoom_direction = -1 # Zoom out
self.get_logger().debug(f"Attempting manual zoom: direction={zoom_direction}")
await self.camera.send_manual_zoom_command(zoom_direction)
if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'):
if msg.stream_type > 0 and msg.stream_freq >= 0:

View File

@@ -64,6 +64,7 @@ class CommandID(Enum):
"""
ROTATION_CONTROL = 0x07
MANUAL_ZOOM = 0x05 # Added manual zoom command
ABSOLUTE_ZOOM = 0x08
ATTITUDE_ANGLES = 0x0E
SINGLE_AXIS_CONTROL = 0x41
@@ -320,6 +321,47 @@ class SiyiGimbalCamera:
f"Sent absolute zoom command with level {zoom_level:.1f}x (original request: {original_requested_zoom:.1f}x)"
)
def _build_manual_zoom_packet(self, zoom_direction: int) -> bytes:
"""
Helper to build a manual zoom command packet.
:param zoom_direction: Direction to zoom: 1 for in, -1 for out, 0 for stop.
:return: Complete packet bytes ready to send.
"""
data_len = 1
packet = self._build_packet_header(CommandID.MANUAL_ZOOM, data_len)
# Convert the direction to a valid byte value:
# -1 (zoom out) -> 2
# 0 (stop zoom) -> 0
# 1 (zoom in) -> 1
if zoom_direction < 0:
direction_byte = 2 # Zoom out
elif zoom_direction > 0:
direction_byte = 1 # Zoom in
else:
direction_byte = 0 # Stop zoom
logger.debug(f"Manual zoom: input direction={zoom_direction}, mapped to byte={direction_byte}")
packet.append(direction_byte)
return self._finalize_packet(packet)
async def send_manual_zoom_command(self, zoom_direction: int) -> None:
"""
Send a manual zoom command (0x05) to the gimbal.
:param zoom_direction: 1 to zoom in, -1 to zoom out, 0 to stop zooming.
"""
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send manual zoom command."
)
packet = self._build_manual_zoom_packet(zoom_direction)
logger.debug(f"Manual zoom packet: {packet.hex()}")
self.writer.write(packet)
await self.writer.drain()
logger.debug(f"Sent manual zoom command with direction {zoom_direction}")
async def _read_packet(self):
if not self.reader:
raise RuntimeError("Reader is not initialized.")