mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
attempting manual zoom functionality
This commit is contained in:
@@ -202,9 +202,14 @@ class PtzNode(Node):
|
|||||||
await self.camera.send_single_axis_attitude_command(angle, axis)
|
await self.camera.send_single_axis_attitude_command(angle, axis)
|
||||||
|
|
||||||
elif msg.control_mode == 3:
|
elif msg.control_mode == 3:
|
||||||
zoom_level = msg.zoom_level
|
zoom_direction = 0
|
||||||
self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x")
|
if msg.zoom_level > 0:
|
||||||
await self.camera.send_absolute_zoom_command(zoom_level)
|
zoom_direction = 1 # Zoom in
|
||||||
|
elif msg.zoom_level < 0:
|
||||||
|
zoom_direction = -1 # Zoom out
|
||||||
|
|
||||||
|
self.get_logger().debug(f"Attempting manual zoom: direction={zoom_direction}")
|
||||||
|
await self.camera.send_manual_zoom_command(zoom_direction)
|
||||||
|
|
||||||
if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'):
|
if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'):
|
||||||
if msg.stream_type > 0 and msg.stream_freq >= 0:
|
if msg.stream_type > 0 and msg.stream_freq >= 0:
|
||||||
|
|||||||
@@ -64,6 +64,7 @@ class CommandID(Enum):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
ROTATION_CONTROL = 0x07
|
ROTATION_CONTROL = 0x07
|
||||||
|
MANUAL_ZOOM = 0x05 # Added manual zoom command
|
||||||
ABSOLUTE_ZOOM = 0x08
|
ABSOLUTE_ZOOM = 0x08
|
||||||
ATTITUDE_ANGLES = 0x0E
|
ATTITUDE_ANGLES = 0x0E
|
||||||
SINGLE_AXIS_CONTROL = 0x41
|
SINGLE_AXIS_CONTROL = 0x41
|
||||||
@@ -320,6 +321,35 @@ class SiyiGimbalCamera:
|
|||||||
f"Sent absolute zoom command with level {zoom_level:.1f}x (original request: {original_requested_zoom:.1f}x)"
|
f"Sent absolute zoom command with level {zoom_level:.1f}x (original request: {original_requested_zoom:.1f}x)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _build_manual_zoom_packet(self, zoom_direction: int) -> bytes:
|
||||||
|
"""
|
||||||
|
Helper to build a manual zoom command packet.
|
||||||
|
|
||||||
|
:param zoom_direction: Direction to zoom: 1 for in, -1 for out, 0 for stop.
|
||||||
|
:return: Complete packet bytes ready to send.
|
||||||
|
"""
|
||||||
|
data_len = 1
|
||||||
|
packet = self._build_packet_header(CommandID.MANUAL_ZOOM, data_len)
|
||||||
|
# Ensure the direction is within valid range (-1, 0, 1)
|
||||||
|
direction_value = max(-1, min(1, zoom_direction))
|
||||||
|
packet.append(direction_value)
|
||||||
|
return self._finalize_packet(packet)
|
||||||
|
|
||||||
|
async def send_manual_zoom_command(self, zoom_direction: int) -> None:
|
||||||
|
"""
|
||||||
|
Send a manual zoom command (0x05) to the gimbal.
|
||||||
|
|
||||||
|
:param zoom_direction: 1 to zoom in, -1 to zoom out, 0 to stop zooming.
|
||||||
|
"""
|
||||||
|
if not self.is_connected or not self.writer:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Socket is not connected or writer is None, cannot send manual zoom command."
|
||||||
|
)
|
||||||
|
packet = self._build_manual_zoom_packet(zoom_direction)
|
||||||
|
self.writer.write(packet)
|
||||||
|
await self.writer.drain()
|
||||||
|
logger.debug(f"Sent manual zoom command with direction {zoom_direction}")
|
||||||
|
|
||||||
async def _read_packet(self):
|
async def _read_packet(self):
|
||||||
if not self.reader:
|
if not self.reader:
|
||||||
raise RuntimeError("Reader is not initialized.")
|
raise RuntimeError("Reader is not initialized.")
|
||||||
|
|||||||
Reference in New Issue
Block a user