Merge pull request #11 from SHC-ASTRA/ptz-node

Ptz node
This commit is contained in:
Tristan McGinnis
2025-05-27 16:50:27 -05:00
committed by GitHub
13 changed files with 1219 additions and 42 deletions

View File

@@ -38,6 +38,16 @@ def launch_setup(context, *args, **kwargs):
on_exit=Shutdown() on_exit=Shutdown()
) )
) )
nodes.append(
Node(
package='core_pkg',
executable='ptz', # change as needed
name='ptz',
output='both'
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
)
)
nodes.append( nodes.append(
Node( Node(
package='bio_pkg', package='bio_pkg',
@@ -58,7 +68,7 @@ def launch_setup(context, *args, **kwargs):
on_exit=Shutdown() on_exit=Shutdown()
) )
) )
elif mode in ['arm', 'core', 'bio']: elif mode in ['arm', 'core', 'bio', 'ptz']:
# Only launch the node corresponding to the provided mode. # Only launch the node corresponding to the provided mode.
if mode == 'arm': if mode == 'arm':
nodes.append( nodes.append(
@@ -93,10 +103,19 @@ def launch_setup(context, *args, **kwargs):
on_exit=Shutdown() on_exit=Shutdown()
) )
) )
elif mode == 'ptz':
nodes.append(
Node(
package='core_pkg',
executable='ptz',
name='ptz',
output='both',
on_exit=Shutdown(), #on fail, shutdown if this was the only node to be launched
)
)
else: else:
# If an invalid mode is provided, print an error. # If an invalid mode is provided, print an error.
# (You might want to raise an exception or handle it differently.) print("Invalid mode provided. Choose one of: arm, core, bio, anchor, ptz.")
print("Invalid mode provided. Choose one of: arm, core, bio, anchor.")
return nodes return nodes
@@ -104,7 +123,7 @@ def generate_launch_description():
declare_arg = DeclareLaunchArgument( declare_arg = DeclareLaunchArgument(
'mode', 'mode',
default_value='anchor', default_value='anchor',
description='Launch mode: arm, core, bio, or anchor' description='Launch mode: arm, core, bio, anchor, or ptz'
) )
return LaunchDescription([ return LaunchDescription([

View File

@@ -90,9 +90,9 @@ class SerialRelay(Node):
msg.data = output msg.data = output
if output.startswith("can_relay_fromvic,core"): if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg) self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm"): elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
self.arm_pub.publish(msg) self.arm_pub.publish(msg)
elif output.startswith("can_relay_fromvic,bio"): elif output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor
self.bio_pub.publish(msg) self.bio_pub.publish(msg)
# msg = String() # msg = String()
# msg.data = output # msg.data = output
@@ -138,13 +138,13 @@ def myexcepthook(type, value, tb):
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook sys.excepthook = myexcepthook
global serial_pub global serial_pub
serial_pub = SerialRelay() serial_pub = SerialRelay()
serial_pub.run() serial_pub.run()
if __name__ == '__main__': if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly

View File

@@ -11,6 +11,7 @@ from std_msgs.msg import String
from ros2_interfaces_pkg.msg import ArmManual from ros2_interfaces_pkg.msg import ArmManual
from ros2_interfaces_pkg.msg import ArmIK from ros2_interfaces_pkg.msg import ArmIK
from ros2_interfaces_pkg.msg import SocketFeedback from ros2_interfaces_pkg.msg import SocketFeedback
from ros2_interfaces_pkg.msg import DigitFeedback
serial_pub = None serial_pub = None
thread = None thread = None
@@ -28,6 +29,7 @@ class SerialRelay(Node):
# Create publishers # Create publishers
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10) self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10) self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10)
self.feedback_timer = self.create_timer(1.0, self.publish_feedback) self.feedback_timer = self.create_timer(1.0, self.publish_feedback)
# Create subscribers # Create subscribers
@@ -40,6 +42,7 @@ class SerialRelay(Node):
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
self.arm_feedback = SocketFeedback() self.arm_feedback = SocketFeedback()
self.digit_feedback = DigitFeedback()
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if self.launch_mode == 'arm': if self.launch_mode == 'arm':
@@ -182,16 +185,30 @@ class SerialRelay(Node):
elif output.startswith("can_relay_fromvic,arm,53"): elif output.startswith("can_relay_fromvic,arm,53"):
#pass #pass
self.updateMotorFeedback(output) self.updateMotorFeedback(output)
elif output.startswith("can_relay_fromvic,digit,54"):
parts = msg.data.split(",")
if len(parts) >= 7:
# Extract the voltage from the string
voltages_in = parts[3:7]
# Convert the voltages to floats
self.digit_feedback.bat_voltage = float(voltages_in[0]) / 100.0
self.digit_feedback.voltage_12 = float(voltages_in[1]) / 100.0
self.digit_feedback.voltage_5 = float(voltages_in[2]) / 100.0
elif output.startswith("can_relay_fromvic,digit,55"):
parts = msg.data.split(",")
if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3])
else: else:
return return
def publish_feedback(self): def publish_feedback(self):
self.socket_pub.publish(self.arm_feedback) self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg): def updateAngleFeedback(self, msg: String):
# Angle feedbacks, # Angle feedbacks,
#split the msg.data by commas #split the msg.data by commas
parts = msg.split(",") parts = msg.data.split(",")
if len(parts) >= 7: if len(parts) >= 7:
# Extract the angles from the string # Extract the angles from the string
@@ -224,9 +241,9 @@ class SerialRelay(Node):
self.get_logger().info("Invalid angle feedback input format") self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg): def updateBusVoltage(self, msg: String):
# Bus Voltage feedbacks # Bus Voltage feedbacks
parts = msg.split(",") parts = msg.data.split(",")
if len(parts) >= 7: if len(parts) >= 7:
# Extract the voltage from the string # Extract the voltage from the string
voltages_in = parts[3:7] voltages_in = parts[3:7]
@@ -242,7 +259,7 @@ class SerialRelay(Node):
def updateMotorFeedback(self, msg): def updateMotorFeedback(self, msg):
# Motor voltage/current/temperature feedback # Motor voltage/current/temperature feedback
return return
# parts = msg.split(",") # parts = msg.data.split(",")
# if len(parts) >= 7: # if len(parts) >= 7:
# # Extract the voltage/current/temperature from the string # # Extract the voltage/current/temperature from the string
# values_in = parts[3:7] # values_in = parts[3:7]

View File

@@ -9,6 +9,7 @@ import atexit
import signal import signal
from std_msgs.msg import String from std_msgs.msg import String
from ros2_interfaces_pkg.msg import BioControl from ros2_interfaces_pkg.msg import BioControl
from ros2_interfaces_pkg.msg import BioFeedback
serial_pub = None serial_pub = None
thread = None thread = None
@@ -25,17 +26,22 @@ class SerialRelay(Node):
# Create publishers # Create publishers
self.debug_pub = self.create_publisher(String, '/bio/feedback/debug', 10) self.debug_pub = self.create_publisher(String, '/bio/feedback/debug', 10)
#self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10) self.feedback_pub = self.create_publisher(BioFeedback, '/bio/feedback', 10)
# Create subscribers\ # Create subscribers
self.control_sub = self.create_subscription(BioControl, '/bio/control', self.send_control, 10) self.control_sub = self.create_subscription(BioControl, '/bio/control', self.send_control, 10)
# Create a publisher for telemetry
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
# Topics used in anchor mode # Topics used in anchor mode
if self.launch_mode == 'anchor': if self.launch_mode == 'anchor':
self.anchor_sub = self.create_subscription(String, '/anchor/bio/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(String, '/anchor/bio/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
self.bio_feedback = BioFeedback()
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if self.launch_mode == 'bio': if self.launch_mode == 'bio':
@@ -117,7 +123,7 @@ class SerialRelay(Node):
pass pass
def send_control(self, msg): def send_control(self, msg: BioControl):
# CITADEL Control Commands # CITADEL Control Commands
################ ################
@@ -132,12 +138,12 @@ class SerialRelay(Node):
self.send_cmd(command) self.send_cmd(command)
# Servos, only send if not zero # Servos, only send if not zero
if msg.servo_id != 0: if msg.servo_id != 0:
command = "can_relay_tovic,citadel,25," + str(msg.servo_id) + "," + str(msg.servo_position) + "\n" command = "can_relay_tovic,citadel,25," + str(msg.servo_id) + "," + str(int(msg.servo_state)) + "\n"
self.send_cmd(command) self.send_cmd(command)
# LSS # LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.lss_direction) + "\n" command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
self.send_cmd(command) self.send_cmd(command)
# Vibration Motor # Vibration Motor
command = "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n" command = "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
@@ -150,21 +156,21 @@ class SerialRelay(Node):
# To be reviewed before use# # To be reviewed before use#
# Laser # Laser
command = "can_relay_tovic,faerie,28," + str(msg.laser) + "\n" command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command) self.send_cmd(command)
# # UV Light # Drill (SCABBARD)
# command = "can_relay_tovic,faerie,38," + str(msg.uvLight) + "\n" command = f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
# self.send_cmd(command) print(msg.drill)
self.send_cmd(command)
# Drill # Bio linear actuator
command = f"can_relay_tovic,faerie,19,{msg.drill_duty:.2f}\n" command = "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
print(msg.drill_duty)
self.send_cmd(command) self.send_cmd(command)
def send_cmd(self, msg): def send_cmd(self, msg: str):
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
output = String() output = String()
output.data = msg output.data = msg
@@ -173,10 +179,21 @@ class SerialRelay(Node):
self.get_logger().info(f"[Bio to MCU] {msg}") self.get_logger().info(f"[Bio to MCU] {msg}")
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg): def anchor_feedback(self, msg: String):
output = msg.data
parts = str(output.strip()).split(",")
self.get_logger().info(f"[Bio Anchor] {msg.data}") self.get_logger().info(f"[Bio Anchor] {msg.data}")
#self.send_cmd(msg.data)
if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0
self.bio_feedback.voltage_12 = float(parts[4]) / 100.0
self.bio_feedback.voltage_5 = float(parts[5]) / 100.0
elif output.startswith("can_relay_fromvic,digit,57"):
self.bio_feedback.drill_temp = float(parts[3])
self.bio_feedback.drill_humidity = float(parts[4])
def publish_feedback(self):
self.feedback_pub.publish(self.bio_feedback)
@staticmethod @staticmethod
def list_serial_ports(): def list_serial_ports():

View File

@@ -152,7 +152,7 @@ class SerialRelay(Node):
self.ser.close() self.ser.close()
self.exit(1) self.exit(1)
def scale_duty(self, value, max_speed): def scale_duty(self, value: float, max_speed: float):
leftMin = -1 leftMin = -1
leftMax = 1 leftMax = 1
rightMin = -max_speed/100.0 rightMin = -max_speed/100.0
@@ -172,10 +172,13 @@ class SerialRelay(Node):
def send_controls(self, msg): def send_controls(self, msg):
#can_relay_tovic,core,19, left_stick, right_stick #can_relay_tovic,core,19, left_stick, right_stick
if(msg.turn_to_enable): if(msg.turn_to_enable):
command = "can_relay_tovic,core,41," + msg.turn_to + ',' + msg.turn_to_timeout + '\n' command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n'
else: else:
command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n' command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
self.send_cmd(command)
# Brake mode
command = "can_relay_tovic,core,18," + str(int(msg.brake)) + '\n'
self.send_cmd(command) self.send_cmd(command)
#print(f"[Sys] Relaying: {command}") #print(f"[Sys] Relaying: {command}")
@@ -189,7 +192,7 @@ class SerialRelay(Node):
self.get_logger().info(f"[Core to MCU] {msg}") self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
#self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}") #self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}")
@@ -203,6 +206,7 @@ class SerialRelay(Node):
self.core_feedback.bno_gyro.x = float(parts[3]) self.core_feedback.bno_gyro.x = float(parts[3])
self.core_feedback.bno_gyro.y = float(parts[4]) self.core_feedback.bno_gyro.y = float(parts[4])
self.core_feedback.bno_gyro.z = float(parts[5]) self.core_feedback.bno_gyro.z = float(parts[5])
self.core_feedback.imu_calib = round(float(parts[6]))
elif output.startswith("can_relay_fromvic,core,52"):#Accel x,y,z, heading *10 elif output.startswith("can_relay_fromvic,core,52"):#Accel x,y,z, heading *10
self.core_feedback.bno_accel.x = float(parts[3]) self.core_feedback.bno_accel.x = float(parts[3])
self.core_feedback.bno_accel.y = float(parts[4]) self.core_feedback.bno_accel.y = float(parts[4])

View File

@@ -0,0 +1,355 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import asyncio
from concurrent.futures import ThreadPoolExecutor
import signal
import sys
import threading
import time
from std_msgs.msg import String
from ros2_interfaces_pkg.msg import PtzControl, PtzFeedback
# Import the SIYI SDK
from core_pkg.siyi_sdk import (
SiyiGimbalCamera,
CommandID,
DataStreamType,
DataStreamFrequency,
SingleAxis,
AttitudeData
)
class PtzNode(Node):
def __init__(self):
# Initialize node with name
super().__init__("core_ptz")
# Declare parameters
self.declare_parameter('camera_ip', '192.168.1.9')
self.declare_parameter('camera_port', 37260)
# Get parameters
self.camera_ip = self.get_parameter('camera_ip').value
self.camera_port = self.get_parameter('camera_port').value
self.get_logger().info(f"PTZ camera IP: {self.camera_ip} Port: {self.camera_port}")
# Create a camera instance
self.camera = None
self.camera_connected = False # This flag is still managed but not used to gate commands
self.loop = None
self.thread_pool = None
# Create publishers
self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10)
self.debug_pub = self.create_publisher(String, '/ptz/debug', 10)
# Create subscribers
self.control_sub = self.create_subscription(
PtzControl, '/ptz/control', self.handle_control_command, 10)
# Create timers
self.connection_timer = self.create_timer(5.0, self.check_camera_connection)
self.last_data_time = time.time()
self.health_check_timer = self.create_timer(2.0, self.check_camera_health)
# Create feedback message
self.feedback_msg = PtzFeedback()
self.feedback_msg.connected = False # This will reflect the actual connection state
self.feedback_msg.error_msg = "Initializing"
# Flags for async operations
self.shutdown_requested = False
# Set up asyncio event loop in a separate thread
self.thread_pool = ThreadPoolExecutor(max_workers=1)
self.loop = asyncio.new_event_loop()
# Connect to camera on startup
self.connect_task = self.thread_pool.submit(
self.run_async_func, self.connect_to_camera()
)
async def connect_to_camera(self):
"""Connect to the SIYI camera."""
try:
# Create a new camera instance
self.camera = SiyiGimbalCamera(ip=self.camera_ip, port=self.camera_port)
# Connect to the camera
await self.camera.connect()
# Set up data callback
self.camera.set_data_callback(self.camera_data_callback)
# Request attitude data stream
await self.camera.send_data_stream_request(
DataStreamType.ATTITUDE_DATA,
DataStreamFrequency.HZ_10
)
# Update connection status
self.camera_connected = True
self.feedback_msg.connected = True
self.feedback_msg.error_msg = ""
self.publish_debug("Camera connected successfully")
except Exception as e:
self.camera_connected = False
self.feedback_msg.connected = False
self.feedback_msg.error_msg = f"Connection error: {str(e)}"
self.publish_debug(f"Camera connection failed: {str(e)}")
def camera_data_callback(self, cmd_id, data):
"""Handle data received from the camera."""
# Update last_data_time regardless of self.camera_connected,
# as data might arrive during a brief reconnect window.
self.last_data_time = time.time()
if self.camera_connected: # Only process for feedback if we believe we are connected
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData):
self.feedback_msg.yaw = data.yaw
self.feedback_msg.pitch = data.pitch
self.feedback_msg.roll = data.roll
self.feedback_msg.yaw_velocity = data.yaw_velocity
self.feedback_msg.pitch_velocity = data.pitch_velocity
self.feedback_msg.roll_velocity = data.roll_velocity
self.feedback_pub.publish(self.feedback_msg)
else:
debug_str = ""
if isinstance(cmd_id, CommandID):
debug_str = f"Camera data: CMD_ID={cmd_id.name}, Data="
else:
debug_str = f"Camera data: CMD_ID={cmd_id}, Data="
if isinstance(data, bytes):
debug_str += data.hex()
else:
debug_str += str(data)
self.get_logger().debug(debug_str)
def check_camera_connection(self):
"""Periodically check camera connection and attempt to reconnect if needed."""
if not self.camera_connected and not self.shutdown_requested:
self.publish_debug("Attempting to reconnect to camera...")
if self.camera:
try:
if self.camera.is_connected: # SDK's internal connection state
self.run_async_func(self.camera.disconnect())
except Exception as e:
self.get_logger().debug(f"Error during pre-reconnect disconnect: {e}")
# self.camera = None # Don't nullify here, connect_to_camera will re-assign or create new
self.connect_task = self.thread_pool.submit(
self.run_async_func, self.connect_to_camera()
)
def check_camera_health(self):
"""Check if we're still receiving data from the camera"""
if self.camera_connected: # Only check health if we think we are connected
time_since_last_data = time.time() - self.last_data_time
if time_since_last_data > 5.0:
self.publish_debug(f"No camera data for {time_since_last_data:.1f}s, marking as disconnected.")
self.camera_connected = False
self.feedback_msg.connected = False
self.feedback_msg.error_msg = "Connection stale (no data)"
self.feedback_pub.publish(self.feedback_msg)
def handle_control_command(self, msg):
"""Handle incoming control commands."""
# Removed: if not self.camera_connected
if not self.camera: # Still check if camera object exists
self.get_logger().warning("Camera object not initialized, ignoring control command")
return
self.thread_pool.submit(
self.run_async_func,
self.process_control_command(msg)
)
async def process_control_command(self, msg):
"""Process and send the control command to the camera."""
if not self.camera:
self.get_logger().error("Process control command called but camera object is None.")
return
try:
# The SDK's send_... methods will raise RuntimeError if not connected.
# This try-except block will catch those.
if msg.reset:
self.get_logger().info("Attempting to reset camera to center position")
await self.camera.send_attitude_angles_command(0.0, 0.0)
return
if msg.control_mode == 0:
turn_yaw = max(-100, min(100, int(msg.turn_yaw)))
turn_pitch = max(-100, min(100, int(msg.turn_pitch)))
self.get_logger().debug(f"Attempting rotation: yaw_speed={turn_yaw}, pitch_speed={turn_pitch}")
await self.camera.send_rotation_command(turn_yaw, turn_pitch)
elif msg.control_mode == 1:
yaw = max(-135.0, min(135.0, msg.yaw))
pitch = max(-90.0, min(90.0, msg.pitch))
self.get_logger().debug(f"Attempting absolute angles: yaw={yaw}, pitch={pitch}")
await self.camera.send_attitude_angles_command(yaw, pitch)
elif msg.control_mode == 2:
axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH
angle = msg.angle
self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}")
await self.camera.send_single_axis_attitude_command(angle, axis)
elif msg.control_mode == 3:
zoom_level = msg.zoom_level
self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x")
await self.camera.send_absolute_zoom_command(zoom_level)
if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'):
if msg.stream_type > 0 and msg.stream_freq >= 0:
try:
stream_type = DataStreamType(msg.stream_type)
stream_freq = DataStreamFrequency(msg.stream_freq)
self.get_logger().info(
f"Attempting to set data stream: type={stream_type.name}, freq={stream_freq.name}"
)
await self.camera.send_data_stream_request(stream_type, stream_freq)
except ValueError:
self.get_logger().error("Invalid stream type or frequency values in control message")
except RuntimeError as e: # Catch SDK's "not connected" errors
self.get_logger().warning(f"SDK command failed (likely not connected): {e}")
# self.camera_connected will be updated by health/connection checks
# self.feedback_msg.error_msg = f"Command failed: {str(e)}" # Already set by health check
# self.feedback_pub.publish(self.feedback_msg)
except Exception as e:
self.get_logger().error(f"Error processing control command: {e}")
self.feedback_msg.error_msg = f"Control error: {str(e)}"
self.feedback_pub.publish(self.feedback_msg) # Publish for other errors
def publish_debug(self, message_text):
"""Publish debug message."""
msg = String()
msg.data = f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
self.debug_pub.publish(msg)
self.get_logger().info(message_text)
def run_async_func(self, coro):
"""Run an async function in the event loop."""
if self.loop and self.loop.is_running():
try:
return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout=5.0) # Added timeout
except asyncio.TimeoutError:
self.get_logger().warning(f"Async function {coro.__name__} timed out.")
return None
except Exception as e:
self.get_logger().error(f"Exception in run_async_func for {coro.__name__}: {e}")
return None
else:
self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.")
return None
async def shutdown_node_async(self):
"""Perform clean shutdown of camera connection."""
self.shutdown_requested = True
self.get_logger().info("Async shutdown initiated...")
if self.camera and self.camera.is_connected: # Check SDK's connection state
try:
self.get_logger().info("Disabling data stream...")
await self.camera.send_data_stream_request(
DataStreamType.ATTITUDE_DATA,
DataStreamFrequency.DISABLE
)
await asyncio.sleep(0.1)
self.get_logger().info("Disconnecting from camera...")
await self.camera.disconnect()
self.get_logger().info("Disconnected from camera successfully.")
except Exception as e:
self.get_logger().error(f"Error during camera shutdown: {e}")
self.camera_connected = False # Update node's flag
self.feedback_msg.connected = False
self.feedback_msg.error_msg = "Shutting down"
def cleanup(self):
"""Clean up resources."""
self.get_logger().info("PTZ node cleanup initiated.")
self.shutdown_requested = True
if self.connection_timer:
self.connection_timer.cancel()
if self.health_check_timer:
self.health_check_timer.cancel()
if self.loop and self.thread_pool:
if self.loop.is_running():
try:
future = asyncio.run_coroutine_threadsafe(self.shutdown_node_async(), self.loop)
future.result(timeout=5)
except Exception as e:
self.get_logger().error(f"Error during async shutdown in cleanup: {e}")
self.get_logger().info("Shutting down thread pool executor...")
self.thread_pool.shutdown(wait=True)
if self.loop.is_running():
self.get_logger().info("Stopping asyncio event loop...")
self.loop.call_soon_threadsafe(self.loop.stop)
self.get_logger().info("PTZ node resources cleaned up.")
else:
self.get_logger().warning("Loop or thread_pool not initialized, skipping parts of cleanup.")
def main(args=None):
"""Main function."""
rclpy.init(args=args)
ptz_node = PtzNode()
asyncio_thread = None
if ptz_node.loop:
def run_event_loop(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
# This ensures the loop is closed when the thread running it exits.
# This is important if run_forever() exits due to loop.stop()
# or an unhandled exception within a task scheduled on the loop.
if not loop.is_closed():
loop.close()
asyncio_thread = threading.Thread(
target=run_event_loop,
args=(ptz_node.loop,),
daemon=True
)
asyncio_thread.start()
try:
rclpy.spin(ptz_node)
except KeyboardInterrupt:
ptz_node.get_logger().info("KeyboardInterrupt received, shutting down...")
except SystemExit:
ptz_node.get_logger().info("SystemExit received, shutting down...")
finally:
ptz_node.get_logger().info("Initiating final cleanup...")
ptz_node.cleanup() # This will stop the loop and shutdown the executor
if asyncio_thread and asyncio_thread.is_alive():
# The loop should have been stopped by cleanup. We just join the thread.
ptz_node.get_logger().info("Waiting for asyncio thread to join...")
asyncio_thread.join(timeout=5)
if asyncio_thread.is_alive():
ptz_node.get_logger().warning("Asyncio thread did not join cleanly.")
rclpy.shutdown()
ptz_node.get_logger().info("ROS shutdown complete.")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,174 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

View File

@@ -0,0 +1,26 @@
# SIYI Gimbal Camera Python Library
This library provides a Python interface for controlling SIYI gimbal cameras over TCP. It allows you to:
- Connect to the camera and maintain a stable connection.
- Control the camera's movement (pan, tilt).
- Request a continuous stream of data (attitude, etc.).
> [!NOTE]
> The code for this library (including this README) was written almost entirely by an AI assistant, with
> the assistance and direction of a human developer. It probably sucks, but I need sleep, so I'm not going
> to redo it. Sorry.
## Core Features
- **Asynchronous communication:** Utilizes `asyncio` for non-blocking operation.
- **Command handling:** Easy-to-use functions for sending control commands.
- **Data streaming:** Ability to receive continuous data streams for real-time monitoring.
## Key Components
- `SiyiGimbalCamera` class: Main class for interfacing with the camera.
- Enums: `DataStreamType`, `DataStreamFrequency`, `SingleAxis`, `CommandID` to manage protocol values.
- Logging: Uses the Python `logging` library.
Refer to the code comments and docstrings for specific function details.

View File

@@ -0,0 +1 @@
from .siyi import *

View File

@@ -0,0 +1,502 @@
import asyncio
import socket
import struct
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any
from crccheck.crc import Crc16
# Set up logging
logger = logging.getLogger(__name__)
# Global constant for the heartbeat packet (used in TCP keep-alive)
HEARTBEAT_PACKET = bytes.fromhex("55 66 01 01 00 00 00 00 00 59 8B")
class DataStreamType(Enum):
"""
Enumeration for data stream types in the 0x25 command.
Allows users to select which type of data to stream from the gimbal camera.
"""
ATTITUDE_DATA = 1
LASER_RANGE_DATA = 2
MAGNETIC_ENCODER_ANGLE_DATA = 3
MOTOR_VOLTAGE_DATA = 4
class DataStreamFrequency(Enum):
"""
Enumeration for data stream frequencies in the 0x25 command.
Allows the user to select the output (streaming) frequency.
"""
DISABLE = 0
HZ_2 = 1
HZ_4 = 2
HZ_5 = 3
HZ_10 = 4
HZ_20 = 5
HZ_50 = 6
HZ_100 = 7
class SingleAxis(Enum):
"""
Enumeration for specifying the controlled axis in the 0x41 single-axis control command.
YAW corresponds to 0 and PITCH corresponds to 1.
"""
YAW = 0
PITCH = 1
class CommandID(Enum):
"""
Enumeration for command identifiers used in the protocol.
These identifiers are sent as the CMD_ID in a packet and help in matching
responses or processing incoming packets.
"""
ROTATION_CONTROL = 0x07
ABSOLUTE_ZOOM = 0x08
ATTITUDE_ANGLES = 0x0E
SINGLE_AXIS_CONTROL = 0x41
DATA_STREAM_REQUEST = 0x25
ATTITUDE_DATA_RESPONSE = 0x0D
@dataclass
class AttitudeData:
"""
Class representing the parsed attitude data from the gimbal.
Angles (yaw, pitch, roll) are in degrees and are scaled using one decimal point precision.
Angular velocities are raw int16 values.
"""
yaw: float
pitch: float
roll: float
yaw_velocity: int
pitch_velocity: int
roll_velocity: int
@classmethod
def from_bytes(cls, data: bytes) -> "AttitudeData":
"""Create an AttitudeData instance from a byte string."""
if len(data) != 12:
raise ValueError("Attitude data should be exactly 12 bytes.")
values = struct.unpack("<hhhhhh", data)
return cls(
yaw=values[0] / 10.0,
pitch=values[1] / 10.0,
roll=values[2] / 10.0,
yaw_velocity=values[3],
pitch_velocity=values[4],
roll_velocity=values[5],
)
class SiyiGimbalCamera:
"""
Class to interface with the SIYI Gimbal Camera over TCP.
"""
MAX_A8_MINI_ZOOM = 6.0 # Maximum zoom for A8 mini
def __init__(
self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2
):
self.ip = ip
self.port = port
self.heartbeat_interval = heartbeat_interval
self.reader: Optional[asyncio.StreamReader] = None
self.writer: Optional[asyncio.StreamWriter] = None
self.is_connected = False
self.seq: int = 0
self._data_callback: Optional[Callable[[CommandID | int, Any], None]] = None
async def connect(self) -> None:
try:
self.reader, self.writer = await asyncio.open_connection(
self.ip, self.port
)
self.is_connected = True
asyncio.create_task(self.heartbeat_loop())
asyncio.create_task(self._data_stream_listener())
except (socket.gaierror, ConnectionRefusedError) as e:
self.is_connected = False
logger.error(f"Could not connect: {e}")
raise
async def disconnect(self) -> None:
if self.is_connected and self.writer:
self.is_connected = False
self.writer.close()
await self.writer.wait_closed()
else:
pass
async def heartbeat_loop(self) -> None:
if not self.is_connected or not self.writer:
return
try:
while self.is_connected:
try:
self.writer.write(HEARTBEAT_PACKET)
await self.writer.drain()
await asyncio.sleep(self.heartbeat_interval)
except (socket.error, BrokenPipeError) as e:
break
finally:
if self.is_connected:
await self.disconnect()
def _build_packet_header(
self, cmd_id: CommandID, data_len: int
) -> bytearray:
"""Helper to build the common packet header."""
packet = bytearray()
packet.extend(b"\x55\x66") # STX
packet.append(0x01) # CTRL (request ACK)
packet.extend(struct.pack("<H", data_len))
packet.extend(struct.pack("<H", self.seq))
packet.append(cmd_id.value)
return packet
def _finalize_packet(self, packet: bytearray) -> bytes:
"""Helper to add CRC and increment sequence number."""
crc_value = Crc16.calc(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes:
data_len = 2
packet = self._build_packet_header(
CommandID.ROTATION_CONTROL, data_len
)
packet.extend(struct.pack("bb", turn_yaw, turn_pitch))
return self._finalize_packet(packet)
async def send_rotation_command(
self, turn_yaw: int, turn_pitch: int
) -> None:
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send rotation command."
)
packet = self._build_rotation_packet(turn_yaw, turn_pitch)
self.writer.write(packet)
await self.writer.drain()
logger.debug(
f"Sent rotation command with yaw_speed {turn_yaw} and pitch_speed {turn_pitch}"
)
def _build_attitude_angles_packet(
self, yaw: float, pitch: float
) -> bytes:
data_len = 4
packet = self._build_packet_header(
CommandID.ATTITUDE_ANGLES, data_len
)
yaw_int = int(round(yaw * 10))
pitch_int = int(round(pitch * 10))
packet.extend(struct.pack("<hh", yaw_int, pitch_int))
return self._finalize_packet(packet)
async def send_attitude_angles_command(
self, yaw: float, pitch: float
) -> None:
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send attitude angles command."
)
packet = self._build_attitude_angles_packet(yaw, pitch)
self.writer.write(packet)
await self.writer.drain()
logger.debug(
f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°"
)
def _build_single_axis_attitude_packet(
self, angle: float, axis: SingleAxis
) -> bytes:
data_len = 3
packet = self._build_packet_header(
CommandID.SINGLE_AXIS_CONTROL, data_len
)
angle_int = int(round(angle * 10))
packet.extend(struct.pack("<hB", angle_int, axis.value))
return self._finalize_packet(packet)
async def send_single_axis_attitude_command(
self, angle: float, axis: SingleAxis
) -> None:
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send single-axis attitude command."
)
packet = self._build_single_axis_attitude_packet(angle, axis)
self.writer.write(packet)
await self.writer.drain()
logger.debug(
f"Sent single-axis attitude command for {axis.name.lower()} with angle {angle}°"
)
def _build_data_stream_packet(
self, data_type: DataStreamType, data_freq: DataStreamFrequency
) -> bytes:
data_len = 2
packet = self._build_packet_header(
CommandID.DATA_STREAM_REQUEST, data_len
)
packet.append(data_type.value)
packet.append(data_freq.value)
return self._finalize_packet(packet)
async def send_data_stream_request(
self, data_type: DataStreamType, data_freq: DataStreamFrequency
) -> None:
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot request data stream."
)
packet = self._build_data_stream_packet(data_type, data_freq)
self.writer.write(packet)
await self.writer.drain()
logger.info(
f"Sent data stream request: data_type {data_type.name}, data_freq {data_freq.name}"
)
def _build_absolute_zoom_packet(self, zoom_level: float) -> bytes:
data_len = 2
packet = self._build_packet_header(CommandID.ABSOLUTE_ZOOM, data_len)
zoom_packet_value = int(round(zoom_level * 10))
if not (0 <= zoom_packet_value <= 65535): # Should be caught by clamping earlier
raise ValueError(
"Zoom packet value out of uint16_t range after conversion."
)
packet.extend(struct.pack("<H", zoom_packet_value))
return self._finalize_packet(packet)
async def send_absolute_zoom_command(self, zoom_level: float) -> None:
"""
Send an absolute zoom command (0x08) to the gimbal.
:param zoom_level: The desired zoom level as a float (e.g., 1.0 for 1x,
2.5 for 2.5x). For A8 mini, this will be clamped
between 1.0 and MAX_A8_MINI_ZOOM (6.0).
"""
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send absolute zoom command."
)
original_requested_zoom = zoom_level # For logging clarity
if zoom_level < 1.0:
logger.warning(
f"Requested zoom level {original_requested_zoom:.1f} is less than 1.0. Clamping to 1.0x."
)
zoom_level = 1.0
elif zoom_level > self.MAX_A8_MINI_ZOOM: # Check against max zoom
logger.warning(
f"Requested zoom level {original_requested_zoom:.1f} exceeds maximum {self.MAX_A8_MINI_ZOOM:.1f}x for A8 mini. "
f"Clamping to {self.MAX_A8_MINI_ZOOM:.1f}x."
)
zoom_level = self.MAX_A8_MINI_ZOOM
packet = self._build_absolute_zoom_packet(zoom_level)
self.writer.write(packet)
await self.writer.drain()
logger.debug(
f"Sent absolute zoom command with level {zoom_level:.1f}x (original request: {original_requested_zoom:.1f}x)"
)
async def _read_packet(self):
if not self.reader:
raise RuntimeError("Reader is not initialized.")
stx = await self.reader.readexactly(2)
if stx != b"\x55\x66":
raise ValueError(f"Invalid packet start bytes: {stx.hex()}")
ctrl = await self.reader.readexactly(1)
data_len_bytes = await self.reader.readexactly(2)
data_len = struct.unpack("<H", data_len_bytes)[0]
seq_bytes = await self.reader.readexactly(2) # Renamed for clarity
# seq_val = struct.unpack("<H", seq_bytes)[0] # If you need the sequence value
cmd_id_bytes = await self.reader.readexactly(1)
cmd_id_val = cmd_id_bytes[0] # Renamed for clarity
# Protect against excessively large data_len
if data_len > 2048: # Arbitrary reasonable limit
raise ValueError(f"Excessive data length received: {data_len}")
data = await self.reader.readexactly(data_len)
crc_bytes = await self.reader.readexactly(2)
received_crc = struct.unpack("<H", crc_bytes)[0]
packet_without_crc = (
stx + ctrl + data_len_bytes + seq_bytes + cmd_id_bytes + data
)
computed_crc = Crc16.calc(packet_without_crc)
if computed_crc != received_crc:
raise ValueError(
f"CRC check failed. Expected {computed_crc:04X}, got {received_crc:04X}. "
f"Packet: {packet_without_crc.hex()}"
)
return cmd_id_val, data
@staticmethod
def parse_attitude_data(data: bytes) -> AttitudeData:
if len(data) != 12:
raise ValueError("Attitude data should be exactly 12 bytes.")
return AttitudeData.from_bytes(data)
async def _data_stream_listener(self) -> None:
while self.is_connected:
try:
cmd_id_int, data = await self._read_packet()
try:
cmd_id_enum = CommandID(cmd_id_int)
except ValueError:
logger.warning(
f"Received unknown CMD_ID {cmd_id_int:02X}, data: {data.hex()}"
)
if self._data_callback:
self._data_callback(cmd_id_int, data)
continue
if (
cmd_id_enum == CommandID.ATTITUDE_DATA_RESPONSE
and len(data) == 12
):
try:
parsed = AttitudeData.from_bytes(data)
if self._data_callback:
self._data_callback(cmd_id_enum, parsed)
else:
logger.info(f"Received attitude data: {parsed}")
except Exception as e:
logger.exception(
f"Failed to parse attitude data: {e}"
)
if self._data_callback:
self._data_callback(cmd_id_enum, data)
else:
if self._data_callback:
self._data_callback(cmd_id_enum, data)
else:
logger.info(
f"Received packet with CMD_ID {cmd_id_enum.name} ({cmd_id_enum.value:02X}): {data.hex()}"
)
except (
ConnectionResetError,
BrokenPipeError,
ConnectionError,
asyncio.IncompleteReadError,
) as e:
logger.error(f"Connection error in listener: {e}")
self.is_connected = False
break
except ValueError as e:
logger.error(f"Packet error in listener: {e}")
# Consider adding a small delay or a mechanism to resync if this happens frequently
await asyncio.sleep(0.1) # Small delay before trying to read again
continue
except Exception as e:
logger.exception(f"Unexpected error in data stream listener: {e}")
# Depending on the error, you might want to break or continue
await asyncio.sleep(0.1) # Small delay
continue
def set_data_callback(
self, callback: Callable[[CommandID | int, Any], None]
) -> None:
self._data_callback = callback
async def main_sdk_test(): # Renamed to avoid conflict if this file is imported
gimbal_ip = "192.168.144.25"
gimbal = SiyiGimbalCamera(gimbal_ip)
def my_data_handler(cmd_id, data):
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData):
print(
f"Attitude: Yaw={data.yaw:.1f}, Pitch={data.pitch:.1f}, Roll={data.roll:.1f}"
)
elif isinstance(cmd_id, CommandID):
print(
f"Received {cmd_id.name}: {data.hex() if isinstance(data, bytes) else data}"
)
else:
print(
f"Received Unknown CMD_ID {cmd_id:02X}: {data.hex() if isinstance(data, bytes) else data}"
)
gimbal.set_data_callback(my_data_handler)
try:
await gimbal.connect()
if gimbal.is_connected:
print("SDK Test: Successfully connected to the gimbal.")
print("SDK Test: Setting zoom to 1.0x")
await gimbal.send_absolute_zoom_command(1.0)
await asyncio.sleep(2)
print("SDK Test: Setting zoom to 3.5x")
await gimbal.send_absolute_zoom_command(3.5)
await asyncio.sleep(2)
print("SDK Test: Setting zoom to 6.0x (A8 mini max)")
await gimbal.send_absolute_zoom_command(6.0)
await asyncio.sleep(2)
print("SDK Test: Attempting zoom to 7.0x (should be clamped to 6.0x)")
await gimbal.send_absolute_zoom_command(7.0)
await asyncio.sleep(2)
print("SDK Test: Attempting zoom to 0.5x (should be clamped to 1.0x)")
await gimbal.send_absolute_zoom_command(0.5)
await asyncio.sleep(2)
print("SDK Test: Requesting attitude data stream at 5Hz...")
await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.HZ_5)
print("SDK Test: Listening for data for 10 seconds...")
await asyncio.sleep(10)
print("SDK Test: Disabling attitude data stream...")
await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE)
await asyncio.sleep(1)
except ConnectionRefusedError:
print(
f"SDK Test: Connection to {gimbal_ip} was refused. Is the gimbal on and accessible?"
)
except Exception as e:
print(f"SDK Test: An error occurred: {e}")
finally:
if gimbal.is_connected:
print("SDK Test: Disconnecting...")
await gimbal.disconnect()
print("SDK Test: Program finished.")
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
# To see debug logs from this SDK specifically:
# logger.setLevel(logging.DEBUG)
asyncio.run(main_sdk_test())

View File

@@ -0,0 +1,61 @@
import asyncio
import logging
from siyi import (
SiyiGimbalCamera,
CommandID,
DataStreamType,
DataStreamFrequency,
SingleAxis,
)
# Set up basic logging for this test script
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
# Setup: Define your camera instance and connection parameters
camera = SiyiGimbalCamera(ip="192.168.1.17", port=37260)
try:
await camera.connect()
# Initial Connection Established: Wait briefly before sending the commands
await asyncio.sleep(2)
# Command 1: Move all the way to the right (using set angles)
logger.info("Command 1: Move all the way to the right (using absolute angle control)")
await camera.send_attitude_angles_command(135.0, 0.0)
await asyncio.sleep(5)
# Command 2: Look down (using relative control)
logger.info("Command 2: Start looking down (relative speed control)")
# The numbers used are example numbers and may be different from the numbers you use.
await camera.send_rotation_command(0, 50)
await asyncio.sleep(5)
# Command 3: Stop looking down, then look up (with the single axis)
logger.info("Command 3: Stop looking down and start looking up (single axis control)")
await camera.send_rotation_command(0, 0)
await camera.send_single_axis_attitude_command(135, SingleAxis.PITCH)
await asyncio.sleep(5)
# Command 4: Reset and move all the way to the left (Absolute value).
logger.info("Command 4: Move back to the center, and start moving all the way left")
await camera.send_attitude_angles_command(-135.0, 0.0)
await asyncio.sleep(5)
# Command 5: Final rotation to the center (using set angles)
logger.info("Command 5: Moving back to the center.")
await camera.send_attitude_angles_command(0, 0)
await asyncio.sleep(5)
except Exception as e:
logger.exception(f"An exception occurred: {e}")
finally:
# Disconnect and cleanup after commands have run.
await camera.disconnect()
logger.info("Test script completed and shutdown")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -20,7 +20,8 @@ setup(
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
"core = core_pkg.core_node:main", "core = core_pkg.core_node:main",
"headless = core_pkg.core_headless:main" "headless = core_pkg.core_headless:main",
"ptz = core_pkg.core_ptz:main"
], ],
}, },
) )