From a4d3f0dfbe088628c09770a20e3ccd848f7c8991 Mon Sep 17 00:00:00 2001 From: ryleu <69326171+ryleu@users.noreply.github.com> Date: Tue, 27 May 2025 00:39:52 -0500 Subject: [PATCH] no idea if this works, thanks gemini --- src/core_pkg/core_pkg/core_ptz.py | 296 +++++++++------- src/core_pkg/core_pkg/siyi_sdk/siyi.py | 452 ++++++++++++++++--------- src/ros2_interfaces_pkg | 2 +- 3 files changed, 468 insertions(+), 282 deletions(-) diff --git a/src/core_pkg/core_pkg/core_ptz.py b/src/core_pkg/core_pkg/core_ptz.py index 47dd2c5..15df4ec 100644 --- a/src/core_pkg/core_pkg/core_ptz.py +++ b/src/core_pkg/core_pkg/core_ptz.py @@ -38,15 +38,13 @@ class PtzNode(Node): # Create a camera instance self.camera = None - self.camera_connected = False + self.camera_connected = False # This flag is still managed but not used to gate commands self.loop = None - self.thread_pool = None # Renamed from self.executor to avoid conflict + self.thread_pool = None # Create publishers self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10) self.debug_pub = self.create_publisher(String, '/ptz/debug', 10) - #create timer to publish feedback at a regular interval - #self.create_timer(1.0, self.publish_feedback) # Create subscribers self.control_sub = self.create_subscription( @@ -59,18 +57,18 @@ class PtzNode(Node): # Create feedback message self.feedback_msg = PtzFeedback() - self.feedback_msg.connected = False + self.feedback_msg.connected = False # This will reflect the actual connection state self.feedback_msg.error_msg = "Initializing" # Flags for async operations self.shutdown_requested = False # Set up asyncio event loop in a separate thread - self.thread_pool = ThreadPoolExecutor(max_workers=1) # Renamed from self.executor + self.thread_pool = ThreadPoolExecutor(max_workers=1) self.loop = asyncio.new_event_loop() # Connect to camera on startup - self.connect_task = self.thread_pool.submit( # Use thread_pool instead of executor + self.connect_task = self.thread_pool.submit( self.run_async_func, self.connect_to_camera() ) @@ -97,47 +95,53 @@ class PtzNode(Node): self.feedback_msg.connected = True self.feedback_msg.error_msg = "" - #self.get_logger().info("Connected to PTZ camera") self.publish_debug("Camera connected successfully") except Exception as e: - #self.camera_connected = False + self.camera_connected = False self.feedback_msg.connected = False self.feedback_msg.error_msg = f"Connection error: {str(e)}" - #self.get_logger().error(f"Failed to connect to camera: {e}") self.publish_debug(f"Camera connection failed: {str(e)}") def camera_data_callback(self, cmd_id, data): """Handle data received from the camera.""" - if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): - self.last_data_time = time.time() # Update timestamp on successful data - - # Update feedback message with attitude data - self.feedback_msg.yaw = data.yaw - self.feedback_msg.pitch = data.pitch - self.feedback_msg.roll = data.roll - self.feedback_msg.yaw_velocity = data.yaw_velocity - self.feedback_msg.pitch_velocity = data.pitch_velocity - self.feedback_msg.roll_velocity = data.roll_velocity - - # Publish feedback - self.feedback_pub.publish(self.feedback_msg) - else: - # Log other data for debugging - debug_str = f"Camera data: CMD_ID={cmd_id}, Data={data}" - self.get_logger().debug(debug_str) + # Update last_data_time regardless of self.camera_connected, + # as data might arrive during a brief reconnect window. + self.last_data_time = time.time() + if self.camera_connected: # Only process for feedback if we believe we are connected + if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): + self.feedback_msg.yaw = data.yaw + self.feedback_msg.pitch = data.pitch + self.feedback_msg.roll = data.roll + self.feedback_msg.yaw_velocity = data.yaw_velocity + self.feedback_msg.pitch_velocity = data.pitch_velocity + self.feedback_msg.roll_velocity = data.roll_velocity + self.feedback_pub.publish(self.feedback_msg) + else: + debug_str = "" + if isinstance(cmd_id, CommandID): + debug_str = f"Camera data: CMD_ID={cmd_id.name}, Data=" + else: + debug_str = f"Camera data: CMD_ID={cmd_id}, Data=" + + if isinstance(data, bytes): + debug_str += data.hex() + else: + debug_str += str(data) + self.get_logger().debug(debug_str) + def check_camera_connection(self): """Periodically check camera connection and attempt to reconnect if needed.""" if not self.camera_connected and not self.shutdown_requested: - #self.get_logger().info("Attempting to reconnect to camera...") + self.publish_debug("Attempting to reconnect to camera...") if self.camera: - # Fully clean up old connection first try: - self.run_async_func(self.camera.disconnect()) - except Exception: - pass # Ignore errors during cleanup - self.camera = None + if self.camera.is_connected: # SDK's internal connection state + self.run_async_func(self.camera.disconnect()) + except Exception as e: + self.get_logger().debug(f"Error during pre-reconnect disconnect: {e}") + # self.camera = None # Don't nullify here, connect_to_camera will re-assign or create new self.connect_task = self.thread_pool.submit( self.run_async_func, self.connect_to_camera() @@ -145,157 +149,207 @@ class PtzNode(Node): def check_camera_health(self): """Check if we're still receiving data from the camera""" - if self.camera_connected: + if self.camera_connected: # Only check health if we think we are connected time_since_last_data = time.time() - self.last_data_time - if time_since_last_data > 5.0: # No data for 5 seconds - #self.get_logger().warning(f"No camera data for {time_since_last_data:.1f}s, connection may be stale") + if time_since_last_data > 5.0: + self.publish_debug(f"No camera data for {time_since_last_data:.1f}s, marking as disconnected.") self.camera_connected = False - # Force reconnection on next check_camera_connection timer + self.feedback_msg.connected = False + self.feedback_msg.error_msg = "Connection stale (no data)" + self.feedback_pub.publish(self.feedback_msg) def handle_control_command(self, msg): """Handle incoming control commands.""" - if not self.camera_connected: - #self.get_logger().warning("Camera not connected, ignoring control command") + # Removed: if not self.camera_connected + if not self.camera: # Still check if camera object exists + self.get_logger().warning("Camera object not initialized, ignoring control command") return - # Submit the control command to the async executor - self.thread_pool.submit( # Changed from self.executor + self.thread_pool.submit( self.run_async_func, self.process_control_command(msg) ) async def process_control_command(self, msg): """Process and send the control command to the camera.""" + if not self.camera: + self.get_logger().error("Process control command called but camera object is None.") + return try: - # Check if reset command + # The SDK's send_... methods will raise RuntimeError if not connected. + # This try-except block will catch those. if msg.reset: - self.get_logger().info("Resetting camera to center position") + self.get_logger().info("Attempting to reset camera to center position") await self.camera.send_attitude_angles_command(0.0, 0.0) return - # Process based on control mode - if msg.control_mode == 0: # Relative speed control - # Clamp values between -100 and 100 - turn_yaw = max(-100, min(100, msg.turn_yaw)) - turn_pitch = max(-100, min(100, msg.turn_pitch)) - - self.get_logger().debug(f"Sending rotation command: yaw={turn_yaw}, pitch={turn_pitch}") + if msg.control_mode == 0: + turn_yaw = max(-100, min(100, int(msg.turn_yaw))) + turn_pitch = max(-100, min(100, int(msg.turn_pitch))) + self.get_logger().debug(f"Attempting rotation: yaw_speed={turn_yaw}, pitch_speed={turn_pitch}") await self.camera.send_rotation_command(turn_yaw, turn_pitch) - elif msg.control_mode == 1: # Absolute angle control - # Clamp angles to valid ranges + elif msg.control_mode == 1: yaw = max(-135.0, min(135.0, msg.yaw)) pitch = max(-90.0, min(90.0, msg.pitch)) - - self.get_logger().debug(f"Sending absolute angles: yaw={yaw}, pitch={pitch}") + self.get_logger().debug(f"Attempting absolute angles: yaw={yaw}, pitch={pitch}") await self.camera.send_attitude_angles_command(yaw, pitch) - elif msg.control_mode == 2: # Single axis control + elif msg.control_mode == 2: axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH - self.get_logger().debug(f"Sending single axis command: axis={axis.name}, angle={msg.angle}") - await self.camera.send_single_axis_attitude_command(msg.angle, axis) + angle = msg.angle + self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}") + await self.camera.send_single_axis_attitude_command(angle, axis) + + elif msg.control_mode == 3: + zoom_level = msg.zoom_level + self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x") + await self.camera.send_absolute_zoom_command(zoom_level) - # Handle data streaming configuration if requested - if msg.stream_type > 0 and msg.stream_freq >= 0: - try: - stream_type = DataStreamType(msg.stream_type) - stream_freq = DataStreamFrequency(msg.stream_freq) - - self.get_logger().info( - f"Setting data stream: type={stream_type.name}, freq={stream_freq.name}" - ) - - await self.camera.send_data_stream_request(stream_type, stream_freq) - - except ValueError: - self.get_logger().error("Invalid stream type or frequency values") - + if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'): + if msg.stream_type > 0 and msg.stream_freq >= 0: + try: + stream_type = DataStreamType(msg.stream_type) + stream_freq = DataStreamFrequency(msg.stream_freq) + self.get_logger().info( + f"Attempting to set data stream: type={stream_type.name}, freq={stream_freq.name}" + ) + await self.camera.send_data_stream_request(stream_type, stream_freq) + except ValueError: + self.get_logger().error("Invalid stream type or frequency values in control message") + + except RuntimeError as e: # Catch SDK's "not connected" errors + self.get_logger().warning(f"SDK command failed (likely not connected): {e}") + # self.camera_connected will be updated by health/connection checks + # self.feedback_msg.error_msg = f"Command failed: {str(e)}" # Already set by health check + # self.feedback_pub.publish(self.feedback_msg) except Exception as e: self.get_logger().error(f"Error processing control command: {e}") self.feedback_msg.error_msg = f"Control error: {str(e)}" - self.feedback_pub.publish(self.feedback_msg) + self.feedback_pub.publish(self.feedback_msg) # Publish for other errors - def publish_debug(self, message): + def publish_debug(self, message_text): """Publish debug message.""" msg = String() - msg.data = message + msg.data = f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}" self.debug_pub.publish(msg) + self.get_logger().info(message_text) def run_async_func(self, coro): """Run an async function in the event loop.""" - return asyncio.run_coroutine_threadsafe(coro, self.loop).result() - - async def shutdown(self): - """Perform clean shutdown.""" - self.shutdown_requested = True - - if self.camera and self.camera_connected: + if self.loop and self.loop.is_running(): try: - # Stop any data streams + return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout=5.0) # Added timeout + except asyncio.TimeoutError: + self.get_logger().warning(f"Async function {coro.__name__} timed out.") + return None + except Exception as e: + self.get_logger().error(f"Exception in run_async_func for {coro.__name__}: {e}") + return None + else: + self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.") + return None + + + async def shutdown_node_async(self): + """Perform clean shutdown of camera connection.""" + self.shutdown_requested = True + self.get_logger().info("Async shutdown initiated...") + if self.camera and self.camera.is_connected: # Check SDK's connection state + try: + self.get_logger().info("Disabling data stream...") await self.camera.send_data_stream_request( - DataStreamType.ATTITUDE_DATA, + DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE ) + await asyncio.sleep(0.1) - # Disconnect from camera + self.get_logger().info("Disconnecting from camera...") await self.camera.disconnect() - self.get_logger().info("Disconnected from camera") + self.get_logger().info("Disconnected from camera successfully.") except Exception as e: - self.get_logger().error(f"Error during shutdown: {e}") + self.get_logger().error(f"Error during camera shutdown: {e}") + self.camera_connected = False # Update node's flag + self.feedback_msg.connected = False + self.feedback_msg.error_msg = "Shutting down" def cleanup(self): """Clean up resources.""" - if self.loop and self.thread_pool: # Changed from self.executor - # Schedule the shutdown coroutine - try: - self.run_async_func(self.shutdown()) - except Exception as e: - self.get_logger().error(f"Error during cleanup: {e}") + self.get_logger().info("PTZ node cleanup initiated.") + self.shutdown_requested = True + + if self.connection_timer: + self.connection_timer.cancel() + if self.health_check_timer: + self.health_check_timer.cancel() + + if self.loop and self.thread_pool: + if self.loop.is_running(): + try: + future = asyncio.run_coroutine_threadsafe(self.shutdown_node_async(), self.loop) + future.result(timeout=5) + except Exception as e: + self.get_logger().error(f"Error during async shutdown in cleanup: {e}") - # Shut down the executor - self.thread_pool.shutdown() # Changed from self.executor + self.get_logger().info("Shutting down thread pool executor...") + self.thread_pool.shutdown(wait=True) - # Close the event loop - self.loop.stop() - self.loop.close() + if self.loop.is_running(): + self.get_logger().info("Stopping asyncio event loop...") + self.loop.call_soon_threadsafe(self.loop.stop) - self.get_logger().info("PTZ node resources cleaned up") + self.get_logger().info("PTZ node resources cleaned up.") + else: + self.get_logger().warning("Loop or thread_pool not initialized, skipping parts of cleanup.") + def main(args=None): """Main function.""" - # Initialize ROS rclpy.init(args=args) - # Create the node ptz_node = PtzNode() - # Create and start event loop for the camera - def run_event_loop(loop): - asyncio.set_event_loop(loop) - loop.run_forever() - - # Start the asyncio loop in a separate thread - asyncio_thread = threading.Thread( - target=run_event_loop, - args=(ptz_node.loop,), - daemon=True - ) - asyncio_thread.start() + asyncio_thread = None + if ptz_node.loop: + def run_event_loop(loop): + asyncio.set_event_loop(loop) + try: + loop.run_forever() + finally: + # This ensures the loop is closed when the thread running it exits. + # This is important if run_forever() exits due to loop.stop() + # or an unhandled exception within a task scheduled on the loop. + if not loop.is_closed(): + loop.close() + + asyncio_thread = threading.Thread( + target=run_event_loop, + args=(ptz_node.loop,), + daemon=True + ) + asyncio_thread.start() try: - # Spin the node to process callbacks rclpy.spin(ptz_node) except KeyboardInterrupt: - pass + ptz_node.get_logger().info("KeyboardInterrupt received, shutting down...") + except SystemExit: + ptz_node.get_logger().info("SystemExit received, shutting down...") finally: - # Clean up - ptz_node.cleanup() + ptz_node.get_logger().info("Initiating final cleanup...") + ptz_node.cleanup() # This will stop the loop and shutdown the executor + + if asyncio_thread and asyncio_thread.is_alive(): + # The loop should have been stopped by cleanup. We just join the thread. + ptz_node.get_logger().info("Waiting for asyncio thread to join...") + asyncio_thread.join(timeout=5) + if asyncio_thread.is_alive(): + ptz_node.get_logger().warning("Asyncio thread did not join cleanly.") + rclpy.shutdown() + ptz_node.get_logger().info("ROS shutdown complete.") + if __name__ == '__main__': - # Register signal handler for clean shutdown - signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0)) - signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) main() - diff --git a/src/core_pkg/core_pkg/siyi_sdk/siyi.py b/src/core_pkg/core_pkg/siyi_sdk/siyi.py index 954da86..a5e426a 100644 --- a/src/core_pkg/core_pkg/siyi_sdk/siyi.py +++ b/src/core_pkg/core_pkg/siyi_sdk/siyi.py @@ -9,11 +9,6 @@ from crccheck.crc import Crc16 # Set up logging logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) # You can adjust the logging level as needed -handler = logging.StreamHandler() # Output to console -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -logger.addHandler(handler) # Global constant for the heartbeat packet (used in TCP keep-alive) HEARTBEAT_PACKET = bytes.fromhex("55 66 01 01 00 00 00 00 00 59 8B") @@ -69,10 +64,11 @@ class CommandID(Enum): """ ROTATION_CONTROL = 0x07 + ABSOLUTE_ZOOM = 0x08 ATTITUDE_ANGLES = 0x0E SINGLE_AXIS_CONTROL = 0x41 DATA_STREAM_REQUEST = 0x25 - ATTITUDE_DATA_RESPONSE = 0x0D # Expected response packet for attitude data + ATTITUDE_DATA_RESPONSE = 0x0D @dataclass @@ -96,7 +92,6 @@ class AttitudeData: """Create an AttitudeData instance from a byte string.""" if len(data) != 12: raise ValueError("Attitude data should be exactly 12 bytes.") - # Unpack six little-endian int16 values values = struct.unpack(" None: - """Establish a TCP connection with the gimbal camera.""" try: self.reader, self.writer = await asyncio.open_connection( self.ip, self.port ) self.is_connected = True - #logger.info(f"Connected to {self.ip}:{self.port}") asyncio.create_task(self.heartbeat_loop()) - # Start the data stream listener in the background. asyncio.create_task(self._data_stream_listener()) except (socket.gaierror, ConnectionRefusedError) as e: self.is_connected = False @@ -163,104 +136,113 @@ class SiyiGimbalCamera: raise async def disconnect(self) -> None: - """Gracefully disconnect from the gimbal camera.""" - if self.is_connected: + if self.is_connected and self.writer: self.is_connected = False self.writer.close() await self.writer.wait_closed() - #logger.info("Disconnected") else: - #logger.warning("Not connected, cannot disconnect.") pass async def heartbeat_loop(self) -> None: - """Periodically send heartbeat packets to maintain the TCP connection.""" - if not self.is_connected: - #logger.warning("Heartbeat loop started before connection was established.") + if not self.is_connected or not self.writer: return - try: while self.is_connected: try: self.writer.write(HEARTBEAT_PACKET) await self.writer.drain() - #logger.debug("Sent heartbeat packet") await asyncio.sleep(self.heartbeat_interval) except (socket.error, BrokenPipeError) as e: - #logger.error(f"Connection error in heartbeat loop: {e}") break finally: - #logger.info("Heartbeat loop stopped.") if self.is_connected: await self.disconnect() - def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes: - """Build a full packet for the 0x07 Gimbal Rotation Control command.""" + def _build_packet_header( + self, cmd_id: CommandID, data_len: int + ) -> bytearray: + """Helper to build the common packet header.""" packet = bytearray() - packet.extend(b"\x55\x66") # STX bytes - packet.append(0x01) # CTRL (request ACK) - packet.extend(struct.pack(" bytes: + """Helper to add CRC and increment sequence number.""" crc_value = Crc16.calc(packet) packet.extend(struct.pack(" None: - """Send a rotation control command (0x07) with the given yaw and pitch values.""" - if not self.is_connected: - raise RuntimeError("Socket is not connected, cannot send rotation command.") + def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes: + data_len = 2 + packet = self._build_packet_header( + CommandID.ROTATION_CONTROL, data_len + ) + packet.extend(struct.pack("bb", turn_yaw, turn_pitch)) + return self._finalize_packet(packet) + + async def send_rotation_command( + self, turn_yaw: int, turn_pitch: int + ) -> None: + if not self.is_connected or not self.writer: + raise RuntimeError( + "Socket is not connected or writer is None, cannot send rotation command." + ) packet = self._build_rotation_packet(turn_yaw, turn_pitch) self.writer.write(packet) await self.writer.drain() - logger.debug(f"Sent rotation command with yaw {turn_yaw} and pitch {turn_pitch}") + logger.debug( + f"Sent rotation command with yaw_speed {turn_yaw} and pitch_speed {turn_pitch}" + ) - def _build_attitude_angles_packet(self, yaw: float, pitch: float) -> bytes: - """Build a full packet for the 0x0E 'Set Gimbal Attitude Angles' command.""" - packet = bytearray() - packet.extend(b"\x55\x66") - packet.append(0x01) # CTRL (request ACK) - packet.extend(struct.pack(" bytes: + data_len = 4 + packet = self._build_packet_header( + CommandID.ATTITUDE_ANGLES, data_len + ) yaw_int = int(round(yaw * 10)) pitch_int = int(round(pitch * 10)) packet.extend(struct.pack(" None: - """Send the 0x0E 'Set Gimbal Attitude Angles' command to set absolute angles.""" - if not self.is_connected: - raise RuntimeError("Socket is not connected, cannot send attitude angles command.") + async def send_attitude_angles_command( + self, yaw: float, pitch: float + ) -> None: + if not self.is_connected or not self.writer: + raise RuntimeError( + "Socket is not connected or writer is None, cannot send attitude angles command." + ) packet = self._build_attitude_angles_packet(yaw, pitch) self.writer.write(packet) await self.writer.drain() - logger.debug(f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°") + logger.debug( + f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°" + ) - def _build_single_axis_attitude_packet(self, angle: float, axis: SingleAxis) -> bytes: - """Build a full packet for the 0x41 'Single-Axis Attitude Control' command.""" - packet = bytearray() - packet.extend(b"\x55\x66") - packet.append(0x01) # CTRL - packet.extend(struct.pack(" bytes: + data_len = 3 + packet = self._build_packet_header( + CommandID.SINGLE_AXIS_CONTROL, data_len + ) angle_int = int(round(angle * 10)) packet.extend(struct.pack(" None: - """Send the 0x41 single-axis attitude control command.""" - if not self.is_connected: - raise RuntimeError("Socket is not connected, cannot send single-axis attitude command.") + async def send_single_axis_attitude_command( + self, angle: float, axis: SingleAxis + ) -> None: + if not self.is_connected or not self.writer: + raise RuntimeError( + "Socket is not connected or writer is None, cannot send single-axis attitude command." + ) packet = self._build_single_axis_attitude_packet(angle, axis) self.writer.write(packet) await self.writer.drain() @@ -268,103 +250,253 @@ class SiyiGimbalCamera: f"Sent single-axis attitude command for {axis.name.lower()} with angle {angle}°" ) - def _build_data_stream_packet(self, data_type: DataStreamType, data_freq: DataStreamFrequency) -> bytes: - """Build a packet for the 0x25 'Request Gimbal to Send Data Stream' command.""" - packet = bytearray() - packet.extend(b"\x55\x66") - packet.append(0x01) # CTRL, request ACK - packet.extend(struct.pack(" bytes: + data_len = 2 + packet = self._build_packet_header( + CommandID.DATA_STREAM_REQUEST, data_len + ) + packet.append(data_type.value) + packet.append(data_freq.value) + return self._finalize_packet(packet) - async def send_data_stream_request(self, data_type: DataStreamType, data_freq: DataStreamFrequency) -> None: - """Send the 0x25 command to request a continuous data stream from the gimbal.""" - if not self.is_connected: - raise RuntimeError("Socket is not connected, cannot request data stream.") + async def send_data_stream_request( + self, data_type: DataStreamType, data_freq: DataStreamFrequency + ) -> None: + if not self.is_connected or not self.writer: + raise RuntimeError( + "Socket is not connected or writer is None, cannot request data stream." + ) packet = self._build_data_stream_packet(data_type, data_freq) self.writer.write(packet) await self.writer.drain() - #logger.info(f"Sent data stream request: data_type {data_type}, data_freq {data_freq}") + logger.info( + f"Sent data stream request: data_type {data_type.name}, data_freq {data_freq.name}" + ) + + def _build_absolute_zoom_packet(self, zoom_level: float) -> bytes: + data_len = 2 + packet = self._build_packet_header(CommandID.ABSOLUTE_ZOOM, data_len) + zoom_packet_value = int(round(zoom_level * 10)) + if not (0 <= zoom_packet_value <= 65535): # Should be caught by clamping earlier + raise ValueError( + "Zoom packet value out of uint16_t range after conversion." + ) + packet.extend(struct.pack(" None: + """ + Send an absolute zoom command (0x08) to the gimbal. + + :param zoom_level: The desired zoom level as a float (e.g., 1.0 for 1x, + 2.5 for 2.5x). For A8 mini, this will be clamped + between 1.0 and MAX_A8_MINI_ZOOM (6.0). + """ + if not self.is_connected or not self.writer: + raise RuntimeError( + "Socket is not connected or writer is None, cannot send absolute zoom command." + ) + + original_requested_zoom = zoom_level # For logging clarity + + if zoom_level < 1.0: + logger.warning( + f"Requested zoom level {original_requested_zoom:.1f} is less than 1.0. Clamping to 1.0x." + ) + zoom_level = 1.0 + elif zoom_level > self.MAX_A8_MINI_ZOOM: # Check against max zoom + logger.warning( + f"Requested zoom level {original_requested_zoom:.1f} exceeds maximum {self.MAX_A8_MINI_ZOOM:.1f}x for A8 mini. " + f"Clamping to {self.MAX_A8_MINI_ZOOM:.1f}x." + ) + zoom_level = self.MAX_A8_MINI_ZOOM + + packet = self._build_absolute_zoom_packet(zoom_level) + self.writer.write(packet) + await self.writer.drain() + logger.debug( + f"Sent absolute zoom command with level {zoom_level:.1f}x (original request: {original_requested_zoom:.1f}x)" + ) async def _read_packet(self): - """Read a full packet from the TCP stream following the protocol format.""" + if not self.reader: + raise RuntimeError("Reader is not initialized.") stx = await self.reader.readexactly(2) if stx != b"\x55\x66": - raise ValueError("Invalid packet start bytes.") + raise ValueError(f"Invalid packet start bytes: {stx.hex()}") ctrl = await self.reader.readexactly(1) data_len_bytes = await self.reader.readexactly(2) data_len = struct.unpack(" 2048: # Arbitrary reasonable limit + raise ValueError(f"Excessive data length received: {data_len}") + data = await self.reader.readexactly(data_len) crc_bytes = await self.reader.readexactly(2) received_crc = struct.unpack(" AttitudeData: - """Parse the attitude data (12 bytes) into human-readable values.""" if len(data) != 12: raise ValueError("Attitude data should be exactly 12 bytes.") return AttitudeData.from_bytes(data) async def _data_stream_listener(self) -> None: - """ - Continuously listen for incoming packets from the gimbal and process them. - - If a data callback is set via set_data_callback(), the callback is called - with the CommandID and the corresponding data (parsed or raw). - """ while self.is_connected: try: cmd_id_int, data = await self._read_packet() - cmd_id = CommandID(cmd_id_int) - # Process data as before... - except (ConnectionResetError, BrokenPipeError, ConnectionError) as e: - logger.error(f"Connection error: {e}") + try: + cmd_id_enum = CommandID(cmd_id_int) + except ValueError: + logger.warning( + f"Received unknown CMD_ID {cmd_id_int:02X}, data: {data.hex()}" + ) + if self._data_callback: + self._data_callback(cmd_id_int, data) + continue + + if ( + cmd_id_enum == CommandID.ATTITUDE_DATA_RESPONSE + and len(data) == 12 + ): + try: + parsed = AttitudeData.from_bytes(data) + if self._data_callback: + self._data_callback(cmd_id_enum, parsed) + else: + logger.info(f"Received attitude data: {parsed}") + except Exception as e: + logger.exception( + f"Failed to parse attitude data: {e}" + ) + if self._data_callback: + self._data_callback(cmd_id_enum, data) + else: + if self._data_callback: + self._data_callback(cmd_id_enum, data) + else: + logger.info( + f"Received packet with CMD_ID {cmd_id_enum.name} ({cmd_id_enum.value:02X}): {data.hex()}" + ) + + except ( + ConnectionResetError, + BrokenPipeError, + ConnectionError, + asyncio.IncompleteReadError, + ) as e: + logger.error(f"Connection error in listener: {e}") self.is_connected = False - break # Exit the loop to allow reconnection + break + except ValueError as e: + logger.error(f"Packet error in listener: {e}") + # Consider adding a small delay or a mechanism to resync if this happens frequently + await asyncio.sleep(0.1) # Small delay before trying to read again + continue except Exception as e: - #logger.error(f"Error reading packet: {e}") - ## Continue trying for other errors, but count failures + logger.exception(f"Unexpected error in data stream listener: {e}") + # Depending on the error, you might want to break or continue + await asyncio.sleep(0.1) # Small delay continue - # Process attitude data packets if applicable. - if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and len(data) == 12: - try: - parsed = AttitudeData.from_bytes(data) - except Exception as e: - logger.exception(f"Failed to parse attitude data: {e}") - parsed = None - if self._data_callback: - self._data_callback(cmd_id, parsed) - else: - logger.info(f"Received attitude data: {parsed}") - else: - if self._data_callback: - self._data_callback(cmd_id, data) - else: - logger.info(f"Received packet with CMD_ID {cmd_id}: {data}") - - def set_data_callback(self, callback: Callable[[CommandID, Any], None]) -> None: - """ - Change the callback function used by the data stream listener. - - The callback should be a function that accepts two arguments: - - A CommandID indicating the type of packet received. - - The parsed data (for attitude data, an AttitudeData instance) or raw data. - - :param callback: A callable to be used as the data callback. - """ + def set_data_callback( + self, callback: Callable[[CommandID | int, Any], None] + ) -> None: self._data_callback = callback + + +async def main_sdk_test(): # Renamed to avoid conflict if this file is imported + gimbal_ip = "192.168.144.25" + gimbal = SiyiGimbalCamera(gimbal_ip) + + def my_data_handler(cmd_id, data): + if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): + print( + f"Attitude: Yaw={data.yaw:.1f}, Pitch={data.pitch:.1f}, Roll={data.roll:.1f}" + ) + elif isinstance(cmd_id, CommandID): + print( + f"Received {cmd_id.name}: {data.hex() if isinstance(data, bytes) else data}" + ) + else: + print( + f"Received Unknown CMD_ID {cmd_id:02X}: {data.hex() if isinstance(data, bytes) else data}" + ) + + gimbal.set_data_callback(my_data_handler) + + try: + await gimbal.connect() + if gimbal.is_connected: + print("SDK Test: Successfully connected to the gimbal.") + + print("SDK Test: Setting zoom to 1.0x") + await gimbal.send_absolute_zoom_command(1.0) + await asyncio.sleep(2) + + print("SDK Test: Setting zoom to 3.5x") + await gimbal.send_absolute_zoom_command(3.5) + await asyncio.sleep(2) + + print("SDK Test: Setting zoom to 6.0x (A8 mini max)") + await gimbal.send_absolute_zoom_command(6.0) + await asyncio.sleep(2) + + print("SDK Test: Attempting zoom to 7.0x (should be clamped to 6.0x)") + await gimbal.send_absolute_zoom_command(7.0) + await asyncio.sleep(2) + + print("SDK Test: Attempting zoom to 0.5x (should be clamped to 1.0x)") + await gimbal.send_absolute_zoom_command(0.5) + await asyncio.sleep(2) + + print("SDK Test: Requesting attitude data stream at 5Hz...") + await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.HZ_5) + + print("SDK Test: Listening for data for 10 seconds...") + await asyncio.sleep(10) + + print("SDK Test: Disabling attitude data stream...") + await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE) + await asyncio.sleep(1) + + + except ConnectionRefusedError: + print( + f"SDK Test: Connection to {gimbal_ip} was refused. Is the gimbal on and accessible?" + ) + except Exception as e: + print(f"SDK Test: An error occurred: {e}") + finally: + if gimbal.is_connected: + print("SDK Test: Disconnecting...") + await gimbal.disconnect() + print("SDK Test: Program finished.") + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" + ) + # To see debug logs from this SDK specifically: + # logger.setLevel(logging.DEBUG) + asyncio.run(main_sdk_test()) diff --git a/src/ros2_interfaces_pkg b/src/ros2_interfaces_pkg index 3965e0f..a412af5 160000 --- a/src/ros2_interfaces_pkg +++ b/src/ros2_interfaces_pkg @@ -1 +1 @@ -Subproject commit 3965e0f650fc8fe5a59e16073996f084d01bd554 +Subproject commit a412af5017e73c4090bc5ebe709546561afcbebe