From 2294743cc0dc9c64d4ec281a8015d36cd14802d6 Mon Sep 17 00:00:00 2001 From: Tristan McGinnis Date: Tue, 27 May 2025 19:05:54 -0600 Subject: [PATCH] zoom testing --- src/core_pkg/core_pkg/core_ptz.py | 75 +++++++++++++++++++++++++++---- 1 file changed, 67 insertions(+), 8 deletions(-) diff --git a/src/core_pkg/core_pkg/core_ptz.py b/src/core_pkg/core_pkg/core_ptz.py index 15df4ec..79ea552 100644 --- a/src/core_pkg/core_pkg/core_ptz.py +++ b/src/core_pkg/core_pkg/core_ptz.py @@ -42,6 +42,13 @@ class PtzNode(Node): self.loop = None self.thread_pool = None + # Track current zoom level and zoom state + self.current_zoom_level = 1.0 + self.max_zoom_level = 6.0 # A8 mini max zoom + self.zoom_step = 0.2 # Zoom step per command + self.zooming = 0 # Current zoom direction: -1=out, 0=stop, 1=in + self.zoom_timer = None + # Create publishers self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10) self.debug_pub = self.create_publisher(String, '/ptz/debug', 10) @@ -130,7 +137,6 @@ class PtzNode(Node): debug_str += str(data) self.get_logger().debug(debug_str) - def check_camera_connection(self): """Periodically check camera connection and attempt to reconnect if needed.""" if not self.camera_connected and not self.shutdown_requested: @@ -181,6 +187,10 @@ class PtzNode(Node): if msg.reset: self.get_logger().info("Attempting to reset camera to center position") await self.camera.send_attitude_angles_command(0.0, 0.0) + + # Also reset zoom to 1x when resetting camera + self.current_zoom_level = 1.0 + await self.camera.send_absolute_zoom_command(1.0) return if msg.control_mode == 0: @@ -201,10 +211,21 @@ class PtzNode(Node): self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}") await self.camera.send_single_axis_attitude_command(angle, axis) - elif msg.control_mode == 3: - zoom_level = msg.zoom_level - self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x") - await self.camera.send_absolute_zoom_command(zoom_level) + elif msg.control_mode == 3: + # Instead of absolute zoom, interpret zoom_level as zoom direction + zoom_direction = int(msg.zoom_level) if abs(msg.zoom_level) >= 0.5 else 0 + zoom_direction = max(-1, min(1, zoom_direction)) # Restrict to -1, 0, 1 + + if zoom_direction != self.zooming: + self.zooming = zoom_direction + self.get_logger().debug(f"Zoom direction changed to {zoom_direction}") + + if zoom_direction == 0: + # Stop zooming + self.get_logger().debug(f"Stopping zoom at level {self.current_zoom_level:.1f}x") + else: + # Start zooming in the specified direction + await self.perform_zoom(zoom_direction) if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'): if msg.stream_type > 0 and msg.stream_freq >= 0: @@ -221,13 +242,52 @@ class PtzNode(Node): except RuntimeError as e: # Catch SDK's "not connected" errors self.get_logger().warning(f"SDK command failed (likely not connected): {e}") # self.camera_connected will be updated by health/connection checks - # self.feedback_msg.error_msg = f"Command failed: {str(e)}" # Already set by health check - # self.feedback_pub.publish(self.feedback_msg) except Exception as e: self.get_logger().error(f"Error processing control command: {e}") self.feedback_msg.error_msg = f"Control error: {str(e)}" self.feedback_pub.publish(self.feedback_msg) # Publish for other errors + async def perform_zoom(self, direction): + """Perform a zoom operation in the specified direction.""" + if not self.camera or not self.camera.is_connected: + return + + if direction == 0: + return + + # Calculate new zoom level + new_zoom_level = self.current_zoom_level + (direction * self.zoom_step) + + # Clamp to valid range + new_zoom_level = max(1.0, min(self.max_zoom_level, new_zoom_level)) + + # Skip if no change (already at min/max) + if new_zoom_level == self.current_zoom_level: + if direction > 0: + self.get_logger().debug(f"Already at maximum zoom level ({self.max_zoom_level:.1f}x)") + else: + self.get_logger().debug(f"Already at minimum zoom level (1.0x)") + return + + # Set the new zoom level + self.get_logger().debug(f"Zooming from {self.current_zoom_level:.1f}x to {new_zoom_level:.1f}x") + self.current_zoom_level = new_zoom_level + + try: + await self.camera.send_absolute_zoom_command(new_zoom_level) + + # If still zooming in same direction, schedule another zoom step + if self.zooming == direction: + # Schedule a new zoom step after a short delay if we're not at the limits + if (direction > 0 and new_zoom_level < self.max_zoom_level) or \ + (direction < 0 and new_zoom_level > 1.0): + # Use create_timer for a non-blocking delay + await asyncio.sleep(0.2) # 200ms delay between zoom steps + if self.zooming == direction: # Check if direction hasn't changed + await self.perform_zoom(direction) + except Exception as e: + self.get_logger().error(f"Error during zoom operation: {e}") + def publish_debug(self, message_text): """Publish debug message.""" msg = String() @@ -250,7 +310,6 @@ class PtzNode(Node): self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.") return None - async def shutdown_node_async(self): """Perform clean shutdown of camera connection.""" self.shutdown_requested = True