diff --git a/rover_launch.py b/rover_launch.py index 90b4485..1e24f2a 100644 --- a/rover_launch.py +++ b/rover_launch.py @@ -38,6 +38,16 @@ def launch_setup(context, *args, **kwargs): on_exit=Shutdown() ) ) + nodes.append( + Node( + package='core_pkg', + executable='ptz', # change as needed + name='ptz', + output='both' + # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical + # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure + ) + ) nodes.append( Node( package='bio_pkg', @@ -58,7 +68,7 @@ def launch_setup(context, *args, **kwargs): on_exit=Shutdown() ) ) - elif mode in ['arm', 'core', 'bio']: + elif mode in ['arm', 'core', 'bio', 'ptz']: # Only launch the node corresponding to the provided mode. if mode == 'arm': nodes.append( @@ -93,10 +103,19 @@ def launch_setup(context, *args, **kwargs): on_exit=Shutdown() ) ) + elif mode == 'ptz': + nodes.append( + Node( + package='core_pkg', + executable='ptz', + name='ptz', + output='both' + on_exit=Shutdown(), #on fail, shutdown if this was the only node to be launched + ) + ) else: # If an invalid mode is provided, print an error. - # (You might want to raise an exception or handle it differently.) - print("Invalid mode provided. Choose one of: arm, core, bio, anchor.") + print("Invalid mode provided. Choose one of: arm, core, bio, anchor, ptz.") return nodes @@ -104,7 +123,7 @@ def generate_launch_description(): declare_arg = DeclareLaunchArgument( 'mode', default_value='anchor', - description='Launch mode: arm, core, bio, or anchor' + description='Launch mode: arm, core, bio, anchor, or ptz' ) return LaunchDescription([ diff --git a/src/core_pkg/core_pkg/core_ptz.py b/src/core_pkg/core_pkg/core_ptz.py new file mode 100644 index 0000000..be145dc --- /dev/null +++ b/src/core_pkg/core_pkg/core_ptz.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 +import rclpy +from rclpy.node import Node +import asyncio +from concurrent.futures import ThreadPoolExecutor +import signal +import sys +import threading +import time + +from std_msgs.msg import String +from ros2_interfaces_pkg.msg import PtzControl, PtzFeedback + +# Import the SIYI SDK +from core_pkg.siyi_sdk import ( + SiyiGimbalCamera, + CommandID, + DataStreamType, + DataStreamFrequency, + SingleAxis, + AttitudeData +) + +class PtzNode(Node): + def __init__(self): + # Initialize node with name + super().__init__("core_ptz") + + # Declare parameters + self.declare_parameter('camera_ip', '192.168.1.17') + self.declare_parameter('camera_port', 37260) + #self.declare_parameter('launch_mode', 'core') + + # Get parameters + self.camera_ip = self.get_parameter('camera_ip').value + self.camera_port = self.get_parameter('camera_port').value + #self.launch_mode = self.get_parameter('launch_mode').value + + #self.get_logger().info(f"PTZ mode: {self.launch_mode}") + self.get_logger().info(f"PTZ camera IP: {self.camera_ip} Port: {self.camera_port}") + + # Create a camera instance + self.camera = None + self.camera_connected = False + self.loop = None + self.executor = None + + # Create publishers + self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10) + self.debug_pub = self.create_publisher(String, '/ptz/debug', 10) + + # Create subscribers + self.control_sub = self.create_subscription( + PtzControl, '/ptz/control', self.handle_control_command, 10) + + # Create timers + self.connection_timer = self.create_timer(5.0, self.check_camera_connection) + + # Create feedback message + self.feedback_msg = PtzFeedback() + self.feedback_msg.connected = False + self.feedback_msg.error_msg = "Initializing" + + # Flags for async operations + self.shutdown_requested = False + + # Set up asyncio event loop in a separate thread + self.executor = ThreadPoolExecutor(max_workers=1) + self.loop = asyncio.new_event_loop() + + # Connect to camera on startup + self.connect_task = self.executor.submit( + self.run_async_func, self.connect_to_camera() + ) + + async def connect_to_camera(self): + """Connect to the SIYI camera.""" + try: + # Create a new camera instance + self.camera = SiyiGimbalCamera(ip=self.camera_ip, port=self.camera_port) + + # Connect to the camera + await self.camera.connect() + + # Set up data callback + self.camera.set_data_callback(self.camera_data_callback) + + # Request attitude data stream + await self.camera.send_data_stream_request( + DataStreamType.ATTITUDE_DATA, + DataStreamFrequency.HZ_10 + ) + + # Update connection status + self.camera_connected = True + self.feedback_msg.connected = True + self.feedback_msg.error_msg = "" + + self.get_logger().info("Connected to PTZ camera") + self.publish_debug("Camera connected successfully") + + except Exception as e: + self.camera_connected = False + self.feedback_msg.connected = False + self.feedback_msg.error_msg = f"Connection error: {str(e)}" + self.get_logger().error(f"Failed to connect to camera: {e}") + self.publish_debug(f"Camera connection failed: {str(e)}") + + def camera_data_callback(self, cmd_id, data): + """Handle data received from the camera.""" + if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): + # Update feedback message with attitude data + self.feedback_msg.yaw = data.yaw + self.feedback_msg.pitch = data.pitch + self.feedback_msg.roll = data.roll + self.feedback_msg.yaw_velocity = data.yaw_velocity + self.feedback_msg.pitch_velocity = data.pitch_velocity + self.feedback_msg.roll_velocity = data.roll_velocity + + # Publish feedback + self.feedback_pub.publish(self.feedback_msg) + else: + # Log other data for debugging + debug_str = f"Camera data: CMD_ID={cmd_id}, Data={data}" + self.get_logger().debug(debug_str) + + def check_camera_connection(self): + """Periodically check camera connection and attempt to reconnect if needed.""" + if not self.camera_connected and not self.shutdown_requested: + self.get_logger().info("Attempting to reconnect to camera...") + self.connect_task = self.executor.submit( + self.run_async_func, self.connect_to_camera() + ) + + def handle_control_command(self, msg): + """Handle incoming control commands.""" + if not self.camera_connected: + self.get_logger().warning("Camera not connected, ignoring control command") + return + + # Submit the control command to the async executor + self.executor.submit( + self.run_async_func, + self.process_control_command(msg) + ) + + async def process_control_command(self, msg): + """Process and send the control command to the camera.""" + try: + # Check if reset command + if msg.reset: + self.get_logger().info("Resetting camera to center position") + await self.camera.send_attitude_angles_command(0.0, 0.0) + return + + # Process based on control mode + if msg.control_mode == 0: # Relative speed control + # Clamp values between -100 and 100 + turn_yaw = max(-100, min(100, msg.turn_yaw)) + turn_pitch = max(-100, min(100, msg.turn_pitch)) + + self.get_logger().debug(f"Sending rotation command: yaw={turn_yaw}, pitch={turn_pitch}") + await self.camera.send_rotation_command(turn_yaw, turn_pitch) + + elif msg.control_mode == 1: # Absolute angle control + # Clamp angles to valid ranges + yaw = max(-135.0, min(135.0, msg.yaw)) + pitch = max(-90.0, min(90.0, msg.pitch)) + + self.get_logger().debug(f"Sending absolute angles: yaw={yaw}, pitch={pitch}") + await self.camera.send_attitude_angles_command(yaw, pitch) + + elif msg.control_mode == 2: # Single axis control + axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH + self.get_logger().debug(f"Sending single axis command: axis={axis.name}, angle={msg.angle}") + await self.camera.send_single_axis_attitude_command(msg.angle, axis) + + # Handle data streaming configuration if requested + if msg.stream_type > 0 and msg.stream_freq >= 0: + try: + stream_type = DataStreamType(msg.stream_type) + stream_freq = DataStreamFrequency(msg.stream_freq) + + self.get_logger().info( + f"Setting data stream: type={stream_type.name}, freq={stream_freq.name}" + ) + + await self.camera.send_data_stream_request(stream_type, stream_freq) + + except ValueError: + self.get_logger().error("Invalid stream type or frequency values") + + except Exception as e: + self.get_logger().error(f"Error processing control command: {e}") + self.feedback_msg.error_msg = f"Control error: {str(e)}" + self.feedback_pub.publish(self.feedback_msg) + + def publish_debug(self, message): + """Publish debug message.""" + msg = String() + msg.data = message + self.debug_pub.publish(msg) + + def run_async_func(self, coro): + """Run an async function in the event loop.""" + return asyncio.run_coroutine_threadsafe(coro, self.loop).result() + + async def shutdown(self): + """Perform clean shutdown.""" + self.shutdown_requested = True + + if self.camera and self.camera_connected: + try: + # Stop any data streams + await self.camera.send_data_stream_request( + DataStreamType.ATTITUDE_DATA, + DataStreamFrequency.DISABLE + ) + + # Disconnect from camera + await self.camera.disconnect() + self.get_logger().info("Disconnected from camera") + + except Exception as e: + self.get_logger().error(f"Error during shutdown: {e}") + + def cleanup(self): + """Clean up resources.""" + if self.loop and self.executor: + # Schedule the shutdown coroutine + try: + self.run_async_func(self.shutdown()) + except Exception as e: + self.get_logger().error(f"Error during cleanup: {e}") + + # Shut down the executor + self.executor.shutdown() + + # Close the event loop + self.loop.stop() + self.loop.close() + + self.get_logger().info("PTZ node resources cleaned up") + +def main(args=None): + """Main function.""" + # Initialize ROS + rclpy.init(args=args) + + # Create the node + ptz_node = PtzNode() + + # Create and start event loop for the camera + def run_event_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + # Start the asyncio loop in a separate thread + asyncio_thread = threading.Thread( + target=run_event_loop, + args=(ptz_node.loop,), + daemon=True + ) + asyncio_thread.start() + + try: + # Spin the node to process callbacks + rclpy.spin(ptz_node) + except KeyboardInterrupt: + pass + finally: + # Clean up + ptz_node.cleanup() + rclpy.shutdown() + +if __name__ == '__main__': + # Register signal handler for clean shutdown + signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0)) + signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) + main() + diff --git a/src/core_pkg/core_pkg/siyi_sdk/.gitignore b/src/core_pkg/core_pkg/siyi_sdk/.gitignore new file mode 100644 index 0000000..0a19790 --- /dev/null +++ b/src/core_pkg/core_pkg/siyi_sdk/.gitignore @@ -0,0 +1,174 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc diff --git a/src/core_pkg/core_pkg/siyi_sdk/README.md b/src/core_pkg/core_pkg/siyi_sdk/README.md new file mode 100644 index 0000000..7277a6f --- /dev/null +++ b/src/core_pkg/core_pkg/siyi_sdk/README.md @@ -0,0 +1,26 @@ +# SIYI Gimbal Camera Python Library + +This library provides a Python interface for controlling SIYI gimbal cameras over TCP. It allows you to: + +- Connect to the camera and maintain a stable connection. +- Control the camera's movement (pan, tilt). +- Request a continuous stream of data (attitude, etc.). + +> [!NOTE] +> The code for this library (including this README) was written almost entirely by an AI assistant, with +> the assistance and direction of a human developer. It probably sucks, but I need sleep, so I'm not going +> to redo it. Sorry. + +## Core Features + +- **Asynchronous communication:** Utilizes `asyncio` for non-blocking operation. +- **Command handling:** Easy-to-use functions for sending control commands. +- **Data streaming:** Ability to receive continuous data streams for real-time monitoring. + +## Key Components + +- `SiyiGimbalCamera` class: Main class for interfacing with the camera. +- Enums: `DataStreamType`, `DataStreamFrequency`, `SingleAxis`, `CommandID` to manage protocol values. +- Logging: Uses the Python `logging` library. + +Refer to the code comments and docstrings for specific function details. diff --git a/src/core_pkg/core_pkg/siyi_sdk/__init__.py b/src/core_pkg/core_pkg/siyi_sdk/__init__.py new file mode 100644 index 0000000..d48c636 --- /dev/null +++ b/src/core_pkg/core_pkg/siyi_sdk/__init__.py @@ -0,0 +1 @@ +from .siyi import * diff --git a/src/core_pkg/core_pkg/siyi_sdk/siyi.py b/src/core_pkg/core_pkg/siyi_sdk/siyi.py new file mode 100644 index 0000000..2704d65 --- /dev/null +++ b/src/core_pkg/core_pkg/siyi_sdk/siyi.py @@ -0,0 +1,363 @@ +import asyncio +import socket +import struct +import logging +from enum import Enum +from dataclasses import dataclass +from typing import Optional, Callable, Any +from crccheck.crc import Crc16 + +# Set up logging +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) # You can adjust the logging level as needed +handler = logging.StreamHandler() # Output to console +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) + +# Global constant for the heartbeat packet (used in TCP keep-alive) +HEARTBEAT_PACKET = bytes.fromhex("55 66 01 01 00 00 00 00 00 59 8B") + + +class DataStreamType(Enum): + """ + Enumeration for data stream types in the 0x25 command. + + Allows users to select which type of data to stream from the gimbal camera. + """ + + ATTITUDE_DATA = 1 + LASER_RANGE_DATA = 2 + MAGNETIC_ENCODER_ANGLE_DATA = 3 + MOTOR_VOLTAGE_DATA = 4 + + +class DataStreamFrequency(Enum): + """ + Enumeration for data stream frequencies in the 0x25 command. + + Allows the user to select the output (streaming) frequency. + """ + + DISABLE = 0 + HZ_2 = 1 + HZ_4 = 2 + HZ_5 = 3 + HZ_10 = 4 + HZ_20 = 5 + HZ_50 = 6 + HZ_100 = 7 + + +class SingleAxis(Enum): + """ + Enumeration for specifying the controlled axis in the 0x41 single-axis control command. + + YAW corresponds to 0 and PITCH corresponds to 1. + """ + + YAW = 0 + PITCH = 1 + + +class CommandID(Enum): + """ + Enumeration for command identifiers used in the protocol. + + These identifiers are sent as the CMD_ID in a packet and help in matching + responses or processing incoming packets. + """ + + ROTATION_CONTROL = 0x07 + ATTITUDE_ANGLES = 0x0E + SINGLE_AXIS_CONTROL = 0x41 + DATA_STREAM_REQUEST = 0x25 + ATTITUDE_DATA_RESPONSE = 0x0D # Expected response packet for attitude data + + +@dataclass +class AttitudeData: + """ + Class representing the parsed attitude data from the gimbal. + + Angles (yaw, pitch, roll) are in degrees and are scaled using one decimal point precision. + Angular velocities are raw int16 values. + """ + + yaw: float + pitch: float + roll: float + yaw_velocity: int + pitch_velocity: int + roll_velocity: int + + @classmethod + def from_bytes(cls, data: bytes) -> "AttitudeData": + """Create an AttitudeData instance from a byte string.""" + if len(data) != 12: + raise ValueError("Attitude data should be exactly 12 bytes.") + # Unpack six little-endian int16 values + values = struct.unpack(" None: + """Establish a TCP connection with the gimbal camera.""" + try: + self.reader, self.writer = await asyncio.open_connection( + self.ip, self.port + ) + self.is_connected = True + logger.info(f"Connected to {self.ip}:{self.port}") + asyncio.create_task(self.heartbeat_loop()) + # Start the data stream listener in the background. + asyncio.create_task(self._data_stream_listener()) + except (socket.gaierror, ConnectionRefusedError) as e: + self.is_connected = False + logger.error(f"Could not connect: {e}") + raise + + async def disconnect(self) -> None: + """Gracefully disconnect from the gimbal camera.""" + if self.is_connected: + self.is_connected = False + self.writer.close() + await self.writer.wait_closed() + logger.info("Disconnected") + else: + logger.warning("Not connected, cannot disconnect.") + + async def heartbeat_loop(self) -> None: + """Periodically send heartbeat packets to maintain the TCP connection.""" + if not self.is_connected: + logger.warning("Heartbeat loop started before connection was established.") + return + + try: + while self.is_connected: + try: + self.writer.write(HEARTBEAT_PACKET) + await self.writer.drain() + logger.debug("Sent heartbeat packet") + await asyncio.sleep(self.heartbeat_interval) + except (socket.error, BrokenPipeError) as e: + logger.error(f"Connection error in heartbeat loop: {e}") + break + finally: + logger.info("Heartbeat loop stopped.") + if self.is_connected: + await self.disconnect() + + def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes: + """Build a full packet for the 0x07 Gimbal Rotation Control command.""" + packet = bytearray() + packet.extend(b"\x55\x66") # STX bytes + packet.append(0x01) # CTRL (request ACK) + packet.extend(struct.pack(" None: + """Send a rotation control command (0x07) with the given yaw and pitch values.""" + if not self.is_connected: + raise RuntimeError("Socket is not connected, cannot send rotation command.") + packet = self._build_rotation_packet(turn_yaw, turn_pitch) + self.writer.write(packet) + await self.writer.drain() + logger.debug(f"Sent rotation command with yaw {turn_yaw} and pitch {turn_pitch}") + + def _build_attitude_angles_packet(self, yaw: float, pitch: float) -> bytes: + """Build a full packet for the 0x0E 'Set Gimbal Attitude Angles' command.""" + packet = bytearray() + packet.extend(b"\x55\x66") + packet.append(0x01) # CTRL (request ACK) + packet.extend(struct.pack(" None: + """Send the 0x0E 'Set Gimbal Attitude Angles' command to set absolute angles.""" + if not self.is_connected: + raise RuntimeError("Socket is not connected, cannot send attitude angles command.") + packet = self._build_attitude_angles_packet(yaw, pitch) + self.writer.write(packet) + await self.writer.drain() + logger.debug(f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°") + + def _build_single_axis_attitude_packet(self, angle: float, axis: SingleAxis) -> bytes: + """Build a full packet for the 0x41 'Single-Axis Attitude Control' command.""" + packet = bytearray() + packet.extend(b"\x55\x66") + packet.append(0x01) # CTRL + packet.extend(struct.pack(" None: + """Send the 0x41 single-axis attitude control command.""" + if not self.is_connected: + raise RuntimeError("Socket is not connected, cannot send single-axis attitude command.") + packet = self._build_single_axis_attitude_packet(angle, axis) + self.writer.write(packet) + await self.writer.drain() + logger.debug( + f"Sent single-axis attitude command for {axis.name.lower()} with angle {angle}°" + ) + + def _build_data_stream_packet(self, data_type: DataStreamType, data_freq: DataStreamFrequency) -> bytes: + """Build a packet for the 0x25 'Request Gimbal to Send Data Stream' command.""" + packet = bytearray() + packet.extend(b"\x55\x66") + packet.append(0x01) # CTRL, request ACK + packet.extend(struct.pack(" None: + """Send the 0x25 command to request a continuous data stream from the gimbal.""" + if not self.is_connected: + raise RuntimeError("Socket is not connected, cannot request data stream.") + packet = self._build_data_stream_packet(data_type, data_freq) + self.writer.write(packet) + await self.writer.drain() + logger.info(f"Sent data stream request: data_type {data_type}, data_freq {data_freq}") + + async def _read_packet(self): + """Read a full packet from the TCP stream following the protocol format.""" + stx = await self.reader.readexactly(2) + if stx != b"\x55\x66": + raise ValueError("Invalid packet start bytes.") + ctrl = await self.reader.readexactly(1) + data_len_bytes = await self.reader.readexactly(2) + data_len = struct.unpack(" AttitudeData: + """Parse the attitude data (12 bytes) into human-readable values.""" + if len(data) != 12: + raise ValueError("Attitude data should be exactly 12 bytes.") + return AttitudeData.from_bytes(data) + + async def _data_stream_listener(self) -> None: + """ + Continuously listen for incoming packets from the gimbal and process them. + + If a data callback is set via set_data_callback(), the callback is called + with the CommandID and the corresponding data (parsed or raw). + """ + while self.is_connected: + try: + cmd_id_int, data = await self._read_packet() + cmd_id = CommandID(cmd_id_int) + except Exception as e: + logger.error(f"Error reading packet: {e}") + continue + + # Process attitude data packets if applicable. + if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and len(data) == 12: + try: + parsed = AttitudeData.from_bytes(data) + except Exception as e: + logger.exception(f"Failed to parse attitude data: {e}") + parsed = None + if self._data_callback: + self._data_callback(cmd_id, parsed) + else: + logger.info(f"Received attitude data: {parsed}") + else: + if self._data_callback: + self._data_callback(cmd_id, data) + else: + logger.info(f"Received packet with CMD_ID {cmd_id}: {data}") + + def set_data_callback(self, callback: Callable[[CommandID, Any], None]) -> None: + """ + Change the callback function used by the data stream listener. + + The callback should be a function that accepts two arguments: + - A CommandID indicating the type of packet received. + - The parsed data (for attitude data, an AttitudeData instance) or raw data. + + :param callback: A callable to be used as the data callback. + """ + self._data_callback = callback diff --git a/src/core_pkg/core_pkg/siyi_sdk/test.py b/src/core_pkg/core_pkg/siyi_sdk/test.py new file mode 100644 index 0000000..24e7693 --- /dev/null +++ b/src/core_pkg/core_pkg/siyi_sdk/test.py @@ -0,0 +1,61 @@ +import asyncio +import logging + +from siyi import ( + SiyiGimbalCamera, + CommandID, + DataStreamType, + DataStreamFrequency, + SingleAxis, +) + +# Set up basic logging for this test script +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + # Setup: Define your camera instance and connection parameters + camera = SiyiGimbalCamera(ip="192.168.1.17", port=37260) + + try: + await camera.connect() + # Initial Connection Established: Wait briefly before sending the commands + await asyncio.sleep(2) + + # Command 1: Move all the way to the right (using set angles) + logger.info("Command 1: Move all the way to the right (using absolute angle control)") + await camera.send_attitude_angles_command(135.0, 0.0) + await asyncio.sleep(5) + + # Command 2: Look down (using relative control) + logger.info("Command 2: Start looking down (relative speed control)") + # The numbers used are example numbers and may be different from the numbers you use. + await camera.send_rotation_command(0, 50) + await asyncio.sleep(5) + + # Command 3: Stop looking down, then look up (with the single axis) + logger.info("Command 3: Stop looking down and start looking up (single axis control)") + await camera.send_rotation_command(0, 0) + await camera.send_single_axis_attitude_command(135, SingleAxis.PITCH) + await asyncio.sleep(5) + + # Command 4: Reset and move all the way to the left (Absolute value). + logger.info("Command 4: Move back to the center, and start moving all the way left") + await camera.send_attitude_angles_command(-135.0, 0.0) + await asyncio.sleep(5) + + # Command 5: Final rotation to the center (using set angles) + logger.info("Command 5: Moving back to the center.") + await camera.send_attitude_angles_command(0, 0) + await asyncio.sleep(5) + except Exception as e: + logger.exception(f"An exception occurred: {e}") + finally: + # Disconnect and cleanup after commands have run. + await camera.disconnect() + logger.info("Test script completed and shutdown") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/core_pkg/setup.py b/src/core_pkg/setup.py index 77bda6d..0d3d80b 100644 --- a/src/core_pkg/setup.py +++ b/src/core_pkg/setup.py @@ -20,7 +20,8 @@ setup( entry_points={ 'console_scripts': [ "core = core_pkg.core_node:main", - "headless = core_pkg.core_headless:main" + "headless = core_pkg.core_headless:main", + "ptz = core_pkg.core_ptz:main" ], }, )