no idea if this works, thanks gemini

This commit is contained in:
ryleu
2025-05-27 00:39:52 -05:00
parent 9d10607b2e
commit a4d3f0dfbe
3 changed files with 468 additions and 282 deletions

View File

@@ -38,15 +38,13 @@ class PtzNode(Node):
# Create a camera instance # Create a camera instance
self.camera = None self.camera = None
self.camera_connected = False self.camera_connected = False # This flag is still managed but not used to gate commands
self.loop = None self.loop = None
self.thread_pool = None # Renamed from self.executor to avoid conflict self.thread_pool = None
# Create publishers # Create publishers
self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10) self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10)
self.debug_pub = self.create_publisher(String, '/ptz/debug', 10) self.debug_pub = self.create_publisher(String, '/ptz/debug', 10)
#create timer to publish feedback at a regular interval
#self.create_timer(1.0, self.publish_feedback)
# Create subscribers # Create subscribers
self.control_sub = self.create_subscription( self.control_sub = self.create_subscription(
@@ -59,18 +57,18 @@ class PtzNode(Node):
# Create feedback message # Create feedback message
self.feedback_msg = PtzFeedback() self.feedback_msg = PtzFeedback()
self.feedback_msg.connected = False self.feedback_msg.connected = False # This will reflect the actual connection state
self.feedback_msg.error_msg = "Initializing" self.feedback_msg.error_msg = "Initializing"
# Flags for async operations # Flags for async operations
self.shutdown_requested = False self.shutdown_requested = False
# Set up asyncio event loop in a separate thread # Set up asyncio event loop in a separate thread
self.thread_pool = ThreadPoolExecutor(max_workers=1) # Renamed from self.executor self.thread_pool = ThreadPoolExecutor(max_workers=1)
self.loop = asyncio.new_event_loop() self.loop = asyncio.new_event_loop()
# Connect to camera on startup # Connect to camera on startup
self.connect_task = self.thread_pool.submit( # Use thread_pool instead of executor self.connect_task = self.thread_pool.submit(
self.run_async_func, self.connect_to_camera() self.run_async_func, self.connect_to_camera()
) )
@@ -97,47 +95,53 @@ class PtzNode(Node):
self.feedback_msg.connected = True self.feedback_msg.connected = True
self.feedback_msg.error_msg = "" self.feedback_msg.error_msg = ""
#self.get_logger().info("Connected to PTZ camera")
self.publish_debug("Camera connected successfully") self.publish_debug("Camera connected successfully")
except Exception as e: except Exception as e:
#self.camera_connected = False self.camera_connected = False
self.feedback_msg.connected = False self.feedback_msg.connected = False
self.feedback_msg.error_msg = f"Connection error: {str(e)}" self.feedback_msg.error_msg = f"Connection error: {str(e)}"
#self.get_logger().error(f"Failed to connect to camera: {e}")
self.publish_debug(f"Camera connection failed: {str(e)}") self.publish_debug(f"Camera connection failed: {str(e)}")
def camera_data_callback(self, cmd_id, data): def camera_data_callback(self, cmd_id, data):
"""Handle data received from the camera.""" """Handle data received from the camera."""
# Update last_data_time regardless of self.camera_connected,
# as data might arrive during a brief reconnect window.
self.last_data_time = time.time()
if self.camera_connected: # Only process for feedback if we believe we are connected
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData):
self.last_data_time = time.time() # Update timestamp on successful data
# Update feedback message with attitude data
self.feedback_msg.yaw = data.yaw self.feedback_msg.yaw = data.yaw
self.feedback_msg.pitch = data.pitch self.feedback_msg.pitch = data.pitch
self.feedback_msg.roll = data.roll self.feedback_msg.roll = data.roll
self.feedback_msg.yaw_velocity = data.yaw_velocity self.feedback_msg.yaw_velocity = data.yaw_velocity
self.feedback_msg.pitch_velocity = data.pitch_velocity self.feedback_msg.pitch_velocity = data.pitch_velocity
self.feedback_msg.roll_velocity = data.roll_velocity self.feedback_msg.roll_velocity = data.roll_velocity
# Publish feedback
self.feedback_pub.publish(self.feedback_msg) self.feedback_pub.publish(self.feedback_msg)
else: else:
# Log other data for debugging debug_str = ""
debug_str = f"Camera data: CMD_ID={cmd_id}, Data={data}" if isinstance(cmd_id, CommandID):
debug_str = f"Camera data: CMD_ID={cmd_id.name}, Data="
else:
debug_str = f"Camera data: CMD_ID={cmd_id}, Data="
if isinstance(data, bytes):
debug_str += data.hex()
else:
debug_str += str(data)
self.get_logger().debug(debug_str) self.get_logger().debug(debug_str)
def check_camera_connection(self): def check_camera_connection(self):
"""Periodically check camera connection and attempt to reconnect if needed.""" """Periodically check camera connection and attempt to reconnect if needed."""
if not self.camera_connected and not self.shutdown_requested: if not self.camera_connected and not self.shutdown_requested:
#self.get_logger().info("Attempting to reconnect to camera...") self.publish_debug("Attempting to reconnect to camera...")
if self.camera: if self.camera:
# Fully clean up old connection first
try: try:
if self.camera.is_connected: # SDK's internal connection state
self.run_async_func(self.camera.disconnect()) self.run_async_func(self.camera.disconnect())
except Exception: except Exception as e:
pass # Ignore errors during cleanup self.get_logger().debug(f"Error during pre-reconnect disconnect: {e}")
self.camera = None # self.camera = None # Don't nullify here, connect_to_camera will re-assign or create new
self.connect_task = self.thread_pool.submit( self.connect_task = self.thread_pool.submit(
self.run_async_func, self.connect_to_camera() self.run_async_func, self.connect_to_camera()
@@ -145,137 +149,180 @@ class PtzNode(Node):
def check_camera_health(self): def check_camera_health(self):
"""Check if we're still receiving data from the camera""" """Check if we're still receiving data from the camera"""
if self.camera_connected: if self.camera_connected: # Only check health if we think we are connected
time_since_last_data = time.time() - self.last_data_time time_since_last_data = time.time() - self.last_data_time
if time_since_last_data > 5.0: # No data for 5 seconds if time_since_last_data > 5.0:
#self.get_logger().warning(f"No camera data for {time_since_last_data:.1f}s, connection may be stale") self.publish_debug(f"No camera data for {time_since_last_data:.1f}s, marking as disconnected.")
self.camera_connected = False self.camera_connected = False
# Force reconnection on next check_camera_connection timer self.feedback_msg.connected = False
self.feedback_msg.error_msg = "Connection stale (no data)"
self.feedback_pub.publish(self.feedback_msg)
def handle_control_command(self, msg): def handle_control_command(self, msg):
"""Handle incoming control commands.""" """Handle incoming control commands."""
if not self.camera_connected: # Removed: if not self.camera_connected
#self.get_logger().warning("Camera not connected, ignoring control command") if not self.camera: # Still check if camera object exists
self.get_logger().warning("Camera object not initialized, ignoring control command")
return return
# Submit the control command to the async executor self.thread_pool.submit(
self.thread_pool.submit( # Changed from self.executor
self.run_async_func, self.run_async_func,
self.process_control_command(msg) self.process_control_command(msg)
) )
async def process_control_command(self, msg): async def process_control_command(self, msg):
"""Process and send the control command to the camera.""" """Process and send the control command to the camera."""
if not self.camera:
self.get_logger().error("Process control command called but camera object is None.")
return
try: try:
# Check if reset command # The SDK's send_... methods will raise RuntimeError if not connected.
# This try-except block will catch those.
if msg.reset: if msg.reset:
self.get_logger().info("Resetting camera to center position") self.get_logger().info("Attempting to reset camera to center position")
await self.camera.send_attitude_angles_command(0.0, 0.0) await self.camera.send_attitude_angles_command(0.0, 0.0)
return return
# Process based on control mode if msg.control_mode == 0:
if msg.control_mode == 0: # Relative speed control turn_yaw = max(-100, min(100, int(msg.turn_yaw)))
# Clamp values between -100 and 100 turn_pitch = max(-100, min(100, int(msg.turn_pitch)))
turn_yaw = max(-100, min(100, msg.turn_yaw)) self.get_logger().debug(f"Attempting rotation: yaw_speed={turn_yaw}, pitch_speed={turn_pitch}")
turn_pitch = max(-100, min(100, msg.turn_pitch))
self.get_logger().debug(f"Sending rotation command: yaw={turn_yaw}, pitch={turn_pitch}")
await self.camera.send_rotation_command(turn_yaw, turn_pitch) await self.camera.send_rotation_command(turn_yaw, turn_pitch)
elif msg.control_mode == 1: # Absolute angle control elif msg.control_mode == 1:
# Clamp angles to valid ranges
yaw = max(-135.0, min(135.0, msg.yaw)) yaw = max(-135.0, min(135.0, msg.yaw))
pitch = max(-90.0, min(90.0, msg.pitch)) pitch = max(-90.0, min(90.0, msg.pitch))
self.get_logger().debug(f"Attempting absolute angles: yaw={yaw}, pitch={pitch}")
self.get_logger().debug(f"Sending absolute angles: yaw={yaw}, pitch={pitch}")
await self.camera.send_attitude_angles_command(yaw, pitch) await self.camera.send_attitude_angles_command(yaw, pitch)
elif msg.control_mode == 2: # Single axis control elif msg.control_mode == 2:
axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH
self.get_logger().debug(f"Sending single axis command: axis={axis.name}, angle={msg.angle}") angle = msg.angle
await self.camera.send_single_axis_attitude_command(msg.angle, axis) self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}")
await self.camera.send_single_axis_attitude_command(angle, axis)
# Handle data streaming configuration if requested elif msg.control_mode == 3:
zoom_level = msg.zoom_level
self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x")
await self.camera.send_absolute_zoom_command(zoom_level)
if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'):
if msg.stream_type > 0 and msg.stream_freq >= 0: if msg.stream_type > 0 and msg.stream_freq >= 0:
try: try:
stream_type = DataStreamType(msg.stream_type) stream_type = DataStreamType(msg.stream_type)
stream_freq = DataStreamFrequency(msg.stream_freq) stream_freq = DataStreamFrequency(msg.stream_freq)
self.get_logger().info( self.get_logger().info(
f"Setting data stream: type={stream_type.name}, freq={stream_freq.name}" f"Attempting to set data stream: type={stream_type.name}, freq={stream_freq.name}"
) )
await self.camera.send_data_stream_request(stream_type, stream_freq) await self.camera.send_data_stream_request(stream_type, stream_freq)
except ValueError: except ValueError:
self.get_logger().error("Invalid stream type or frequency values") self.get_logger().error("Invalid stream type or frequency values in control message")
except RuntimeError as e: # Catch SDK's "not connected" errors
self.get_logger().warning(f"SDK command failed (likely not connected): {e}")
# self.camera_connected will be updated by health/connection checks
# self.feedback_msg.error_msg = f"Command failed: {str(e)}" # Already set by health check
# self.feedback_pub.publish(self.feedback_msg)
except Exception as e: except Exception as e:
self.get_logger().error(f"Error processing control command: {e}") self.get_logger().error(f"Error processing control command: {e}")
self.feedback_msg.error_msg = f"Control error: {str(e)}" self.feedback_msg.error_msg = f"Control error: {str(e)}"
self.feedback_pub.publish(self.feedback_msg) self.feedback_pub.publish(self.feedback_msg) # Publish for other errors
def publish_debug(self, message): def publish_debug(self, message_text):
"""Publish debug message.""" """Publish debug message."""
msg = String() msg = String()
msg.data = message msg.data = f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
self.get_logger().info(message_text)
def run_async_func(self, coro): def run_async_func(self, coro):
"""Run an async function in the event loop.""" """Run an async function in the event loop."""
return asyncio.run_coroutine_threadsafe(coro, self.loop).result() if self.loop and self.loop.is_running():
async def shutdown(self):
"""Perform clean shutdown."""
self.shutdown_requested = True
if self.camera and self.camera_connected:
try: try:
# Stop any data streams return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout=5.0) # Added timeout
except asyncio.TimeoutError:
self.get_logger().warning(f"Async function {coro.__name__} timed out.")
return None
except Exception as e:
self.get_logger().error(f"Exception in run_async_func for {coro.__name__}: {e}")
return None
else:
self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.")
return None
async def shutdown_node_async(self):
"""Perform clean shutdown of camera connection."""
self.shutdown_requested = True
self.get_logger().info("Async shutdown initiated...")
if self.camera and self.camera.is_connected: # Check SDK's connection state
try:
self.get_logger().info("Disabling data stream...")
await self.camera.send_data_stream_request( await self.camera.send_data_stream_request(
DataStreamType.ATTITUDE_DATA, DataStreamType.ATTITUDE_DATA,
DataStreamFrequency.DISABLE DataStreamFrequency.DISABLE
) )
await asyncio.sleep(0.1)
# Disconnect from camera self.get_logger().info("Disconnecting from camera...")
await self.camera.disconnect() await self.camera.disconnect()
self.get_logger().info("Disconnected from camera") self.get_logger().info("Disconnected from camera successfully.")
except Exception as e: except Exception as e:
self.get_logger().error(f"Error during shutdown: {e}") self.get_logger().error(f"Error during camera shutdown: {e}")
self.camera_connected = False # Update node's flag
self.feedback_msg.connected = False
self.feedback_msg.error_msg = "Shutting down"
def cleanup(self): def cleanup(self):
"""Clean up resources.""" """Clean up resources."""
if self.loop and self.thread_pool: # Changed from self.executor self.get_logger().info("PTZ node cleanup initiated.")
# Schedule the shutdown coroutine self.shutdown_requested = True
if self.connection_timer:
self.connection_timer.cancel()
if self.health_check_timer:
self.health_check_timer.cancel()
if self.loop and self.thread_pool:
if self.loop.is_running():
try: try:
self.run_async_func(self.shutdown()) future = asyncio.run_coroutine_threadsafe(self.shutdown_node_async(), self.loop)
future.result(timeout=5)
except Exception as e: except Exception as e:
self.get_logger().error(f"Error during cleanup: {e}") self.get_logger().error(f"Error during async shutdown in cleanup: {e}")
# Shut down the executor self.get_logger().info("Shutting down thread pool executor...")
self.thread_pool.shutdown() # Changed from self.executor self.thread_pool.shutdown(wait=True)
# Close the event loop if self.loop.is_running():
self.loop.stop() self.get_logger().info("Stopping asyncio event loop...")
self.loop.close() self.loop.call_soon_threadsafe(self.loop.stop)
self.get_logger().info("PTZ node resources cleaned up.")
else:
self.get_logger().warning("Loop or thread_pool not initialized, skipping parts of cleanup.")
self.get_logger().info("PTZ node resources cleaned up")
def main(args=None): def main(args=None):
"""Main function.""" """Main function."""
# Initialize ROS
rclpy.init(args=args) rclpy.init(args=args)
# Create the node
ptz_node = PtzNode() ptz_node = PtzNode()
# Create and start event loop for the camera asyncio_thread = None
if ptz_node.loop:
def run_event_loop(loop): def run_event_loop(loop):
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try:
loop.run_forever() loop.run_forever()
finally:
# This ensures the loop is closed when the thread running it exits.
# This is important if run_forever() exits due to loop.stop()
# or an unhandled exception within a task scheduled on the loop.
if not loop.is_closed():
loop.close()
# Start the asyncio loop in a separate thread
asyncio_thread = threading.Thread( asyncio_thread = threading.Thread(
target=run_event_loop, target=run_event_loop,
args=(ptz_node.loop,), args=(ptz_node.loop,),
@@ -284,18 +331,25 @@ def main(args=None):
asyncio_thread.start() asyncio_thread.start()
try: try:
# Spin the node to process callbacks
rclpy.spin(ptz_node) rclpy.spin(ptz_node)
except KeyboardInterrupt: except KeyboardInterrupt:
pass ptz_node.get_logger().info("KeyboardInterrupt received, shutting down...")
except SystemExit:
ptz_node.get_logger().info("SystemExit received, shutting down...")
finally: finally:
# Clean up ptz_node.get_logger().info("Initiating final cleanup...")
ptz_node.cleanup() ptz_node.cleanup() # This will stop the loop and shutdown the executor
if asyncio_thread and asyncio_thread.is_alive():
# The loop should have been stopped by cleanup. We just join the thread.
ptz_node.get_logger().info("Waiting for asyncio thread to join...")
asyncio_thread.join(timeout=5)
if asyncio_thread.is_alive():
ptz_node.get_logger().warning("Asyncio thread did not join cleanly.")
rclpy.shutdown() rclpy.shutdown()
ptz_node.get_logger().info("ROS shutdown complete.")
if __name__ == '__main__': if __name__ == '__main__':
# Register signal handler for clean shutdown
signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0))
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
main() main()

View File

@@ -9,11 +9,6 @@ from crccheck.crc import Crc16
# Set up logging # Set up logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) # You can adjust the logging level as needed
handler = logging.StreamHandler() # Output to console
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Global constant for the heartbeat packet (used in TCP keep-alive) # Global constant for the heartbeat packet (used in TCP keep-alive)
HEARTBEAT_PACKET = bytes.fromhex("55 66 01 01 00 00 00 00 00 59 8B") HEARTBEAT_PACKET = bytes.fromhex("55 66 01 01 00 00 00 00 00 59 8B")
@@ -69,10 +64,11 @@ class CommandID(Enum):
""" """
ROTATION_CONTROL = 0x07 ROTATION_CONTROL = 0x07
ABSOLUTE_ZOOM = 0x08
ATTITUDE_ANGLES = 0x0E ATTITUDE_ANGLES = 0x0E
SINGLE_AXIS_CONTROL = 0x41 SINGLE_AXIS_CONTROL = 0x41
DATA_STREAM_REQUEST = 0x25 DATA_STREAM_REQUEST = 0x25
ATTITUDE_DATA_RESPONSE = 0x0D # Expected response packet for attitude data ATTITUDE_DATA_RESPONSE = 0x0D
@dataclass @dataclass
@@ -96,7 +92,6 @@ class AttitudeData:
"""Create an AttitudeData instance from a byte string.""" """Create an AttitudeData instance from a byte string."""
if len(data) != 12: if len(data) != 12:
raise ValueError("Attitude data should be exactly 12 bytes.") raise ValueError("Attitude data should be exactly 12 bytes.")
# Unpack six little-endian int16 values
values = struct.unpack("<hhhhhh", data) values = struct.unpack("<hhhhhh", data)
return cls( return cls(
yaw=values[0] / 10.0, yaw=values[0] / 10.0,
@@ -111,51 +106,29 @@ class AttitudeData:
class SiyiGimbalCamera: class SiyiGimbalCamera:
""" """
Class to interface with the SIYI Gimbal Camera over TCP. Class to interface with the SIYI Gimbal Camera over TCP.
This class handles:
- Establishing a TCP connection.
- Sending periodic heartbeat packets.
- Building and sending commands (e.g. for rotation, setting attitudes,
single-axis control, and data stream requests).
- Continuously reading incoming packets in a background task.
Command identifiers and axis flags are represented as enums, and the parsed
attitude data is returned as an AttitudeData object.
The data stream listener is started as a background task when a connection is
established. Users can provide or update the callback used for processing data
stream packets by calling set_data_callback().
""" """
def __init__(self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2): MAX_A8_MINI_ZOOM = 6.0 # Maximum zoom for A8 mini
"""
Initialize a new SiyiGimbalCamera instance.
:param ip: The IP address (as a string) of the SIYI gimbal camera. def __init__(
:param port: The TCP port (as an integer) used for the connection (default: 37260). self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2
:param heartbeat_interval: Interval in seconds (int) between heartbeat packets. ):
"""
self.ip = ip self.ip = ip
self.port = port self.port = port
self.heartbeat_interval = heartbeat_interval self.heartbeat_interval = heartbeat_interval
# These will be assigned after successfully connecting. self.reader: Optional[asyncio.StreamReader] = None
self.reader: asyncio.StreamReader self.writer: Optional[asyncio.StreamWriter] = None
self.writer: asyncio.StreamWriter
self.is_connected = False self.is_connected = False
# Sequence counter (integer in the range 0 to 65535) for outgoing packets.
self.seq: int = 0 self.seq: int = 0
self._data_callback: Optional[Callable[[CommandID, Any], None]] = None self._data_callback: Optional[Callable[[CommandID | int, Any], None]] = None
async def connect(self) -> None: async def connect(self) -> None:
"""Establish a TCP connection with the gimbal camera."""
try: try:
self.reader, self.writer = await asyncio.open_connection( self.reader, self.writer = await asyncio.open_connection(
self.ip, self.port self.ip, self.port
) )
self.is_connected = True self.is_connected = True
#logger.info(f"Connected to {self.ip}:{self.port}")
asyncio.create_task(self.heartbeat_loop()) asyncio.create_task(self.heartbeat_loop())
# Start the data stream listener in the background.
asyncio.create_task(self._data_stream_listener()) asyncio.create_task(self._data_stream_listener())
except (socket.gaierror, ConnectionRefusedError) as e: except (socket.gaierror, ConnectionRefusedError) as e:
self.is_connected = False self.is_connected = False
@@ -163,104 +136,113 @@ class SiyiGimbalCamera:
raise raise
async def disconnect(self) -> None: async def disconnect(self) -> None:
"""Gracefully disconnect from the gimbal camera.""" if self.is_connected and self.writer:
if self.is_connected:
self.is_connected = False self.is_connected = False
self.writer.close() self.writer.close()
await self.writer.wait_closed() await self.writer.wait_closed()
#logger.info("Disconnected")
else: else:
#logger.warning("Not connected, cannot disconnect.")
pass pass
async def heartbeat_loop(self) -> None: async def heartbeat_loop(self) -> None:
"""Periodically send heartbeat packets to maintain the TCP connection.""" if not self.is_connected or not self.writer:
if not self.is_connected:
#logger.warning("Heartbeat loop started before connection was established.")
return return
try: try:
while self.is_connected: while self.is_connected:
try: try:
self.writer.write(HEARTBEAT_PACKET) self.writer.write(HEARTBEAT_PACKET)
await self.writer.drain() await self.writer.drain()
#logger.debug("Sent heartbeat packet")
await asyncio.sleep(self.heartbeat_interval) await asyncio.sleep(self.heartbeat_interval)
except (socket.error, BrokenPipeError) as e: except (socket.error, BrokenPipeError) as e:
#logger.error(f"Connection error in heartbeat loop: {e}")
break break
finally: finally:
#logger.info("Heartbeat loop stopped.")
if self.is_connected: if self.is_connected:
await self.disconnect() await self.disconnect()
def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes: def _build_packet_header(
"""Build a full packet for the 0x07 Gimbal Rotation Control command.""" self, cmd_id: CommandID, data_len: int
) -> bytearray:
"""Helper to build the common packet header."""
packet = bytearray() packet = bytearray()
packet.extend(b"\x55\x66") # STX bytes packet.extend(b"\x55\x66") # STX
packet.append(0x01) # CTRL (request ACK) packet.append(0x01) # CTRL (request ACK)
packet.extend(struct.pack("<H", 2)) # Data length: 2 bytes packet.extend(struct.pack("<H", data_len))
packet.extend(struct.pack("<H", self.seq)) # Sequence number packet.extend(struct.pack("<H", self.seq))
packet.append(CommandID.ROTATION_CONTROL.value) # CMD_ID for rotation control packet.append(cmd_id.value)
packet.extend(struct.pack("bb", turn_yaw, turn_pitch)) # Data: two int8 values return packet
def _finalize_packet(self, packet: bytearray) -> bytes:
"""Helper to add CRC and increment sequence number."""
crc_value = Crc16.calc(packet) crc_value = Crc16.calc(packet)
packet.extend(struct.pack("<H", crc_value)) packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000 self.seq = (self.seq + 1) % 0x10000
return bytes(packet) return bytes(packet)
async def send_rotation_command(self, turn_yaw: int, turn_pitch: int) -> None: def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes:
"""Send a rotation control command (0x07) with the given yaw and pitch values.""" data_len = 2
if not self.is_connected: packet = self._build_packet_header(
raise RuntimeError("Socket is not connected, cannot send rotation command.") CommandID.ROTATION_CONTROL, data_len
)
packet.extend(struct.pack("bb", turn_yaw, turn_pitch))
return self._finalize_packet(packet)
async def send_rotation_command(
self, turn_yaw: int, turn_pitch: int
) -> None:
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send rotation command."
)
packet = self._build_rotation_packet(turn_yaw, turn_pitch) packet = self._build_rotation_packet(turn_yaw, turn_pitch)
self.writer.write(packet) self.writer.write(packet)
await self.writer.drain() await self.writer.drain()
logger.debug(f"Sent rotation command with yaw {turn_yaw} and pitch {turn_pitch}") logger.debug(
f"Sent rotation command with yaw_speed {turn_yaw} and pitch_speed {turn_pitch}"
)
def _build_attitude_angles_packet(self, yaw: float, pitch: float) -> bytes: def _build_attitude_angles_packet(
"""Build a full packet for the 0x0E 'Set Gimbal Attitude Angles' command.""" self, yaw: float, pitch: float
packet = bytearray() ) -> bytes:
packet.extend(b"\x55\x66") data_len = 4
packet.append(0x01) # CTRL (request ACK) packet = self._build_packet_header(
packet.extend(struct.pack("<H", 4)) # Data length: 4 bytes (2 for each angle) CommandID.ATTITUDE_ANGLES, data_len
packet.extend(struct.pack("<H", self.seq)) )
packet.append(CommandID.ATTITUDE_ANGLES.value) # CMD_ID for set gimbal attitude angles
yaw_int = int(round(yaw * 10)) yaw_int = int(round(yaw * 10))
pitch_int = int(round(pitch * 10)) pitch_int = int(round(pitch * 10))
packet.extend(struct.pack("<hh", yaw_int, pitch_int)) packet.extend(struct.pack("<hh", yaw_int, pitch_int))
crc_value = Crc16.calc(packet) return self._finalize_packet(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
async def send_attitude_angles_command(self, yaw: float, pitch: float) -> None: async def send_attitude_angles_command(
"""Send the 0x0E 'Set Gimbal Attitude Angles' command to set absolute angles.""" self, yaw: float, pitch: float
if not self.is_connected: ) -> None:
raise RuntimeError("Socket is not connected, cannot send attitude angles command.") if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send attitude angles command."
)
packet = self._build_attitude_angles_packet(yaw, pitch) packet = self._build_attitude_angles_packet(yaw, pitch)
self.writer.write(packet) self.writer.write(packet)
await self.writer.drain() await self.writer.drain()
logger.debug(f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°") logger.debug(
f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°"
)
def _build_single_axis_attitude_packet(self, angle: float, axis: SingleAxis) -> bytes: def _build_single_axis_attitude_packet(
"""Build a full packet for the 0x41 'Single-Axis Attitude Control' command.""" self, angle: float, axis: SingleAxis
packet = bytearray() ) -> bytes:
packet.extend(b"\x55\x66") data_len = 3
packet.append(0x01) # CTRL packet = self._build_packet_header(
packet.extend(struct.pack("<H", 3)) # Data length: 3 bytes CommandID.SINGLE_AXIS_CONTROL, data_len
packet.extend(struct.pack("<H", self.seq)) )
packet.append(CommandID.SINGLE_AXIS_CONTROL.value) # CMD_ID for single-axis control
angle_int = int(round(angle * 10)) angle_int = int(round(angle * 10))
packet.extend(struct.pack("<hB", angle_int, axis.value)) packet.extend(struct.pack("<hB", angle_int, axis.value))
crc_value = Crc16.calc(packet) return self._finalize_packet(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
async def send_single_axis_attitude_command(self, angle: float, axis: SingleAxis) -> None: async def send_single_axis_attitude_command(
"""Send the 0x41 single-axis attitude control command.""" self, angle: float, axis: SingleAxis
if not self.is_connected: ) -> None:
raise RuntimeError("Socket is not connected, cannot send single-axis attitude command.") if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send single-axis attitude command."
)
packet = self._build_single_axis_attitude_packet(angle, axis) packet = self._build_single_axis_attitude_packet(angle, axis)
self.writer.write(packet) self.writer.write(packet)
await self.writer.drain() await self.writer.drain()
@@ -268,103 +250,253 @@ class SiyiGimbalCamera:
f"Sent single-axis attitude command for {axis.name.lower()} with angle {angle}°" f"Sent single-axis attitude command for {axis.name.lower()} with angle {angle}°"
) )
def _build_data_stream_packet(self, data_type: DataStreamType, data_freq: DataStreamFrequency) -> bytes: def _build_data_stream_packet(
"""Build a packet for the 0x25 'Request Gimbal to Send Data Stream' command.""" self, data_type: DataStreamType, data_freq: DataStreamFrequency
packet = bytearray() ) -> bytes:
packet.extend(b"\x55\x66") data_len = 2
packet.append(0x01) # CTRL, request ACK packet = self._build_packet_header(
packet.extend(struct.pack("<H", 2)) # Data length: 2 bytes CommandID.DATA_STREAM_REQUEST, data_len
packet.extend(struct.pack("<H", self.seq)) )
packet.append(CommandID.DATA_STREAM_REQUEST.value) # CMD_ID for data stream request packet.append(data_type.value)
packet.append(data_type.value) # Data type (uint8) packet.append(data_freq.value)
packet.append(data_freq.value) # Data frequency (uint8) return self._finalize_packet(packet)
crc_value = Crc16.calc(packet)
packet.extend(struct.pack("<H", crc_value))
self.seq = (self.seq + 1) % 0x10000
return bytes(packet)
async def send_data_stream_request(self, data_type: DataStreamType, data_freq: DataStreamFrequency) -> None: async def send_data_stream_request(
"""Send the 0x25 command to request a continuous data stream from the gimbal.""" self, data_type: DataStreamType, data_freq: DataStreamFrequency
if not self.is_connected: ) -> None:
raise RuntimeError("Socket is not connected, cannot request data stream.") if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot request data stream."
)
packet = self._build_data_stream_packet(data_type, data_freq) packet = self._build_data_stream_packet(data_type, data_freq)
self.writer.write(packet) self.writer.write(packet)
await self.writer.drain() await self.writer.drain()
#logger.info(f"Sent data stream request: data_type {data_type}, data_freq {data_freq}") logger.info(
f"Sent data stream request: data_type {data_type.name}, data_freq {data_freq.name}"
)
def _build_absolute_zoom_packet(self, zoom_level: float) -> bytes:
data_len = 2
packet = self._build_packet_header(CommandID.ABSOLUTE_ZOOM, data_len)
zoom_packet_value = int(round(zoom_level * 10))
if not (0 <= zoom_packet_value <= 65535): # Should be caught by clamping earlier
raise ValueError(
"Zoom packet value out of uint16_t range after conversion."
)
packet.extend(struct.pack("<H", zoom_packet_value))
return self._finalize_packet(packet)
async def send_absolute_zoom_command(self, zoom_level: float) -> None:
"""
Send an absolute zoom command (0x08) to the gimbal.
:param zoom_level: The desired zoom level as a float (e.g., 1.0 for 1x,
2.5 for 2.5x). For A8 mini, this will be clamped
between 1.0 and MAX_A8_MINI_ZOOM (6.0).
"""
if not self.is_connected or not self.writer:
raise RuntimeError(
"Socket is not connected or writer is None, cannot send absolute zoom command."
)
original_requested_zoom = zoom_level # For logging clarity
if zoom_level < 1.0:
logger.warning(
f"Requested zoom level {original_requested_zoom:.1f} is less than 1.0. Clamping to 1.0x."
)
zoom_level = 1.0
elif zoom_level > self.MAX_A8_MINI_ZOOM: # Check against max zoom
logger.warning(
f"Requested zoom level {original_requested_zoom:.1f} exceeds maximum {self.MAX_A8_MINI_ZOOM:.1f}x for A8 mini. "
f"Clamping to {self.MAX_A8_MINI_ZOOM:.1f}x."
)
zoom_level = self.MAX_A8_MINI_ZOOM
packet = self._build_absolute_zoom_packet(zoom_level)
self.writer.write(packet)
await self.writer.drain()
logger.debug(
f"Sent absolute zoom command with level {zoom_level:.1f}x (original request: {original_requested_zoom:.1f}x)"
)
async def _read_packet(self): async def _read_packet(self):
"""Read a full packet from the TCP stream following the protocol format.""" if not self.reader:
raise RuntimeError("Reader is not initialized.")
stx = await self.reader.readexactly(2) stx = await self.reader.readexactly(2)
if stx != b"\x55\x66": if stx != b"\x55\x66":
raise ValueError("Invalid packet start bytes.") raise ValueError(f"Invalid packet start bytes: {stx.hex()}")
ctrl = await self.reader.readexactly(1) ctrl = await self.reader.readexactly(1)
data_len_bytes = await self.reader.readexactly(2) data_len_bytes = await self.reader.readexactly(2)
data_len = struct.unpack("<H", data_len_bytes)[0] data_len = struct.unpack("<H", data_len_bytes)[0]
seq = await self.reader.readexactly(2) seq_bytes = await self.reader.readexactly(2) # Renamed for clarity
# seq_val = struct.unpack("<H", seq_bytes)[0] # If you need the sequence value
cmd_id_bytes = await self.reader.readexactly(1) cmd_id_bytes = await self.reader.readexactly(1)
cmd_id = cmd_id_bytes[0] cmd_id_val = cmd_id_bytes[0] # Renamed for clarity
# Protect against excessively large data_len
if data_len > 2048: # Arbitrary reasonable limit
raise ValueError(f"Excessive data length received: {data_len}")
data = await self.reader.readexactly(data_len) data = await self.reader.readexactly(data_len)
crc_bytes = await self.reader.readexactly(2) crc_bytes = await self.reader.readexactly(2)
received_crc = struct.unpack("<H", crc_bytes)[0] received_crc = struct.unpack("<H", crc_bytes)[0]
packet_without_crc = stx + ctrl + data_len_bytes + seq + cmd_id_bytes + data
packet_without_crc = (
stx + ctrl + data_len_bytes + seq_bytes + cmd_id_bytes + data
)
computed_crc = Crc16.calc(packet_without_crc) computed_crc = Crc16.calc(packet_without_crc)
if computed_crc != received_crc: if computed_crc != received_crc:
raise ValueError("CRC check failed for received packet.") raise ValueError(
return cmd_id, data f"CRC check failed. Expected {computed_crc:04X}, got {received_crc:04X}. "
f"Packet: {packet_without_crc.hex()}"
)
return cmd_id_val, data
@staticmethod @staticmethod
def parse_attitude_data(data: bytes) -> AttitudeData: def parse_attitude_data(data: bytes) -> AttitudeData:
"""Parse the attitude data (12 bytes) into human-readable values."""
if len(data) != 12: if len(data) != 12:
raise ValueError("Attitude data should be exactly 12 bytes.") raise ValueError("Attitude data should be exactly 12 bytes.")
return AttitudeData.from_bytes(data) return AttitudeData.from_bytes(data)
async def _data_stream_listener(self) -> None: async def _data_stream_listener(self) -> None:
"""
Continuously listen for incoming packets from the gimbal and process them.
If a data callback is set via set_data_callback(), the callback is called
with the CommandID and the corresponding data (parsed or raw).
"""
while self.is_connected: while self.is_connected:
try: try:
cmd_id_int, data = await self._read_packet() cmd_id_int, data = await self._read_packet()
cmd_id = CommandID(cmd_id_int) try:
# Process data as before... cmd_id_enum = CommandID(cmd_id_int)
except (ConnectionResetError, BrokenPipeError, ConnectionError) as e: except ValueError:
logger.error(f"Connection error: {e}") logger.warning(
self.is_connected = False f"Received unknown CMD_ID {cmd_id_int:02X}, data: {data.hex()}"
break # Exit the loop to allow reconnection )
except Exception as e: if self._data_callback:
#logger.error(f"Error reading packet: {e}") self._data_callback(cmd_id_int, data)
## Continue trying for other errors, but count failures
continue continue
# Process attitude data packets if applicable. if (
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and len(data) == 12: cmd_id_enum == CommandID.ATTITUDE_DATA_RESPONSE
and len(data) == 12
):
try: try:
parsed = AttitudeData.from_bytes(data) parsed = AttitudeData.from_bytes(data)
except Exception as e:
logger.exception(f"Failed to parse attitude data: {e}")
parsed = None
if self._data_callback: if self._data_callback:
self._data_callback(cmd_id, parsed) self._data_callback(cmd_id_enum, parsed)
else: else:
logger.info(f"Received attitude data: {parsed}") logger.info(f"Received attitude data: {parsed}")
except Exception as e:
logger.exception(
f"Failed to parse attitude data: {e}"
)
if self._data_callback:
self._data_callback(cmd_id_enum, data)
else: else:
if self._data_callback: if self._data_callback:
self._data_callback(cmd_id, data) self._data_callback(cmd_id_enum, data)
else: else:
logger.info(f"Received packet with CMD_ID {cmd_id}: {data}") logger.info(
f"Received packet with CMD_ID {cmd_id_enum.name} ({cmd_id_enum.value:02X}): {data.hex()}"
)
def set_data_callback(self, callback: Callable[[CommandID, Any], None]) -> None: except (
""" ConnectionResetError,
Change the callback function used by the data stream listener. BrokenPipeError,
ConnectionError,
asyncio.IncompleteReadError,
) as e:
logger.error(f"Connection error in listener: {e}")
self.is_connected = False
break
except ValueError as e:
logger.error(f"Packet error in listener: {e}")
# Consider adding a small delay or a mechanism to resync if this happens frequently
await asyncio.sleep(0.1) # Small delay before trying to read again
continue
except Exception as e:
logger.exception(f"Unexpected error in data stream listener: {e}")
# Depending on the error, you might want to break or continue
await asyncio.sleep(0.1) # Small delay
continue
The callback should be a function that accepts two arguments: def set_data_callback(
- A CommandID indicating the type of packet received. self, callback: Callable[[CommandID | int, Any], None]
- The parsed data (for attitude data, an AttitudeData instance) or raw data. ) -> None:
:param callback: A callable to be used as the data callback.
"""
self._data_callback = callback self._data_callback = callback
async def main_sdk_test(): # Renamed to avoid conflict if this file is imported
gimbal_ip = "192.168.144.25"
gimbal = SiyiGimbalCamera(gimbal_ip)
def my_data_handler(cmd_id, data):
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData):
print(
f"Attitude: Yaw={data.yaw:.1f}, Pitch={data.pitch:.1f}, Roll={data.roll:.1f}"
)
elif isinstance(cmd_id, CommandID):
print(
f"Received {cmd_id.name}: {data.hex() if isinstance(data, bytes) else data}"
)
else:
print(
f"Received Unknown CMD_ID {cmd_id:02X}: {data.hex() if isinstance(data, bytes) else data}"
)
gimbal.set_data_callback(my_data_handler)
try:
await gimbal.connect()
if gimbal.is_connected:
print("SDK Test: Successfully connected to the gimbal.")
print("SDK Test: Setting zoom to 1.0x")
await gimbal.send_absolute_zoom_command(1.0)
await asyncio.sleep(2)
print("SDK Test: Setting zoom to 3.5x")
await gimbal.send_absolute_zoom_command(3.5)
await asyncio.sleep(2)
print("SDK Test: Setting zoom to 6.0x (A8 mini max)")
await gimbal.send_absolute_zoom_command(6.0)
await asyncio.sleep(2)
print("SDK Test: Attempting zoom to 7.0x (should be clamped to 6.0x)")
await gimbal.send_absolute_zoom_command(7.0)
await asyncio.sleep(2)
print("SDK Test: Attempting zoom to 0.5x (should be clamped to 1.0x)")
await gimbal.send_absolute_zoom_command(0.5)
await asyncio.sleep(2)
print("SDK Test: Requesting attitude data stream at 5Hz...")
await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.HZ_5)
print("SDK Test: Listening for data for 10 seconds...")
await asyncio.sleep(10)
print("SDK Test: Disabling attitude data stream...")
await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE)
await asyncio.sleep(1)
except ConnectionRefusedError:
print(
f"SDK Test: Connection to {gimbal_ip} was refused. Is the gimbal on and accessible?"
)
except Exception as e:
print(f"SDK Test: An error occurred: {e}")
finally:
if gimbal.is_connected:
print("SDK Test: Disconnecting...")
await gimbal.disconnect()
print("SDK Test: Program finished.")
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
# To see debug logs from this SDK specifically:
# logger.setLevel(logging.DEBUG)
asyncio.run(main_sdk_test())