ai draft of ptz control code

This commit is contained in:
Tristan McGinnis
2025-05-25 18:08:56 -06:00
parent 4a9a0bfb0d
commit 3ed821f4fb
8 changed files with 931 additions and 5 deletions

View File

@@ -38,6 +38,16 @@ def launch_setup(context, *args, **kwargs):
on_exit=Shutdown() on_exit=Shutdown()
) )
) )
nodes.append(
Node(
package='core_pkg',
executable='ptz', # change as needed
name='ptz',
output='both'
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
)
)
nodes.append( nodes.append(
Node( Node(
package='bio_pkg', package='bio_pkg',
@@ -58,7 +68,7 @@ def launch_setup(context, *args, **kwargs):
on_exit=Shutdown() on_exit=Shutdown()
) )
) )
elif mode in ['arm', 'core', 'bio']: elif mode in ['arm', 'core', 'bio', 'ptz']:
# Only launch the node corresponding to the provided mode. # Only launch the node corresponding to the provided mode.
if mode == 'arm': if mode == 'arm':
nodes.append( nodes.append(
@@ -93,10 +103,19 @@ def launch_setup(context, *args, **kwargs):
on_exit=Shutdown() on_exit=Shutdown()
) )
) )
elif mode == 'ptz':
nodes.append(
Node(
package='core_pkg',
executable='ptz',
name='ptz',
output='both'
on_exit=Shutdown(), #on fail, shutdown if this was the only node to be launched
)
)
else: else:
# If an invalid mode is provided, print an error. # If an invalid mode is provided, print an error.
# (You might want to raise an exception or handle it differently.) print("Invalid mode provided. Choose one of: arm, core, bio, anchor, ptz.")
print("Invalid mode provided. Choose one of: arm, core, bio, anchor.")
return nodes return nodes
@@ -104,7 +123,7 @@ def generate_launch_description():
declare_arg = DeclareLaunchArgument( declare_arg = DeclareLaunchArgument(
'mode', 'mode',
default_value='anchor', default_value='anchor',
description='Launch mode: arm, core, bio, or anchor' description='Launch mode: arm, core, bio, anchor, or ptz'
) )
return LaunchDescription([ return LaunchDescription([

View File

@@ -0,0 +1,281 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import asyncio
from concurrent.futures import ThreadPoolExecutor
import signal
import sys
import threading
import time
from std_msgs.msg import String
from ros2_interfaces_pkg.msg import PtzControl, PtzFeedback
# Import the SIYI SDK
from core_pkg.siyi_sdk import (
SiyiGimbalCamera,
CommandID,
DataStreamType,
DataStreamFrequency,
SingleAxis,
AttitudeData
)
class PtzNode(Node):
def __init__(self):
# Initialize node with name
super().__init__("core_ptz")
# Declare parameters
self.declare_parameter('camera_ip', '192.168.1.17')
self.declare_parameter('camera_port', 37260)
#self.declare_parameter('launch_mode', 'core')
# Get parameters
self.camera_ip = self.get_parameter('camera_ip').value
self.camera_port = self.get_parameter('camera_port').value
#self.launch_mode = self.get_parameter('launch_mode').value
#self.get_logger().info(f"PTZ mode: {self.launch_mode}")
self.get_logger().info(f"PTZ camera IP: {self.camera_ip} Port: {self.camera_port}")
# Create a camera instance
self.camera = None
self.camera_connected = False
self.loop = None
self.executor = None
# Create publishers
self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10)
self.debug_pub = self.create_publisher(String, '/ptz/debug', 10)
# Create subscribers
self.control_sub = self.create_subscription(
PtzControl, '/ptz/control', self.handle_control_command, 10)
# Create timers
self.connection_timer = self.create_timer(5.0, self.check_camera_connection)
# Create feedback message
self.feedback_msg = PtzFeedback()
self.feedback_msg.connected = False
self.feedback_msg.error_msg = "Initializing"
# Flags for async operations
self.shutdown_requested = False
# Set up asyncio event loop in a separate thread
self.executor = ThreadPoolExecutor(max_workers=1)
self.loop = asyncio.new_event_loop()
# Connect to camera on startup
self.connect_task = self.executor.submit(
self.run_async_func, self.connect_to_camera()
)
async def connect_to_camera(self):
"""Connect to the SIYI camera."""
try:
# Create a new camera instance
self.camera = SiyiGimbalCamera(ip=self.camera_ip, port=self.camera_port)
# Connect to the camera
await self.camera.connect()
# Set up data callback
self.camera.set_data_callback(self.camera_data_callback)
# Request attitude data stream
await self.camera.send_data_stream_request(
DataStreamType.ATTITUDE_DATA,
DataStreamFrequency.HZ_10
)
# Update connection status
self.camera_connected = True
self.feedback_msg.connected = True
self.feedback_msg.error_msg = ""
self.get_logger().info("Connected to PTZ camera")
self.publish_debug("Camera connected successfully")
except Exception as e:
self.camera_connected = False
self.feedback_msg.connected = False
self.feedback_msg.error_msg = f"Connection error: {str(e)}"
self.get_logger().error(f"Failed to connect to camera: {e}")
self.publish_debug(f"Camera connection failed: {str(e)}")
def camera_data_callback(self, cmd_id, data):
"""Handle data received from the camera."""
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData):
# Update feedback message with attitude data
self.feedback_msg.yaw = data.yaw
self.feedback_msg.pitch = data.pitch
self.feedback_msg.roll = data.roll
self.feedback_msg.yaw_velocity = data.yaw_velocity
self.feedback_msg.pitch_velocity = data.pitch_velocity
self.feedback_msg.roll_velocity = data.roll_velocity
# Publish feedback
self.feedback_pub.publish(self.feedback_msg)
else:
# Log other data for debugging
debug_str = f"Camera data: CMD_ID={cmd_id}, Data={data}"
self.get_logger().debug(debug_str)
def check_camera_connection(self):
"""Periodically check camera connection and attempt to reconnect if needed."""
if not self.camera_connected and not self.shutdown_requested:
self.get_logger().info("Attempting to reconnect to camera...")
self.connect_task = self.executor.submit(
self.run_async_func, self.connect_to_camera()
)
def handle_control_command(self, msg):
"""Handle incoming control commands."""
if not self.camera_connected:
self.get_logger().warning("Camera not connected, ignoring control command")
return
# Submit the control command to the async executor
self.executor.submit(
self.run_async_func,
self.process_control_command(msg)
)
async def process_control_command(self, msg):
"""Process and send the control command to the camera."""
try:
# Check if reset command
if msg.reset:
self.get_logger().info("Resetting camera to center position")
await self.camera.send_attitude_angles_command(0.0, 0.0)
return
# Process based on control mode
if msg.control_mode == 0: # Relative speed control
# Clamp values between -100 and 100
turn_yaw = max(-100, min(100, msg.turn_yaw))
turn_pitch = max(-100, min(100, msg.turn_pitch))
self.get_logger().debug(f"Sending rotation command: yaw={turn_yaw}, pitch={turn_pitch}")
await self.camera.send_rotation_command(turn_yaw, turn_pitch)
elif msg.control_mode == 1: # Absolute angle control
# Clamp angles to valid ranges
yaw = max(-135.0, min(135.0, msg.yaw))
pitch = max(-90.0, min(90.0, msg.pitch))
self.get_logger().debug(f"Sending absolute angles: yaw={yaw}, pitch={pitch}")
await self.camera.send_attitude_angles_command(yaw, pitch)
elif msg.control_mode == 2: # Single axis control
axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH
self.get_logger().debug(f"Sending single axis command: axis={axis.name}, angle={msg.angle}")
await self.camera.send_single_axis_attitude_command(msg.angle, axis)
# Handle data streaming configuration if requested
if msg.stream_type > 0 and msg.stream_freq >= 0:
try:
stream_type = DataStreamType(msg.stream_type)
stream_freq = DataStreamFrequency(msg.stream_freq)
self.get_logger().info(
f"Setting data stream: type={stream_type.name}, freq={stream_freq.name}"
)
await self.camera.send_data_stream_request(stream_type, stream_freq)
except ValueError:
self.get_logger().error("Invalid stream type or frequency values")
except Exception as e:
self.get_logger().error(f"Error processing control command: {e}")
self.feedback_msg.error_msg = f"Control error: {str(e)}"
self.feedback_pub.publish(self.feedback_msg)
def publish_debug(self, message):
"""Publish debug message."""
msg = String()
msg.data = message
self.debug_pub.publish(msg)
def run_async_func(self, coro):
"""Run an async function in the event loop."""
return asyncio.run_coroutine_threadsafe(coro, self.loop).result()
async def shutdown(self):
"""Perform clean shutdown."""
self.shutdown_requested = True
if self.camera and self.camera_connected:
try:
# Stop any data streams
await self.camera.send_data_stream_request(
DataStreamType.ATTITUDE_DATA,
DataStreamFrequency.DISABLE
)
# Disconnect from camera
await self.camera.disconnect()
self.get_logger().info("Disconnected from camera")
except Exception as e:
self.get_logger().error(f"Error during shutdown: {e}")
def cleanup(self):
"""Clean up resources."""
if self.loop and self.executor:
# Schedule the shutdown coroutine
try:
self.run_async_func(self.shutdown())
except Exception as e:
self.get_logger().error(f"Error during cleanup: {e}")
# Shut down the executor
self.executor.shutdown()
# Close the event loop
self.loop.stop()
self.loop.close()
self.get_logger().info("PTZ node resources cleaned up")
def main(args=None):
"""Main function."""
# Initialize ROS
rclpy.init(args=args)
# Create the node
ptz_node = PtzNode()
# Create and start event loop for the camera
def run_event_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
# Start the asyncio loop in a separate thread
asyncio_thread = threading.Thread(
target=run_event_loop,
args=(ptz_node.loop,),
daemon=True
)
asyncio_thread.start()
try:
# Spin the node to process callbacks
rclpy.spin(ptz_node)
except KeyboardInterrupt:
pass
finally:
# Clean up
ptz_node.cleanup()
rclpy.shutdown()
if __name__ == '__main__':
# Register signal handler for clean shutdown
signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0))
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
main()

View File

@@ -0,0 +1,174 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

View File

@@ -0,0 +1,26 @@
# SIYI Gimbal Camera Python Library
This library provides a Python interface for controlling SIYI gimbal cameras over TCP. It allows you to:
- Connect to the camera and maintain a stable connection.
- Control the camera's movement (pan, tilt).
- Request a continuous stream of data (attitude, etc.).
> [!NOTE]
> The code for this library (including this README) was written almost entirely by an AI assistant, with
> the assistance and direction of a human developer. It probably sucks, but I need sleep, so I'm not going
> to redo it. Sorry.
## Core Features
- **Asynchronous communication:** Utilizes `asyncio` for non-blocking operation.
- **Command handling:** Easy-to-use functions for sending control commands.
- **Data streaming:** Ability to receive continuous data streams for real-time monitoring.
## Key Components
- `SiyiGimbalCamera` class: Main class for interfacing with the camera.
- Enums: `DataStreamType`, `DataStreamFrequency`, `SingleAxis`, `CommandID` to manage protocol values.
- Logging: Uses the Python `logging` library.
Refer to the code comments and docstrings for specific function details.

View File

@@ -0,0 +1 @@
from .siyi import *

View File

@@ -0,0 +1,363 @@
import asyncio
import socket
import struct
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any
from crccheck.crc import Crc16
# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) # You can adjust the logging level as needed
handler = logging.StreamHandler() # Output to console
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Global constant for the heartbeat packet (used in TCP keep-alive)
HEARTBEAT_PACKET = bytes.fromhex("55 66 01 01 00 00 00 00 00 59 8B")
class DataStreamType(Enum):
"""
Enumeration for data stream types in the 0x25 command.
Allows users to select which type of data to stream from the gimbal camera.
"""
ATTITUDE_DATA = 1
LASER_RANGE_DATA = 2
MAGNETIC_ENCODER_ANGLE_DATA = 3
MOTOR_VOLTAGE_DATA = 4
class DataStreamFrequency(Enum):
"""
Enumeration for data stream frequencies in the 0x25 command.
Allows the user to select the output (streaming) frequency.
"""
DISABLE = 0
HZ_2 = 1
HZ_4 = 2
HZ_5 = 3
HZ_10 = 4
HZ_20 = 5
HZ_50 = 6
HZ_100 = 7
class SingleAxis(Enum):
"""
Enumeration for specifying the controlled axis in the 0x41 single-axis control command.
YAW corresponds to 0 and PITCH corresponds to 1.
"""
YAW = 0
PITCH = 1
class CommandID(Enum):
"""
Enumeration for command identifiers used in the protocol.
These identifiers are sent as the CMD_ID in a packet and help in matching
responses or processing incoming packets.
"""
ROTATION_CONTROL = 0x07
ATTITUDE_ANGLES = 0x0E
SINGLE_AXIS_CONTROL = 0x41
DATA_STREAM_REQUEST = 0x25
ATTITUDE_DATA_RESPONSE = 0x0D # Expected response packet for attitude data
@dataclass
class AttitudeData:
"""
Class representing the parsed attitude data from the gimbal.
Angles (yaw, pitch, roll) are in degrees and are scaled using one decimal point precision.
Angular velocities are raw int16 values.
"""
yaw: float
pitch: float
roll: float
yaw_velocity: int
pitch_velocity: int
roll_velocity: int
@classmethod
def from_bytes(cls, data: bytes) -> "AttitudeData":
"""Create an AttitudeData instance from a byte string."""
if len(data) != 12:
raise ValueError("Attitude data should be exactly 12 bytes.")
# Unpack six little-endian int16 values
values = struct.unpack("<hhhhhh", data)
return cls(
yaw=values[0] / 10.0,
pitch=values[1] / 10.0,
roll=values[2] / 10.0,
yaw_velocity=values[3],
pitch_velocity=values[4],
roll_velocity=values[5],
)
class SiyiGimbalCamera:
"""
Class to interface with the SIYI Gimbal Camera over TCP.
This class handles:
- Establishing a TCP connection.
- Sending periodic heartbeat packets.
- Building and sending commands (e.g. for rotation, setting attitudes,
single-axis control, and data stream requests).
- Continuously reading incoming packets in a background task.
Command identifiers and axis flags are represented as enums, and the parsed
attitude data is returned as an AttitudeData object.
The data stream listener is started as a background task when a connection is
established. Users can provide or update the callback used for processing data
stream packets by calling set_data_callback().
"""
def __init__(self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2):
"""
Initialize a new SiyiGimbalCamera instance.
:param ip: The IP address (as a string) of the SIYI gimbal camera.
:param port: The TCP port (as an integer) used for the connection (default: 37260).
:param heartbeat_interval: Interval in seconds (int) between heartbeat packets.
"""
self.ip = ip
self.port = port
self.heartbeat_interval = heartbeat_interval
# These will be assigned after successfully connecting.
self.reader: asyncio.StreamReader
self.writer: asyncio.StreamWriter
self.is_connected = False
# Sequence counter (integer in the range 0 to 65535) for outgoing packets.
self.seq: int = 0
self._data_callback: Optional[Callable[[CommandID, Any], None]] = None
async def connect(self) -> None:
"""Establish a TCP connection with the gimbal camera."""
try:
self.reader, self.writer = await asyncio.open_connection(
self.ip, self.port
)
self.is_connected = True
logger.info(f"Connected to {self.ip}:{self.port}")
asyncio.create_task(self.heartbeat_loop())
# Start the data stream listener in the background.
asyncio.create_task(self._data_stream_listener())
except (socket.gaierror, ConnectionRefusedError) as e:
self.is_connected = False
logger.error(f"Could not connect: {e}")
raise
async def disconnect(self) -> None:
"""Gracefully disconnect from the gimbal camera."""
if self.is_connected:
self.is_connected = False
self.writer.close()
await self.writer.wait_closed()
logger.info("Disconnected")
else:
logger.warning("Not connected, cannot disconnect.")
async def heartbeat_loop(self) -> None:
"""Periodically send heartbeat packets to maintain the TCP connection."""
if not self.is_connected:
logger.warning("Heartbeat loop started before connection was established.")
return
try:
while self.is_connected:
try:
self.writer.write(HEARTBEAT_PACKET)
await self.writer.drain()
logger.debug("Sent heartbeat packet")
await asyncio.sleep(self.heartbeat_interval)
except (socket.error, BrokenPipeError) as e:
logger.error(f"Connection error in heartbeat loop: {e}")
break
finally:
logger.info("Heartbeat loop stopped.")
if self.is_connected:
await self.disconnect()
def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes:
"""Build a full packet for the 0x07 Gimbal Rotation Control command."""
packet = bytearray()
packet.extend(b"\x55\x66") # STX bytes
packet.append(0x01) # CTRL (request ACK)
packet.extend(struct.pack("<H", 2)) # Data length: 2 bytes
packet.extend(struct.pack("<H", self.seq)) # Sequence number
packet.append(CommandID.ROTATION_CONTROL.value) # CMD_ID for rotation control
packet.extend(struct.pack("bb", turn_yaw, turn_pitch)) # Data: two int8 values
crc_value = Crc16.calc(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
async def send_rotation_command(self, turn_yaw: int, turn_pitch: int) -> None:
"""Send a rotation control command (0x07) with the given yaw and pitch values."""
if not self.is_connected:
raise RuntimeError("Socket is not connected, cannot send rotation command.")
packet = self._build_rotation_packet(turn_yaw, turn_pitch)
self.writer.write(packet)
await self.writer.drain()
logger.debug(f"Sent rotation command with yaw {turn_yaw} and pitch {turn_pitch}")
def _build_attitude_angles_packet(self, yaw: float, pitch: float) -> bytes:
"""Build a full packet for the 0x0E 'Set Gimbal Attitude Angles' command."""
packet = bytearray()
packet.extend(b"\x55\x66")
packet.append(0x01) # CTRL (request ACK)
packet.extend(struct.pack("<H", 4)) # Data length: 4 bytes (2 for each angle)
packet.extend(struct.pack("<H", self.seq))
packet.append(CommandID.ATTITUDE_ANGLES.value) # CMD_ID for set gimbal attitude angles
yaw_int = int(round(yaw * 10))
pitch_int = int(round(pitch * 10))
packet.extend(struct.pack("<hh", yaw_int, pitch_int))
crc_value = Crc16.calc(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
async def send_attitude_angles_command(self, yaw: float, pitch: float) -> None:
"""Send the 0x0E 'Set Gimbal Attitude Angles' command to set absolute angles."""
if not self.is_connected:
raise RuntimeError("Socket is not connected, cannot send attitude angles command.")
packet = self._build_attitude_angles_packet(yaw, pitch)
self.writer.write(packet)
await self.writer.drain()
logger.debug(f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°")
def _build_single_axis_attitude_packet(self, angle: float, axis: SingleAxis) -> bytes:
"""Build a full packet for the 0x41 'Single-Axis Attitude Control' command."""
packet = bytearray()
packet.extend(b"\x55\x66")
packet.append(0x01) # CTRL
packet.extend(struct.pack("<H", 3)) # Data length: 3 bytes
packet.extend(struct.pack("<H", self.seq))
packet.append(CommandID.SINGLE_AXIS_CONTROL.value) # CMD_ID for single-axis control
angle_int = int(round(angle * 10))
packet.extend(struct.pack("<hB", angle_int, axis.value))
crc_value = Crc16.calc(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
async def send_single_axis_attitude_command(self, angle: float, axis: SingleAxis) -> None:
"""Send the 0x41 single-axis attitude control command."""
if not self.is_connected:
raise RuntimeError("Socket is not connected, cannot send single-axis attitude command.")
packet = self._build_single_axis_attitude_packet(angle, axis)
self.writer.write(packet)
await self.writer.drain()
logger.debug(
f"Sent single-axis attitude command for {axis.name.lower()} with angle {angle}°"
)
def _build_data_stream_packet(self, data_type: DataStreamType, data_freq: DataStreamFrequency) -> bytes:
"""Build a packet for the 0x25 'Request Gimbal to Send Data Stream' command."""
packet = bytearray()
packet.extend(b"\x55\x66")
packet.append(0x01) # CTRL, request ACK
packet.extend(struct.pack("<H", 2)) # Data length: 2 bytes
packet.extend(struct.pack("<H", self.seq))
packet.append(CommandID.DATA_STREAM_REQUEST.value) # CMD_ID for data stream request
packet.append(data_type.value) # Data type (uint8)
packet.append(data_freq.value) # Data frequency (uint8)
crc_value = Crc16.calc(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
async def send_data_stream_request(self, data_type: DataStreamType, data_freq: DataStreamFrequency) -> None:
"""Send the 0x25 command to request a continuous data stream from the gimbal."""
if not self.is_connected:
raise RuntimeError("Socket is not connected, cannot request data stream.")
packet = self._build_data_stream_packet(data_type, data_freq)
self.writer.write(packet)
await self.writer.drain()
logger.info(f"Sent data stream request: data_type {data_type}, data_freq {data_freq}")
async def _read_packet(self):
"""Read a full packet from the TCP stream following the protocol format."""
stx = await self.reader.readexactly(2)
if stx != b"\x55\x66":
raise ValueError("Invalid packet start bytes.")
ctrl = await self.reader.readexactly(1)
data_len_bytes = await self.reader.readexactly(2)
data_len = struct.unpack("<H", data_len_bytes)[0]
seq = await self.reader.readexactly(2)
cmd_id_bytes = await self.reader.readexactly(1)
cmd_id = cmd_id_bytes[0]
data = await self.reader.readexactly(data_len)
crc_bytes = await self.reader.readexactly(2)
received_crc = struct.unpack("<H", crc_bytes)[0]
packet_without_crc = stx + ctrl + data_len_bytes + seq + cmd_id_bytes + data
computed_crc = Crc16.calc(packet_without_crc)
if computed_crc != received_crc:
raise ValueError("CRC check failed for received packet.")
return cmd_id, data
@staticmethod
def parse_attitude_data(data: bytes) -> AttitudeData:
"""Parse the attitude data (12 bytes) into human-readable values."""
if len(data) != 12:
raise ValueError("Attitude data should be exactly 12 bytes.")
return AttitudeData.from_bytes(data)
async def _data_stream_listener(self) -> None:
"""
Continuously listen for incoming packets from the gimbal and process them.
If a data callback is set via set_data_callback(), the callback is called
with the CommandID and the corresponding data (parsed or raw).
"""
while self.is_connected:
try:
cmd_id_int, data = await self._read_packet()
cmd_id = CommandID(cmd_id_int)
except Exception as e:
logger.error(f"Error reading packet: {e}")
continue
# Process attitude data packets if applicable.
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and len(data) == 12:
try:
parsed = AttitudeData.from_bytes(data)
except Exception as e:
logger.exception(f"Failed to parse attitude data: {e}")
parsed = None
if self._data_callback:
self._data_callback(cmd_id, parsed)
else:
logger.info(f"Received attitude data: {parsed}")
else:
if self._data_callback:
self._data_callback(cmd_id, data)
else:
logger.info(f"Received packet with CMD_ID {cmd_id}: {data}")
def set_data_callback(self, callback: Callable[[CommandID, Any], None]) -> None:
"""
Change the callback function used by the data stream listener.
The callback should be a function that accepts two arguments:
- A CommandID indicating the type of packet received.
- The parsed data (for attitude data, an AttitudeData instance) or raw data.
:param callback: A callable to be used as the data callback.
"""
self._data_callback = callback

View File

@@ -0,0 +1,61 @@
import asyncio
import logging
from siyi import (
SiyiGimbalCamera,
CommandID,
DataStreamType,
DataStreamFrequency,
SingleAxis,
)
# Set up basic logging for this test script
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
# Setup: Define your camera instance and connection parameters
camera = SiyiGimbalCamera(ip="192.168.1.17", port=37260)
try:
await camera.connect()
# Initial Connection Established: Wait briefly before sending the commands
await asyncio.sleep(2)
# Command 1: Move all the way to the right (using set angles)
logger.info("Command 1: Move all the way to the right (using absolute angle control)")
await camera.send_attitude_angles_command(135.0, 0.0)
await asyncio.sleep(5)
# Command 2: Look down (using relative control)
logger.info("Command 2: Start looking down (relative speed control)")
# The numbers used are example numbers and may be different from the numbers you use.
await camera.send_rotation_command(0, 50)
await asyncio.sleep(5)
# Command 3: Stop looking down, then look up (with the single axis)
logger.info("Command 3: Stop looking down and start looking up (single axis control)")
await camera.send_rotation_command(0, 0)
await camera.send_single_axis_attitude_command(135, SingleAxis.PITCH)
await asyncio.sleep(5)
# Command 4: Reset and move all the way to the left (Absolute value).
logger.info("Command 4: Move back to the center, and start moving all the way left")
await camera.send_attitude_angles_command(-135.0, 0.0)
await asyncio.sleep(5)
# Command 5: Final rotation to the center (using set angles)
logger.info("Command 5: Moving back to the center.")
await camera.send_attitude_angles_command(0, 0)
await asyncio.sleep(5)
except Exception as e:
logger.exception(f"An exception occurred: {e}")
finally:
# Disconnect and cleanup after commands have run.
await camera.disconnect()
logger.info("Test script completed and shutdown")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -20,7 +20,8 @@ setup(
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
"core = core_pkg.core_node:main", "core = core_pkg.core_node:main",
"headless = core_pkg.core_headless:main" "headless = core_pkg.core_headless:main",
"ptz = core_pkg.core_ptz:main"
], ],
}, },
) )