From c107b82a8d90375691fdd859d434a848db1a2543 Mon Sep 17 00:00:00 2001 From: ryleu <69326171+ryleu@users.noreply.github.com> Date: Thu, 6 Nov 2025 19:10:21 -0600 Subject: [PATCH] reformat with black --- src/anchor_pkg/anchor_pkg/anchor_node.py | 107 ++++++---- src/anchor_pkg/launch/rover.launch.py | 138 ++++++------- src/anchor_pkg/setup.py | 27 ++- src/arm_pkg/arm_pkg/arm_node.py | 12 +- src/arm_pkg/setup.py | 27 ++- src/bio_pkg/bio_pkg/bio_node.py | 119 ++++++----- src/bio_pkg/setup.py | 25 +-- src/core_pkg/core_pkg/core_headless.py | 33 +-- src/core_pkg/core_pkg/core_node.py | 230 +++++++++++++-------- src/core_pkg/core_pkg/core_ptz.py | 244 +++++++++++++---------- src/core_pkg/core_pkg/siyi_sdk/siyi.py | 92 ++++----- src/core_pkg/core_pkg/siyi_sdk/test.py | 12 +- src/core_pkg/setup.py | 25 ++- src/headless_pkg/setup.py | 22 +- src/headless_pkg/src/headless_node.py | 92 +++++---- 15 files changed, 686 insertions(+), 519 deletions(-) diff --git a/src/anchor_pkg/anchor_pkg/anchor_node.py b/src/anchor_pkg/anchor_pkg/anchor_node.py index e4d1ff5..2445210 100644 --- a/src/anchor_pkg/anchor_pkg/anchor_node.py +++ b/src/anchor_pkg/anchor_pkg/anchor_node.py @@ -38,10 +38,12 @@ Subscribers: * /anchor/to_vic/relay_string - Publish raw strings to this topic to send directly to the MCU for debugging """ + + class SerialRelay(Node): def __init__(self): # Initalize node with name - super().__init__("anchor_node")#previously 'serial_publisher' + super().__init__("anchor_node") # previously 'serial_publisher' # Loop through all serial devices on the computer to check for the MCU self.port = None @@ -55,7 +57,7 @@ class SerialRelay(Node): try: # connect and send a ping command ser = serial.Serial(port, 115200, timeout=1) - #(f"Checking port {port}...") + # (f"Checking port {port}...") ser.write(b"ping\n") response = ser.read_until(bytes("\n", "utf8")) @@ -78,56 +80,74 @@ class SerialRelay(Node): atexit.register(self.cleanup) # New pub/sub with VicCAN - self.fromvic_debug_pub_ = self.create_publisher(String, '/anchor/from_vic/debug', 20) - self.fromvic_core_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/core', 20) - self.fromvic_arm_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/arm', 20) - self.fromvic_bio_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/bio', 20) + self.fromvic_debug_pub_ = self.create_publisher( + String, "/anchor/from_vic/debug", 20 + ) + self.fromvic_core_pub_ = self.create_publisher( + VicCAN, "/anchor/from_vic/core", 20 + ) + self.fromvic_arm_pub_ = self.create_publisher( + VicCAN, "/anchor/from_vic/arm", 20 + ) + self.fromvic_bio_pub_ = self.create_publisher( + VicCAN, "/anchor/from_vic/bio", 20 + ) - self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_vic/mock_mcu', self.on_mock_fromvic, 20) - self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.on_relay_tovic_viccan, 20) - self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/relay_string', self.on_relay_tovic_string, 20) + self.mock_mcu_sub_ = self.create_subscription( + String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20 + ) + self.tovic_sub_ = self.create_subscription( + VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20 + ) + self.tovic_debug_sub_ = self.create_subscription( + String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20 + ) + # Create publishers + self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10) + self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10) + self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10) - # Create publishers - self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10) - self.core_pub = self.create_publisher(String, '/anchor/core/feedback', 10) - self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10) - - self.debug_pub = self.create_publisher(String, '/anchor/debug', 10) - - # Create a subscriber - self.relay_sub = self.create_subscription(String, '/anchor/relay', self.on_relay_tovic_string, 10) + self.debug_pub = self.create_publisher(String, "/anchor/debug", 10) + # Create a subscriber + self.relay_sub = self.create_subscription( + String, "/anchor/relay", self.on_relay_tovic_string, 10 + ) def run(self): # This thread makes all the update processes run in the background global thread thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread.start() - + try: while rclpy.ok(): - self.read_MCU() # Check the MCU for updates + self.read_MCU() # Check the MCU for updates except KeyboardInterrupt: sys.exit(0) def read_MCU(self): - """ Check the USB serial port for new data from the MCU, and publish string to appropriate topics """ + """Check the USB serial port for new data from the MCU, and publish string to appropriate topics""" try: output = str(self.ser.readline(), "utf8") - + if output: self.relay_fromvic(output) # All output over debug temporarily - #self.get_logger().info(f"[MCU] {output}") + # self.get_logger().info(f"[MCU] {output}") msg = String() msg.data = output self.debug_pub.publish(msg) if output.startswith("can_relay_fromvic,core"): self.core_pub.publish(msg) - elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings + elif output.startswith("can_relay_fromvic,arm") or output.startswith( + "can_relay_fromvic,digit" + ): # digit for voltage readings self.arm_pub.publish(msg) - if output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor + if output.startswith("can_relay_fromvic,citadel") or output.startswith( + "can_relay_fromvic,digit" + ): # digit for SHT sensor self.bio_pub.publish(msg) # msg = String() # msg.data = output @@ -152,15 +172,13 @@ class SerialRelay(Node): # self.ser.close() # exit(1) - def on_mock_fromvic(self, msg: String): - """ For testing without an actual MCU, publish strings here as if they came from an MCU """ + """For testing without an actual MCU, publish strings here as if they came from an MCU""" # self.get_logger().info(f"Got command from mock MCU: {msg}") self.relay_fromvic(msg.data) - def on_relay_tovic_viccan(self, msg: VicCAN): - """ Relay a VicCAN message to the MCU """ + """Relay a VicCAN message to the MCU""" output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}" for num in msg.data: output += f",{round(num, 7)}" # limit to 7 decimal places @@ -169,7 +187,7 @@ class SerialRelay(Node): self.ser.write(bytes(output, "utf8")) def relay_fromvic(self, msg: str): - """ Relay a string message from the MCU to the appropriate VicCAN topic """ + """Relay a string message from the MCU to the appropriate VicCAN topic""" self.fromvic_debug_pub_.publish(String(data=msg)) parts = msg.strip().split(",") if len(parts) > 0 and parts[0] != "can_relay_fromvic": @@ -181,11 +199,13 @@ class SerialRelay(Node): malformed_reason: str = "" if len(parts) < 3 or len(parts) > 7: malformed = True - malformed_reason = f"invalid argument count (expected [3,7], got {len(parts)})" + malformed_reason = ( + f"invalid argument count (expected [3,7], got {len(parts)})" + ) elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]: malformed = True malformed_reason = f"invalid mcu_name '{parts[1]}'" - elif not(parts[2].isnumeric()) or int(parts[2]) < 0: + elif not (parts[2].isnumeric()) or int(parts[2]) < 0: malformed = True malformed_reason = f"command_id '{parts[2]}' is not a non-negative integer" else: @@ -198,7 +218,9 @@ class SerialRelay(Node): break if malformed: - self.get_logger().warning(f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}") + self.get_logger().warning( + f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}" + ) return # Have valid VicCAN message @@ -208,7 +230,9 @@ class SerialRelay(Node): output.command_id = int(parts[2]) if len(parts) > 3: output.data = [float(x) for x in parts[3:]] - output.header = Header(stamp=self.get_clock().now().to_msg(), frame_id="from_vic") + output.header = Header( + stamp=self.get_clock().now().to_msg(), frame_id="from_vic" + ) # self.get_logger().info(f"Relaying from MCU: {output}") if output.mcu_name == "core": @@ -218,11 +242,10 @@ class SerialRelay(Node): elif output.mcu_name == "citadel" or output.mcu_name == "digit": self.fromvic_bio_pub_.publish(output) - def on_relay_tovic_string(self, msg: String): - """ Relay a raw string message to the MCU for debugging """ + """Relay a raw string message to the MCU for debugging""" message = msg.data - #self.get_logger().info(f"Sending command to MCU: {msg}") + # self.get_logger().info(f"Sending command to MCU: {msg}") self.ser.write(bytes(message, "utf8")) @staticmethod @@ -234,6 +257,7 @@ class SerialRelay(Node): if self.ser.is_open: self.ser.close() + def myexcepthook(type, value, tb): print("Uncaught exception:", type, value) if serial_pub: @@ -249,7 +273,10 @@ def main(args=None): serial_pub = SerialRelay() serial_pub.run() -if __name__ == '__main__': - #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly - signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly + +if __name__ == "__main__": + # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly + signal.signal( + signal.SIGTERM, lambda signum, frame: sys.exit(0) + ) # Catch termination signals and exit cleanly main() diff --git a/src/anchor_pkg/launch/rover.launch.py b/src/anchor_pkg/launch/rover.launch.py index 5b82b82..2a949ba 100644 --- a/src/anchor_pkg/launch/rover.launch.py +++ b/src/anchor_pkg/launch/rover.launch.py @@ -2,115 +2,121 @@ from launch import LaunchDescription from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown -from launch.substitutions import LaunchConfiguration, ThisLaunchFileDir, PathJoinSubstitution +from launch.substitutions import ( + LaunchConfiguration, + ThisLaunchFileDir, + PathJoinSubstitution, +) from launch_ros.actions import Node -#Prevent making __pycache__ directories +# Prevent making __pycache__ directories from sys import dont_write_bytecode + dont_write_bytecode = True + def launch_setup(context, *args, **kwargs): # Retrieve the resolved value of the launch argument 'mode' - mode = LaunchConfiguration('mode').perform(context) + mode = LaunchConfiguration("mode").perform(context) nodes = [] - if mode == 'anchor': + if mode == "anchor": # Launch every node and pass "anchor" as the parameter nodes.append( Node( - package='arm_pkg', - executable='arm', # change as needed - name='arm', - output='both', - parameters=[{'launch_mode': mode}], - on_exit=Shutdown() + package="arm_pkg", + executable="arm", # change as needed + name="arm", + output="both", + parameters=[{"launch_mode": mode}], + on_exit=Shutdown(), ) ) nodes.append( Node( - package='core_pkg', - executable='core', # change as needed - name='core', - output='both', - parameters=[{'launch_mode': mode}], - on_exit=Shutdown() + package="core_pkg", + executable="core", # change as needed + name="core", + output="both", + parameters=[{"launch_mode": mode}], + on_exit=Shutdown(), ) ) nodes.append( Node( - package='core_pkg', - executable='ptz', # change as needed - name='ptz', - output='both' + package="core_pkg", + executable="ptz", # change as needed + name="ptz", + output="both", # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure ) ) nodes.append( Node( - package='bio_pkg', - executable='bio', # change as needed - name='bio', - output='both', - parameters=[{'launch_mode': mode}], - on_exit=Shutdown() + package="bio_pkg", + executable="bio", # change as needed + name="bio", + output="both", + parameters=[{"launch_mode": mode}], + on_exit=Shutdown(), ) ) nodes.append( Node( - package='anchor_pkg', - executable='anchor', # change as needed - name='anchor', - output='both', - parameters=[{'launch_mode': mode}], - on_exit=Shutdown() + package="anchor_pkg", + executable="anchor", # change as needed + name="anchor", + output="both", + parameters=[{"launch_mode": mode}], + on_exit=Shutdown(), ) ) - elif mode in ['arm', 'core', 'bio', 'ptz']: + elif mode in ["arm", "core", "bio", "ptz"]: # Only launch the node corresponding to the provided mode. - if mode == 'arm': + if mode == "arm": nodes.append( Node( - package='arm_pkg', - executable='arm', - name='arm', - output='both', - parameters=[{'launch_mode': mode}], - on_exit=Shutdown() + package="arm_pkg", + executable="arm", + name="arm", + output="both", + parameters=[{"launch_mode": mode}], + on_exit=Shutdown(), ) ) - elif mode == 'core': + elif mode == "core": nodes.append( Node( - package='core_pkg', - executable='core', - name='core', - output='both', - parameters=[{'launch_mode': mode}], - on_exit=Shutdown() + package="core_pkg", + executable="core", + name="core", + output="both", + parameters=[{"launch_mode": mode}], + on_exit=Shutdown(), ) ) - elif mode == 'bio': + elif mode == "bio": nodes.append( Node( - package='bio_pkg', - executable='bio', - name='bio', - output='both', - parameters=[{'launch_mode': mode}], - on_exit=Shutdown() + package="bio_pkg", + executable="bio", + name="bio", + output="both", + parameters=[{"launch_mode": mode}], + on_exit=Shutdown(), ) ) - elif mode == 'ptz': + elif mode == "ptz": nodes.append( Node( - package='core_pkg', - executable='ptz', - name='ptz', - output='both', - on_exit=Shutdown(), #on fail, shutdown if this was the only node to be launched + package="core_pkg", + executable="ptz", + name="ptz", + output="both", + on_exit=Shutdown(), # on fail, shutdown if this was the only node to be launched ) ) else: @@ -119,14 +125,12 @@ def launch_setup(context, *args, **kwargs): return nodes + def generate_launch_description(): declare_arg = DeclareLaunchArgument( - 'mode', - default_value='anchor', - description='Launch mode: arm, core, bio, anchor, or ptz' + "mode", + default_value="anchor", + description="Launch mode: arm, core, bio, anchor, or ptz", ) - return LaunchDescription([ - declare_arg, - OpaqueFunction(function=launch_setup) - ]) + return LaunchDescription([declare_arg, OpaqueFunction(function=launch_setup)]) diff --git a/src/anchor_pkg/setup.py b/src/anchor_pkg/setup.py index f29392e..ce8ad5c 100644 --- a/src/anchor_pkg/setup.py +++ b/src/anchor_pkg/setup.py @@ -2,27 +2,24 @@ from setuptools import find_packages, setup from os import path from glob import glob -package_name = 'anchor_pkg' +package_name = "anchor_pkg" setup( name=package_name, - version='0.0.0', - packages=find_packages(exclude=['test']), + version="0.0.0", + packages=find_packages(exclude=["test"]), data_files=[ - ('share/ament_index/resource_index/packages', - ['resource/' + package_name]), - (path.join("share", package_name), ['package.xml']), - (path.join("share", package_name, "launch"), glob("launch/*")) + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + (path.join("share", package_name), ["package.xml"]), + (path.join("share", package_name, "launch"), glob("launch/*")), ], - install_requires=['setuptools'], + install_requires=["setuptools"], zip_safe=True, - maintainer='tristan', - maintainer_email='tristanmcginnis26@gmail.com', - description='Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN', - license='All Rights Reserved', + maintainer="tristan", + maintainer_email="tristanmcginnis26@gmail.com", + description="Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN", + license="All Rights Reserved", entry_points={ - 'console_scripts': [ - "anchor = anchor_pkg.anchor_node:main" - ], + "console_scripts": ["anchor = anchor_pkg.anchor_node:main"], }, ) diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index bd5dd41..a72ce1b 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -64,7 +64,9 @@ class SerialRelay(Node): "Wrist-EF_Roll_Joint", "Gripper_Slider_Left", ] - self.joint_state.position = [0.0] * len(self.joint_state.name) # Initialize with zeros + self.joint_state.position = [0.0] * len( + self.joint_state.name + ) # Initialize with zeros self.joint_command_sub = self.create_subscription( JointState, "/joint_commands", self.joint_command_callback, 10 @@ -233,8 +235,12 @@ class SerialRelay(Node): if len(parts) >= 4: self.digit_feedback.wrist_angle = float(parts[3]) # self.digit_feedback.wrist_roll = float(parts[4]) - self.joint_state.position[4] = math.radians(float(parts[4])) # Wrist roll - self.joint_state.position[5] = math.radians(float(parts[3])) # Wrist yaw + self.joint_state.position[4] = math.radians( + float(parts[4]) + ) # Wrist roll + self.joint_state.position[5] = math.radians( + float(parts[3]) + ) # Wrist yaw else: return diff --git a/src/arm_pkg/setup.py b/src/arm_pkg/setup.py index 5a2a266..f36201b 100644 --- a/src/arm_pkg/setup.py +++ b/src/arm_pkg/setup.py @@ -2,27 +2,26 @@ from setuptools import find_packages, setup import os from glob import glob -package_name = 'arm_pkg' +package_name = "arm_pkg" setup( name=package_name, - version='1.0.0', - packages=find_packages(exclude=['test']), + version="1.0.0", + packages=find_packages(exclude=["test"]), data_files=[ - ('share/ament_index/resource_index/packages', - ['resource/' + package_name]), - ('share/' + package_name, ['package.xml']) + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), ], - install_requires=['setuptools'], + install_requires=["setuptools"], zip_safe=True, - maintainer='tristan', - maintainer_email='tristanmcginnis26@gmail.com', - description='TODO: Package description', - license='All Rights Reserved', + maintainer="tristan", + maintainer_email="tristanmcginnis26@gmail.com", + description="TODO: Package description", + license="All Rights Reserved", entry_points={ - 'console_scripts': [ - 'arm = arm_pkg.arm_node:main', - 'headless = arm_pkg.arm_headless:main' + "console_scripts": [ + "arm = arm_pkg.arm_node:main", + "headless = arm_pkg.arm_headless:main", ], }, ) diff --git a/src/bio_pkg/bio_pkg/bio_node.py b/src/bio_pkg/bio_pkg/bio_node.py index 94b4fba..cacce19 100644 --- a/src/bio_pkg/bio_pkg/bio_node.py +++ b/src/bio_pkg/bio_pkg/bio_node.py @@ -15,45 +15,50 @@ from ros2_interfaces_pkg.msg import BioFeedback serial_pub = None thread = None + class SerialRelay(Node): def __init__(self): # Initialize node super().__init__("bio_node") # Get launch mode parameter - self.declare_parameter('launch_mode', 'bio') - self.launch_mode = self.get_parameter('launch_mode').value + self.declare_parameter("launch_mode", "bio") + self.launch_mode = self.get_parameter("launch_mode").value self.get_logger().info(f"bio launch_mode is: {self.launch_mode}") # Create publishers - self.debug_pub = self.create_publisher(String, '/bio/feedback/debug', 10) - self.feedback_pub = self.create_publisher(BioFeedback, '/bio/feedback', 10) - + self.debug_pub = self.create_publisher(String, "/bio/feedback/debug", 10) + self.feedback_pub = self.create_publisher(BioFeedback, "/bio/feedback", 10) # Create subscribers - self.control_sub = self.create_subscription(BioControl, '/bio/control', self.send_control, 10) - + self.control_sub = self.create_subscription( + BioControl, "/bio/control", self.send_control, 10 + ) + # Create a publisher for telemetry self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) # Topics used in anchor mode - if self.launch_mode == 'anchor': - self.anchor_sub = self.create_subscription(String, '/anchor/bio/feedback', self.anchor_feedback, 10) - self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) + if self.launch_mode == "anchor": + self.anchor_sub = self.create_subscription( + String, "/anchor/bio/feedback", self.anchor_feedback, 10 + ) + self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.bio_feedback = BioFeedback() - # Search for ports IF in 'arm' (standalone) and not 'anchor' mode - if self.launch_mode == 'bio': + if self.launch_mode == "bio": # Loop through all serial devices on the computer to check for the MCU self.port = None for i in range(2): try: # connect and send a ping command - set_port = '/dev/ttyACM0' #MCU is controlled through GPIO pins on the PI + set_port = ( + "/dev/ttyACM0" # MCU is controlled through GPIO pins on the PI + ) ser = serial.Serial(set_port, 115200, timeout=1) - #print(f"Checking port {port}...") + # print(f"Checking port {port}...") ser.write(b"ping\n") response = ser.read_until("\n") @@ -64,12 +69,14 @@ class SerialRelay(Node): break except: pass - + if self.port is None: - self.get_logger().info("Unable to find MCU... please make sure it is connected.") + self.get_logger().info( + "Unable to find MCU... please make sure it is connected." + ) time.sleep(1) sys.exit(1) - + self.ser = serial.Serial(self.port, 115200) atexit.register(self.cleanup) @@ -77,12 +84,12 @@ class SerialRelay(Node): global thread thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread.start() - - #if in arm mode, will need to read from the MCU + + # if in arm mode, will need to read from the MCU try: while rclpy.ok(): - if self.launch_mode == 'bio': + if self.launch_mode == "bio": if self.ser.in_waiting: self.read_mcu() else: @@ -92,8 +99,7 @@ class SerialRelay(Node): finally: self.cleanup() - - #Currently will just spit out all values over the /arm/feedback/debug topic as strings + # Currently will just spit out all values over the /arm/feedback/debug topic as strings def read_mcu(self): try: output = str(self.ser.readline(), "utf8") @@ -123,69 +129,85 @@ class SerialRelay(Node): def send_ik(self, msg): pass - def send_control(self, msg: BioControl): # CITADEL Control Commands ################ - # Chem Pumps, only send if not zero if msg.pump_id != 0: - command = "can_relay_tovic,citadel,27," + str(msg.pump_id) + "," + str(msg.pump_amount) + "\n" + command = ( + "can_relay_tovic,citadel,27," + + str(msg.pump_id) + + "," + + str(msg.pump_amount) + + "\n" + ) self.send_cmd(command) # Fans, only send if not zero if msg.fan_id != 0: - command = "can_relay_tovic,citadel,40," + str(msg.fan_id) + "," + str(msg.fan_duration) + "\n" + command = ( + "can_relay_tovic,citadel,40," + + str(msg.fan_id) + + "," + + str(msg.fan_duration) + + "\n" + ) self.send_cmd(command) # Servos, only send if not zero if msg.servo_id != 0: - command = "can_relay_tovic,citadel,25," + str(msg.servo_id) + "," + str(int(msg.servo_state)) + "\n" + command = ( + "can_relay_tovic,citadel,25," + + str(msg.servo_id) + + "," + + str(int(msg.servo_state)) + + "\n" + ) self.send_cmd(command) - # LSS (SCYTHE) command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n" - #self.send_cmd(command) - + # self.send_cmd(command) + # Vibration Motor command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n" - #self.send_cmd(command) - + # self.send_cmd(command) - # FAERIE Control Commands + # FAERIE Control Commands ################ - + # To be reviewed before use# # Laser command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n" - #self.send_cmd(command) + # self.send_cmd(command) # Drill (SCABBARD) command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n" - #self.send_cmd(command) + # self.send_cmd(command) # Bio linear actuator command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n" self.send_cmd(command) - - def send_cmd(self, msg: str): - if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay + if ( + self.launch_mode == "anchor" + ): # if in anchor mode, send to anchor node to relay output = String() output.data = msg self.anchor_pub.publish(output) - elif self.launch_mode == 'bio': #if in standalone mode, send to MCU directly + elif self.launch_mode == "bio": # if in standalone mode, send to MCU directly self.get_logger().info(f"[Bio to MCU] {msg}") self.ser.write(bytes(msg, "utf8")) def anchor_feedback(self, msg: String): output = msg.data parts = str(output.strip()).split(",") - #self.get_logger().info(f"[Bio Anchor] {msg.data}") + # self.get_logger().info(f"[Bio Anchor] {msg.data}") - if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100 + if output.startswith( + "can_relay_fromvic,citadel,54" + ): # bat, 12, 5, Voltage readings * 100 self.bio_feedback.bat_voltage = float(parts[3]) / 100.0 self.bio_feedback.voltage_12 = float(parts[4]) / 100.0 self.bio_feedback.voltage_5 = float(parts[5]) / 100.0 @@ -199,7 +221,7 @@ class SerialRelay(Node): @staticmethod def list_serial_ports(): return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") - #return glob.glob("/dev/tty[A-Za-z]*") + # return glob.glob("/dev/tty[A-Za-z]*") def cleanup(self): print("Cleaning up...") @@ -209,11 +231,13 @@ class SerialRelay(Node): except Exception as e: exit(0) + def myexcepthook(type, value, tb): print("Uncaught exception:", type, value) if serial_pub: serial_pub.cleanup() + def main(args=None): rclpy.init(args=args) sys.excepthook = myexcepthook @@ -222,7 +246,10 @@ def main(args=None): serial_pub = SerialRelay() serial_pub.run() -if __name__ == '__main__': - #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly - signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly + +if __name__ == "__main__": + # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly + signal.signal( + signal.SIGTERM, lambda signum, frame: sys.exit(0) + ) # Catch termination signals and exit cleanly main() diff --git a/src/bio_pkg/setup.py b/src/bio_pkg/setup.py index 5707e42..d70c631 100644 --- a/src/bio_pkg/setup.py +++ b/src/bio_pkg/setup.py @@ -1,25 +1,22 @@ from setuptools import find_packages, setup -package_name = 'bio_pkg' +package_name = "bio_pkg" setup( name=package_name, - version='0.0.0', - packages=find_packages(exclude=['test']), + version="0.0.0", + packages=find_packages(exclude=["test"]), data_files=[ - ('share/ament_index/resource_index/packages', - ['resource/' + package_name]), - ('share/' + package_name, ['package.xml']), + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), ], - install_requires=['setuptools'], + install_requires=["setuptools"], zip_safe=True, - maintainer='tristan', - maintainer_email='tristanmcginnis26@gmail.com', - description='TODO: Package description', - license='TODO: License declaration', + maintainer="tristan", + maintainer_email="tristanmcginnis26@gmail.com", + description="TODO: Package description", + license="TODO: License declaration", entry_points={ - 'console_scripts': [ - 'bio = bio_pkg.bio_node:main' - ], + "console_scripts": ["bio = bio_pkg.bio_node:main"], }, ) diff --git a/src/core_pkg/core_pkg/core_headless.py b/src/core_pkg/core_pkg/core_headless.py index 797bb3e..f688610 100755 --- a/src/core_pkg/core_pkg/core_headless.py +++ b/src/core_pkg/core_pkg/core_headless.py @@ -17,17 +17,20 @@ from ros2_interfaces_pkg.msg import CoreControl os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display -os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() +os.environ["SDL_AUDIODRIVER"] = ( + "dummy" # Force pygame to use a dummy audio driver before pygame.init() +) -max_speed = 90 #Max speed as a duty cycle percentage (1-100) +max_speed = 90 # Max speed as a duty cycle percentage (1-100) + class Headless(Node): def __init__(self): # Initialize pygame first pygame.init() pygame.joystick.init() - + # Wait for a gamepad to be connected self.gamepad = None print("Waiting for gamepad connection...") @@ -39,23 +42,25 @@ class Headless(Node): sys.exit(0) time.sleep(1.0) # Check every second print("No gamepad found. Waiting...") - + # Initialize the gamepad self.gamepad = pygame.joystick.Joystick(0) self.gamepad.init() - print(f'Gamepad Found: {self.gamepad.get_name()}') - + print(f"Gamepad Found: {self.gamepad.get_name()}") + # Now initialize the ROS2 node super().__init__("core_headless") self.create_timer(0.15, self.send_controls) - self.publisher = self.create_publisher(CoreControl, '/core/control', 10) - self.lastMsg = String() # Used to ignore sending controls repeatedly when they do not change + self.publisher = self.create_publisher(CoreControl, "/core/control", 10) + self.lastMsg = ( + String() + ) # Used to ignore sending controls repeatedly when they do not change def run(self): # This thread makes all the update processes run in the background thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread.start() - + try: while rclpy.ok(): self.send_controls() @@ -68,7 +73,7 @@ class Headless(Node): if event.type == pygame.QUIT: pygame.quit() sys.exit(0) - + # Check if controller is still connected if pygame.joystick.get_count() == 0: print("Gamepad disconnected. Exiting...") @@ -82,7 +87,7 @@ class Headless(Node): # Clean up pygame.quit() sys.exit(0) - + input = CoreControl() input.max_speed = max_speed input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis @@ -91,15 +96,17 @@ class Headless(Node): else: input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis - output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}' + output = f"L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}" self.get_logger().info(f"[Ctrl] {output}") self.publisher.publish(input) + def main(args=None): rclpy.init(args=args) node = Headless() rclpy.spin(node) rclpy.shutdown() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py index 42c692d..337f2a7 100644 --- a/src/core_pkg/core_pkg/core_node.py +++ b/src/core_pkg/core_pkg/core_node.py @@ -38,7 +38,7 @@ control_qos = qos.QoSProfile( deadline=Duration(seconds=1), lifespan=Duration(nanoseconds=500_000_000), # 500ms liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, - liveliness_lease_duration=Duration(seconds=5) + liveliness_lease_duration=Duration(seconds=5), ) # Used to verify the length of an incoming VicCAN feedback message @@ -52,7 +52,7 @@ viccan_msg_len_dict = { 53: 4, 54: 4, 56: 4, # really 3, but viccan - 58: 4 # ditto + 58: 4, # ditto } @@ -62,77 +62,110 @@ class SerialRelay(Node): super().__init__("core_node") # Launch mode -- anchor vs core - self.declare_parameter('launch_mode', 'core') - self.launch_mode = self.get_parameter('launch_mode').value + self.declare_parameter("launch_mode", "core") + self.launch_mode = self.get_parameter("launch_mode").value self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") - ################################################## # Topics # Anchor - if self.launch_mode == 'anchor': - self.anchor_fromvic_sub_ = self.create_subscription(VicCAN, '/anchor/from_vic/core', self.relay_fromvic, 20) - self.anchor_tovic_pub_ = self.create_publisher(VicCAN, '/anchor/to_vic/relay', 20) + if self.launch_mode == "anchor": + self.anchor_fromvic_sub_ = self.create_subscription( + VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20 + ) + self.anchor_tovic_pub_ = self.create_publisher( + VicCAN, "/anchor/to_vic/relay", 20 + ) - self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) - self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) + self.anchor_sub = self.create_subscription( + String, "/anchor/core/feedback", self.anchor_feedback, 10 + ) + self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) # Control # autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2 - self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, 1) + self.cmd_vel_sub_ = self.create_subscription( + TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1 + ) # manual twist -- [-1, 1] rather than real units - self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos) + self.twist_man_sub_ = self.create_subscription( + Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos + ) # manual flags -- brake mode and max duty cycle - self.control_state_sub_ = self.create_subscription(CoreCtrlState, '/core/control/state', self.control_state_callback, qos_profile=control_qos) - self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5 + self.control_state_sub_ = self.create_subscription( + CoreCtrlState, + "/core/control/state", + self.control_state_callback, + qos_profile=control_qos, + ) + self.twist_max_duty = ( + 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5 + ) # Feedback # Consolidated and organized core feedback - self.feedback_new_pub_ = self.create_publisher(NewCoreFeedback, '/core/feedback_new', qos_profile=qos.qos_profile_sensor_data) + self.feedback_new_pub_ = self.create_publisher( + NewCoreFeedback, + "/core/feedback_new", + qos_profile=qos.qos_profile_sensor_data, + ) self.feedback_new_state = NewCoreFeedback() self.feedback_new_state.fl_motor.id = 1 self.feedback_new_state.bl_motor.id = 2 self.feedback_new_state.fr_motor.id = 3 self.feedback_new_state.br_motor.id = 4 - self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) # TODO: not sure about this + self.telemetry_pub_timer = self.create_timer( + 1.0, self.publish_feedback + ) # TODO: not sure about this # Joint states for topic-based controller - self.joint_state_pub_ = self.create_publisher(JointState, '/core/joint_states', qos_profile=qos.qos_profile_sensor_data) + self.joint_state_pub_ = self.create_publisher( + JointState, "/core/joint_states", qos_profile=qos.qos_profile_sensor_data + ) # IMU (embedded BNO-055) - self.imu_pub_ = self.create_publisher(Imu, '/core/imu', qos_profile=qos.qos_profile_sensor_data) + self.imu_pub_ = self.create_publisher( + Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data + ) self.imu_state = Imu() self.imu_state.header.frame_id = "core_bno055" # GPS (embedded u-blox M9N) - self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', qos_profile=qos.qos_profile_sensor_data) + self.gps_pub_ = self.create_publisher( + NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data + ) self.gps_state = NavSatFix() self.gps_state.header.frame_id = "core_gps_antenna" self.gps_state.status.service = NavSatStatus.SERVICE_GPS self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN # Barometer (embedded BMP-388) - self.baro_pub_ = self.create_publisher(Barometer, '/core/baro', qos_profile=qos.qos_profile_sensor_data) + self.baro_pub_ = self.create_publisher( + Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data + ) self.baro_state = Barometer() self.baro_state.header.frame_id = "core_bmp388" # Old - + # /core/control - self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff + self.control_sub = self.create_subscription( + CoreControl, "/core/control", self.send_controls, 10 + ) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff # /core/feedback - self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) + self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10) self.core_feedback = CoreFeedback() # Debug - self.debug_pub = self.create_publisher(String, '/core/debug', 10) - self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) - + self.debug_pub = self.create_publisher(String, "/core/debug", 10) + self.ping_service = self.create_service( + Empty, "/astra/core/ping", self.ping_callback + ) ################################################## # Find microcontroller (Non-anchor only) # Core (non-anchor) specific - if self.launch_mode == 'core': + if self.launch_mode == "core": # Loop through all serial devices on the computer to check for the MCU self.port = None ports = SerialRelay.list_serial_ports() @@ -141,7 +174,7 @@ class SerialRelay(Node): try: # connect and send a ping command ser = serial.Serial(port, 115200, timeout=1) - #(f"Checking port {port}...") + # (f"Checking port {port}...") ser.write(b"ping\n") response = ser.read_until("\n") # type: ignore @@ -156,17 +189,16 @@ class SerialRelay(Node): pass if self.port is not None: break - + if self.port is None: self.get_logger().info("Unable to find MCU...") time.sleep(1) sys.exit(1) - + self.ser = serial.Serial(self.port, 115200) atexit.register(self.cleanup) # end __init__() - def run(self): # This thread makes all the update processes run in the background global thread @@ -175,15 +207,15 @@ class SerialRelay(Node): try: while rclpy.ok(): - if self.launch_mode == 'core': - self.read_MCU() # Check the MCU for updates + if self.launch_mode == "core": + self.read_MCU() # Check the MCU for updates except KeyboardInterrupt: sys.exit(0) def read_MCU(self): # NON-ANCHOR SPECIFIC try: output = str(self.ser.readline(), "utf8") - + if output: # All output over debug temporarily print(f"[MCU] {output}") @@ -213,8 +245,8 @@ class SerialRelay(Node): def scale_duty(self, value: float, max_speed: float): leftMin = -1 leftMax = 1 - rightMin = -max_speed/100.0 - rightMax = max_speed/100.0 + rightMin = -max_speed / 100.0 + rightMax = max_speed / 100.0 # Figure out how 'wide' each range is leftSpan = leftMax - leftMin @@ -227,17 +259,29 @@ class SerialRelay(Node): return str(rightMin + (valueScaled * rightSpan)) def send_controls(self, msg: CoreControl): - if(msg.turn_to_enable): - command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n' + if msg.turn_to_enable: + command = ( + "can_relay_tovic,core,41," + + str(msg.turn_to) + + "," + + str(msg.turn_to_timeout) + + "\n" + ) else: - command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n' + command = ( + "can_relay_tovic,core,19," + + self.scale_duty(msg.left_stick, msg.max_speed) + + "," + + self.scale_duty(msg.right_stick, msg.max_speed) + + "\n" + ) self.send_cmd(command) # Brake mode - command = "can_relay_tovic,core,18," + str(int(msg.brake)) + '\n' + command = "can_relay_tovic,core,18," + str(int(msg.brake)) + "\n" self.send_cmd(command) - #print(f"[Sys] Relaying: {command}") + # print(f"[Sys] Relaying: {command}") def cmd_vel_callback(self, msg: TwistStamped): linear = msg.twist.linear.x @@ -248,20 +292,22 @@ class SerialRelay(Node): vel_left_rpm = round((vel_left_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO vel_right_rpm = round((vel_right_rads * 60) / (2 * 3.14159)) * CORE_GEAR_RATIO - + self.send_viccan(20, [vel_left_rpm, vel_right_rpm]) def twist_man_callback(self, msg: Twist): linear = msg.linear.x # [-1 1] for forward/back from left stick y angular = msg.angular.z # [-1 1] for left/right from right stick x - if (linear < 0): # reverse turning direction when going backwards (WIP) + if linear < 0: # reverse turning direction when going backwards (WIP) angular *= -1 if abs(linear) > 1 or abs(angular) > 1: # if speed is greater than 1, then there is a problem # make it look like a problem and don't just run away lmao - linear = copysign(0.25, linear) # 0.25 duty cycle in direction of control (hopefully slow) + linear = copysign( + 0.25, linear + ) # 0.25 duty cycle in direction of control (hopefully slow) angular = copysign(0.25, angular) duty_left = linear - angular @@ -272,8 +318,12 @@ class SerialRelay(Node): # Apply max duty cycle # Joysticks provide values [-1, 1] rather than real units - duty_left = map_range(duty_left, -1, 1, -self.twist_max_duty, self.twist_max_duty) - duty_right = map_range(duty_right, -1, 1, -self.twist_max_duty, self.twist_max_duty) + duty_left = map_range( + duty_left, -1, 1, -self.twist_max_duty, self.twist_max_duty + ) + duty_right = map_range( + duty_right, -1, 1, -self.twist_max_duty, self.twist_max_duty + ) self.send_viccan(19, [duty_left, duty_right]) @@ -285,24 +335,24 @@ class SerialRelay(Node): self.twist_max_duty = msg.max_duty # twist_man_callback will handle this def send_cmd(self, msg: str): - if self.launch_mode == 'anchor': - #self.get_logger().info(f"[Core to Anchor Relay] {msg}") - output = String()#Convert to std_msg string + if self.launch_mode == "anchor": + # self.get_logger().info(f"[Core to Anchor Relay] {msg}") + output = String() # Convert to std_msg string output.data = msg self.anchor_pub.publish(output) - elif self.launch_mode == 'core': + elif self.launch_mode == "core": self.get_logger().info(f"[Core to MCU] {msg}") self.ser.write(bytes(msg, "utf8")) - def send_viccan(self, cmd_id: int, data: list[float]): - self.anchor_tovic_pub_.publish(VicCAN( - header=Header(stamp=self.get_clock().now().to_msg(), frame_id="to_vic"), - mcu_name="core", - command_id=cmd_id, - data=data - )) - + self.anchor_tovic_pub_.publish( + VicCAN( + header=Header(stamp=self.get_clock().now().to_msg(), frame_id="to_vic"), + mcu_name="core", + command_id=cmd_id, + data=data, + ) + ) def anchor_feedback(self, msg: String): output = msg.data @@ -366,7 +416,7 @@ class SerialRelay(Node): return self.feedback_new_state.header.stamp = self.get_clock().now().to_msg() self.feedback_new_pub_.publish(self.feedback_new_state) - #self.get_logger().info(f"[Core Anchor] {msg}") + # self.get_logger().info(f"[Core Anchor] {msg}") def relay_fromvic(self, msg: VicCAN): # Assume that the message is coming from Core @@ -376,7 +426,9 @@ class SerialRelay(Node): if msg.command_id in viccan_msg_len_dict: expected_len = viccan_msg_len_dict[msg.command_id] if len(msg.data) != expected_len: - self.get_logger().warning(f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})") + self.get_logger().warning( + f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})" + ) return match msg.command_id: @@ -386,7 +438,11 @@ class SerialRelay(Node): case 49: # GNSS Longitude self.gps_state.longitude = float(msg.data[0]) case 50: # GNSS Satellite count and altitude - self.gps_state.status.status = NavSatStatus.STATUS_FIX if int(msg.data[0]) >= 3 else NavSatStatus.STATUS_NO_FIX + self.gps_state.status.status = ( + NavSatStatus.STATUS_FIX + if int(msg.data[0]) >= 3 + else NavSatStatus.STATUS_NO_FIX + ) self.gps_state.altitude = float(msg.data[1]) self.gps_state.header.stamp = msg.header.stamp self.gps_pub_.publish(self.gps_state) @@ -402,7 +458,7 @@ class SerialRelay(Node): self.imu_state.linear_acceleration.y = float(msg.data[1]) self.imu_state.linear_acceleration.z = float(msg.data[2]) # Deal with quaternion - r = Rotation.from_euler('z', float(msg.data[3]), degrees=True) + r = Rotation.from_euler("z", float(msg.data[3]), degrees=True) q = r.as_quat() self.imu_state.orientation.x = q[0] self.imu_state.orientation.y = q[1] @@ -427,7 +483,9 @@ class SerialRelay(Node): case 4: motor = self.feedback_new_state.br_motor case _: - self.get_logger().warning(f"Ignoring REV motor feedback 53 with invalid motorId {motorId}") + self.get_logger().warning( + f"Ignoring REV motor feedback 53 with invalid motorId {motorId}" + ) return if motor: @@ -455,9 +513,15 @@ class SerialRelay(Node): motorId = round(float(msg.data[0])) position = float(msg.data[1]) velocity = float(msg.data[2]) - joint_state_msg = JointState() # TODO: not sure if all motors should be in each message or not - joint_state_msg.position = [position * (2 * pi) / CORE_GEAR_RATIO] # revolutions to radians - joint_state_msg.velocity = [velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO] # RPM to rad/s + joint_state_msg = ( + JointState() + ) # TODO: not sure if all motors should be in each message or not + joint_state_msg.position = [ + position * (2 * pi) / CORE_GEAR_RATIO + ] # revolutions to radians + joint_state_msg.velocity = [ + velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO + ] # RPM to rad/s motor: RevMotorState | None = None @@ -475,7 +539,9 @@ class SerialRelay(Node): motor = self.feedback_new_state.br_motor joint_state_msg.name = ["br_motor_joint"] case _: - self.get_logger().warning(f"Ignoring REV motor feedback 58 with invalid motorId {motorId}") + self.get_logger().warning( + f"Ignoring REV motor feedback 58 with invalid motorId {motorId}" + ) return joint_state_msg.header.stamp = msg.header.stamp @@ -483,19 +549,17 @@ class SerialRelay(Node): case _: return - def publish_feedback(self): - #self.get_logger().info(f"[Core] {self.core_feedback}") + # self.get_logger().info(f"[Core] {self.core_feedback}") self.feedback_pub.publish(self.core_feedback) def ping_callback(self, request, response): return response - @staticmethod def list_serial_ports(): return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") - + def cleanup(self): print("Cleaning up before terminating...") try: @@ -510,20 +574,26 @@ def myexcepthook(type, value, tb): if serial_pub: serial_pub.cleanup() -def map_range(value: float, in_min: float, in_max: float, out_min: float, out_max: float): + +def map_range( + value: float, in_min: float, in_max: float, out_min: float, out_max: float +): return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min def main(args=None): - rclpy.init(args=args) - sys.excepthook = myexcepthook + rclpy.init(args=args) + sys.excepthook = myexcepthook - global serial_pub + global serial_pub - serial_pub = SerialRelay() - serial_pub.run() + serial_pub = SerialRelay() + serial_pub.run() -if __name__ == '__main__': - #signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly - signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly + +if __name__ == "__main__": + # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly + signal.signal( + signal.SIGTERM, lambda signum, frame: sys.exit(0) + ) # Catch termination signals and exit cleanly main() diff --git a/src/core_pkg/core_pkg/core_ptz.py b/src/core_pkg/core_pkg/core_ptz.py index 15df4ec..93cb4f6 100644 --- a/src/core_pkg/core_pkg/core_ptz.py +++ b/src/core_pkg/core_pkg/core_ptz.py @@ -18,55 +18,63 @@ from core_pkg.siyi_sdk import ( DataStreamType, DataStreamFrequency, SingleAxis, - AttitudeData + AttitudeData, ) + class PtzNode(Node): def __init__(self): # Initialize node with name super().__init__("core_ptz") # Declare parameters - self.declare_parameter('camera_ip', '192.168.1.9') - self.declare_parameter('camera_port', 37260) - + self.declare_parameter("camera_ip", "192.168.1.9") + self.declare_parameter("camera_port", 37260) + # Get parameters - self.camera_ip = self.get_parameter('camera_ip').value - self.camera_port = self.get_parameter('camera_port').value - - self.get_logger().info(f"PTZ camera IP: {self.camera_ip} Port: {self.camera_port}") - + self.camera_ip = self.get_parameter("camera_ip").value + self.camera_port = self.get_parameter("camera_port").value + + self.get_logger().info( + f"PTZ camera IP: {self.camera_ip} Port: {self.camera_port}" + ) + # Create a camera instance self.camera = None - self.camera_connected = False # This flag is still managed but not used to gate commands + self.camera_connected = ( + False # This flag is still managed but not used to gate commands + ) self.loop = None self.thread_pool = None - + # Create publishers - self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10) - self.debug_pub = self.create_publisher(String, '/ptz/debug', 10) - + self.feedback_pub = self.create_publisher(PtzFeedback, "/ptz/feedback", 10) + self.debug_pub = self.create_publisher(String, "/ptz/debug", 10) + # Create subscribers self.control_sub = self.create_subscription( - PtzControl, '/ptz/control', self.handle_control_command, 10) - + PtzControl, "/ptz/control", self.handle_control_command, 10 + ) + # Create timers self.connection_timer = self.create_timer(5.0, self.check_camera_connection) self.last_data_time = time.time() self.health_check_timer = self.create_timer(2.0, self.check_camera_health) - + # Create feedback message self.feedback_msg = PtzFeedback() - self.feedback_msg.connected = False # This will reflect the actual connection state + self.feedback_msg.connected = ( + False # This will reflect the actual connection state + ) self.feedback_msg.error_msg = "Initializing" - + # Flags for async operations self.shutdown_requested = False - + # Set up asyncio event loop in a separate thread self.thread_pool = ThreadPoolExecutor(max_workers=1) self.loop = asyncio.new_event_loop() - + # Connect to camera on startup self.connect_task = self.thread_pool.submit( self.run_async_func, self.connect_to_camera() @@ -77,28 +85,27 @@ class PtzNode(Node): try: # Create a new camera instance self.camera = SiyiGimbalCamera(ip=self.camera_ip, port=self.camera_port) - + # Connect to the camera await self.camera.connect() - + # Set up data callback self.camera.set_data_callback(self.camera_data_callback) - + # Request attitude data stream await self.camera.send_data_stream_request( - DataStreamType.ATTITUDE_DATA, - DataStreamFrequency.HZ_10 + DataStreamType.ATTITUDE_DATA, DataStreamFrequency.HZ_10 ) - + # Update connection status self.camera_connected = True self.feedback_msg.connected = True self.feedback_msg.error_msg = "" - + self.publish_debug("Camera connected successfully") - + except Exception as e: - self.camera_connected = False + self.camera_connected = False self.feedback_msg.connected = False self.feedback_msg.error_msg = f"Connection error: {str(e)}" self.publish_debug(f"Camera connection failed: {str(e)}") @@ -108,8 +115,12 @@ class PtzNode(Node): # Update last_data_time regardless of self.camera_connected, # as data might arrive during a brief reconnect window. self.last_data_time = time.time() - if self.camera_connected: # Only process for feedback if we believe we are connected - if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): + if ( + self.camera_connected + ): # Only process for feedback if we believe we are connected + if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance( + data, AttitudeData + ): self.feedback_msg.yaw = data.yaw self.feedback_msg.pitch = data.pitch self.feedback_msg.roll = data.roll @@ -123,36 +134,39 @@ class PtzNode(Node): debug_str = f"Camera data: CMD_ID={cmd_id.name}, Data=" else: debug_str = f"Camera data: CMD_ID={cmd_id}, Data=" - + if isinstance(data, bytes): debug_str += data.hex() else: debug_str += str(data) self.get_logger().debug(debug_str) - def check_camera_connection(self): """Periodically check camera connection and attempt to reconnect if needed.""" if not self.camera_connected and not self.shutdown_requested: self.publish_debug("Attempting to reconnect to camera...") if self.camera: try: - if self.camera.is_connected: # SDK's internal connection state - self.run_async_func(self.camera.disconnect()) + if self.camera.is_connected: # SDK's internal connection state + self.run_async_func(self.camera.disconnect()) except Exception as e: - self.get_logger().debug(f"Error during pre-reconnect disconnect: {e}") + self.get_logger().debug( + f"Error during pre-reconnect disconnect: {e}" + ) # self.camera = None # Don't nullify here, connect_to_camera will re-assign or create new - + self.connect_task = self.thread_pool.submit( self.run_async_func, self.connect_to_camera() ) def check_camera_health(self): """Check if we're still receiving data from the camera""" - if self.camera_connected: # Only check health if we think we are connected + if self.camera_connected: # Only check health if we think we are connected time_since_last_data = time.time() - self.last_data_time if time_since_last_data > 5.0: - self.publish_debug(f"No camera data for {time_since_last_data:.1f}s, marking as disconnected.") + self.publish_debug( + f"No camera data for {time_since_last_data:.1f}s, marking as disconnected." + ) self.camera_connected = False self.feedback_msg.connected = False self.feedback_msg.error_msg = "Connection stale (no data)" @@ -161,19 +175,20 @@ class PtzNode(Node): def handle_control_command(self, msg): """Handle incoming control commands.""" # Removed: if not self.camera_connected - if not self.camera: # Still check if camera object exists - self.get_logger().warning("Camera object not initialized, ignoring control command") + if not self.camera: # Still check if camera object exists + self.get_logger().warning( + "Camera object not initialized, ignoring control command" + ) return - - self.thread_pool.submit( - self.run_async_func, - self.process_control_command(msg) - ) + + self.thread_pool.submit(self.run_async_func, self.process_control_command(msg)) async def process_control_command(self, msg): """Process and send the control command to the camera.""" - if not self.camera: - self.get_logger().error("Process control command called but camera object is None.") + if not self.camera: + self.get_logger().error( + "Process control command called but camera object is None." + ) return try: # The SDK's send_... methods will raise RuntimeError if not connected. @@ -182,43 +197,55 @@ class PtzNode(Node): self.get_logger().info("Attempting to reset camera to center position") await self.camera.send_attitude_angles_command(0.0, 0.0) return - + if msg.control_mode == 0: turn_yaw = max(-100, min(100, int(msg.turn_yaw))) turn_pitch = max(-100, min(100, int(msg.turn_pitch))) - self.get_logger().debug(f"Attempting rotation: yaw_speed={turn_yaw}, pitch_speed={turn_pitch}") + self.get_logger().debug( + f"Attempting rotation: yaw_speed={turn_yaw}, pitch_speed={turn_pitch}" + ) await self.camera.send_rotation_command(turn_yaw, turn_pitch) - + elif msg.control_mode == 1: yaw = max(-135.0, min(135.0, msg.yaw)) pitch = max(-90.0, min(90.0, msg.pitch)) - self.get_logger().debug(f"Attempting absolute angles: yaw={yaw}, pitch={pitch}") + self.get_logger().debug( + f"Attempting absolute angles: yaw={yaw}, pitch={pitch}" + ) await self.camera.send_attitude_angles_command(yaw, pitch) - + elif msg.control_mode == 2: axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH angle = msg.angle - self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}") + self.get_logger().debug( + f"Attempting single axis: axis={axis.name}, angle={angle}" + ) await self.camera.send_single_axis_attitude_command(angle, axis) - elif msg.control_mode == 3: + elif msg.control_mode == 3: zoom_level = msg.zoom_level - self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x") + self.get_logger().debug( + f"Attempting absolute zoom: level={zoom_level}x" + ) await self.camera.send_absolute_zoom_command(zoom_level) - - if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'): - if msg.stream_type > 0 and msg.stream_freq >= 0: + + if hasattr(msg, "stream_type") and hasattr(msg, "stream_freq"): + if msg.stream_type > 0 and msg.stream_freq >= 0: try: stream_type = DataStreamType(msg.stream_type) stream_freq = DataStreamFrequency(msg.stream_freq) self.get_logger().info( f"Attempting to set data stream: type={stream_type.name}, freq={stream_freq.name}" ) - await self.camera.send_data_stream_request(stream_type, stream_freq) + await self.camera.send_data_stream_request( + stream_type, stream_freq + ) except ValueError: - self.get_logger().error("Invalid stream type or frequency values in control message") - - except RuntimeError as e: # Catch SDK's "not connected" errors + self.get_logger().error( + "Invalid stream type or frequency values in control message" + ) + + except RuntimeError as e: # Catch SDK's "not connected" errors self.get_logger().warning(f"SDK command failed (likely not connected): {e}") # self.camera_connected will be updated by health/connection checks # self.feedback_msg.error_msg = f"Command failed: {str(e)}" # Already set by health check @@ -226,58 +253,64 @@ class PtzNode(Node): except Exception as e: self.get_logger().error(f"Error processing control command: {e}") self.feedback_msg.error_msg = f"Control error: {str(e)}" - self.feedback_pub.publish(self.feedback_msg) # Publish for other errors + self.feedback_pub.publish(self.feedback_msg) # Publish for other errors def publish_debug(self, message_text): """Publish debug message.""" msg = String() - msg.data = f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}" + msg.data = ( + f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}" + ) self.debug_pub.publish(msg) - self.get_logger().info(message_text) + self.get_logger().info(message_text) def run_async_func(self, coro): """Run an async function in the event loop.""" if self.loop and self.loop.is_running(): try: - return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout=5.0) # Added timeout + return asyncio.run_coroutine_threadsafe(coro, self.loop).result( + timeout=5.0 + ) # Added timeout except asyncio.TimeoutError: self.get_logger().warning(f"Async function {coro.__name__} timed out.") return None except Exception as e: - self.get_logger().error(f"Exception in run_async_func for {coro.__name__}: {e}") + self.get_logger().error( + f"Exception in run_async_func for {coro.__name__}: {e}" + ) return None else: - self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.") + self.get_logger().warning( + "Asyncio loop not running, cannot execute coroutine." + ) return None - async def shutdown_node_async(self): """Perform clean shutdown of camera connection.""" self.shutdown_requested = True self.get_logger().info("Async shutdown initiated...") - if self.camera and self.camera.is_connected: # Check SDK's connection state + if self.camera and self.camera.is_connected: # Check SDK's connection state try: self.get_logger().info("Disabling data stream...") await self.camera.send_data_stream_request( - DataStreamType.ATTITUDE_DATA, - DataStreamFrequency.DISABLE + DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE ) - await asyncio.sleep(0.1) - + await asyncio.sleep(0.1) + self.get_logger().info("Disconnecting from camera...") await self.camera.disconnect() self.get_logger().info("Disconnected from camera successfully.") - + except Exception as e: self.get_logger().error(f"Error during camera shutdown: {e}") - self.camera_connected = False # Update node's flag + self.camera_connected = False # Update node's flag self.feedback_msg.connected = False self.feedback_msg.error_msg = "Shutting down" def cleanup(self): """Clean up resources.""" self.get_logger().info("PTZ node cleanup initiated.") - self.shutdown_requested = True + self.shutdown_requested = True if self.connection_timer: self.connection_timer.cancel() @@ -287,31 +320,38 @@ class PtzNode(Node): if self.loop and self.thread_pool: if self.loop.is_running(): try: - future = asyncio.run_coroutine_threadsafe(self.shutdown_node_async(), self.loop) - future.result(timeout=5) + future = asyncio.run_coroutine_threadsafe( + self.shutdown_node_async(), self.loop + ) + future.result(timeout=5) except Exception as e: - self.get_logger().error(f"Error during async shutdown in cleanup: {e}") - + self.get_logger().error( + f"Error during async shutdown in cleanup: {e}" + ) + self.get_logger().info("Shutting down thread pool executor...") self.thread_pool.shutdown(wait=True) - + if self.loop.is_running(): self.get_logger().info("Stopping asyncio event loop...") self.loop.call_soon_threadsafe(self.loop.stop) - + self.get_logger().info("PTZ node resources cleaned up.") else: - self.get_logger().warning("Loop or thread_pool not initialized, skipping parts of cleanup.") + self.get_logger().warning( + "Loop or thread_pool not initialized, skipping parts of cleanup." + ) def main(args=None): """Main function.""" rclpy.init(args=args) - + ptz_node = PtzNode() - + asyncio_thread = None - if ptz_node.loop: + if ptz_node.loop: + def run_event_loop(loop): asyncio.set_event_loop(loop) try: @@ -322,14 +362,12 @@ def main(args=None): # or an unhandled exception within a task scheduled on the loop. if not loop.is_closed(): loop.close() - + asyncio_thread = threading.Thread( - target=run_event_loop, - args=(ptz_node.loop,), - daemon=True + target=run_event_loop, args=(ptz_node.loop,), daemon=True ) asyncio_thread.start() - + try: rclpy.spin(ptz_node) except KeyboardInterrupt: @@ -338,18 +376,18 @@ def main(args=None): ptz_node.get_logger().info("SystemExit received, shutting down...") finally: ptz_node.get_logger().info("Initiating final cleanup...") - ptz_node.cleanup() # This will stop the loop and shutdown the executor - + ptz_node.cleanup() # This will stop the loop and shutdown the executor + if asyncio_thread and asyncio_thread.is_alive(): - # The loop should have been stopped by cleanup. We just join the thread. - ptz_node.get_logger().info("Waiting for asyncio thread to join...") - asyncio_thread.join(timeout=5) - if asyncio_thread.is_alive(): - ptz_node.get_logger().warning("Asyncio thread did not join cleanly.") - + # The loop should have been stopped by cleanup. We just join the thread. + ptz_node.get_logger().info("Waiting for asyncio thread to join...") + asyncio_thread.join(timeout=5) + if asyncio_thread.is_alive(): + ptz_node.get_logger().warning("Asyncio thread did not join cleanly.") + rclpy.shutdown() ptz_node.get_logger().info("ROS shutdown complete.") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/core_pkg/core_pkg/siyi_sdk/siyi.py b/src/core_pkg/core_pkg/siyi_sdk/siyi.py index a5e426a..b0a63da 100644 --- a/src/core_pkg/core_pkg/siyi_sdk/siyi.py +++ b/src/core_pkg/core_pkg/siyi_sdk/siyi.py @@ -110,9 +110,7 @@ class SiyiGimbalCamera: MAX_A8_MINI_ZOOM = 6.0 # Maximum zoom for A8 mini - def __init__( - self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2 - ): + def __init__(self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2): self.ip = ip self.port = port self.heartbeat_interval = heartbeat_interval @@ -124,9 +122,7 @@ class SiyiGimbalCamera: async def connect(self) -> None: try: - self.reader, self.writer = await asyncio.open_connection( - self.ip, self.port - ) + self.reader, self.writer = await asyncio.open_connection(self.ip, self.port) self.is_connected = True asyncio.create_task(self.heartbeat_loop()) asyncio.create_task(self._data_stream_listener()) @@ -158,9 +154,7 @@ class SiyiGimbalCamera: if self.is_connected: await self.disconnect() - def _build_packet_header( - self, cmd_id: CommandID, data_len: int - ) -> bytearray: + def _build_packet_header(self, cmd_id: CommandID, data_len: int) -> bytearray: """Helper to build the common packet header.""" packet = bytearray() packet.extend(b"\x55\x66") # STX @@ -179,15 +173,11 @@ class SiyiGimbalCamera: def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes: data_len = 2 - packet = self._build_packet_header( - CommandID.ROTATION_CONTROL, data_len - ) + packet = self._build_packet_header(CommandID.ROTATION_CONTROL, data_len) packet.extend(struct.pack("bb", turn_yaw, turn_pitch)) return self._finalize_packet(packet) - async def send_rotation_command( - self, turn_yaw: int, turn_pitch: int - ) -> None: + async def send_rotation_command(self, turn_yaw: int, turn_pitch: int) -> None: if not self.is_connected or not self.writer: raise RuntimeError( "Socket is not connected or writer is None, cannot send rotation command." @@ -199,21 +189,15 @@ class SiyiGimbalCamera: f"Sent rotation command with yaw_speed {turn_yaw} and pitch_speed {turn_pitch}" ) - def _build_attitude_angles_packet( - self, yaw: float, pitch: float - ) -> bytes: + def _build_attitude_angles_packet(self, yaw: float, pitch: float) -> bytes: data_len = 4 - packet = self._build_packet_header( - CommandID.ATTITUDE_ANGLES, data_len - ) + packet = self._build_packet_header(CommandID.ATTITUDE_ANGLES, data_len) yaw_int = int(round(yaw * 10)) pitch_int = int(round(pitch * 10)) packet.extend(struct.pack(" None: + async def send_attitude_angles_command(self, yaw: float, pitch: float) -> None: if not self.is_connected or not self.writer: raise RuntimeError( "Socket is not connected or writer is None, cannot send attitude angles command." @@ -221,17 +205,13 @@ class SiyiGimbalCamera: packet = self._build_attitude_angles_packet(yaw, pitch) self.writer.write(packet) await self.writer.drain() - logger.debug( - f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°" - ) + logger.debug(f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°") def _build_single_axis_attitude_packet( self, angle: float, axis: SingleAxis ) -> bytes: data_len = 3 - packet = self._build_packet_header( - CommandID.SINGLE_AXIS_CONTROL, data_len - ) + packet = self._build_packet_header(CommandID.SINGLE_AXIS_CONTROL, data_len) angle_int = int(round(angle * 10)) packet.extend(struct.pack(" bytes: data_len = 2 - packet = self._build_packet_header( - CommandID.DATA_STREAM_REQUEST, data_len - ) + packet = self._build_packet_header(CommandID.DATA_STREAM_REQUEST, data_len) packet.append(data_type.value) packet.append(data_freq.value) return self._finalize_packet(packet) @@ -279,7 +257,9 @@ class SiyiGimbalCamera: data_len = 2 packet = self._build_packet_header(CommandID.ABSOLUTE_ZOOM, data_len) zoom_packet_value = int(round(zoom_level * 10)) - if not (0 <= zoom_packet_value <= 65535): # Should be caught by clamping earlier + if not ( + 0 <= zoom_packet_value <= 65535 + ): # Should be caught by clamping earlier raise ValueError( "Zoom packet value out of uint16_t range after conversion." ) @@ -329,24 +309,24 @@ class SiyiGimbalCamera: ctrl = await self.reader.readexactly(1) data_len_bytes = await self.reader.readexactly(2) data_len = struct.unpack(" 2048: # Arbitrary reasonable limit + if data_len > 2048: # Arbitrary reasonable limit raise ValueError(f"Excessive data length received: {data_len}") data = await self.reader.readexactly(data_len) crc_bytes = await self.reader.readexactly(2) received_crc = struct.unpack(" 0 and right_trigger > 0: arm_input.gripper = 0 @@ -237,7 +244,6 @@ class Headless(Node): elif dpad_input[0] == -1: arm_input.axis0 = -1 - if right_bumper: # Control end effector # Effector yaw @@ -252,30 +258,31 @@ class Headless(Node): elif right_stick_x < 0: arm_input.effector_roll = -1 - else: # Control arm axis + else: # Control arm axis # Axis 1 - if abs(left_stick_x) > .15: + if abs(left_stick_x) > 0.15: arm_input.axis1 = round(left_stick_x) # Axis 2 - if abs(left_stick_y) > .15: + if abs(left_stick_y) > 0.15: arm_input.axis2 = -1 * round(left_stick_y) # Axis 3 - if abs(right_stick_y) > .15: + if abs(right_stick_y) > 0.15: arm_input.axis3 = -1 * round(right_stick_y) - # BIO bio_input = BioControl( bio_arm=int(left_stick_y * -100), - drill_arm=int(round(right_stick_y) * -100) + drill_arm=int(round(right_stick_y) * -100), ) # Drill motor (FAERIE) if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0: - bio_input.drill = int(30 * (right_trigger - left_trigger)) # Max duty cycle 30% + bio_input.drill = int( + 30 * (right_trigger - left_trigger) + ) # Max duty cycle 30% self.core_publisher.publish(CORE_STOP_MSG) self.arm_publisher.publish(arm_input) @@ -283,7 +290,7 @@ class Headless(Node): def deadzone(value: float, threshold=0.05) -> float: - """ Apply a deadzone to a joystick input so the motors don't sound angry """ + """Apply a deadzone to a joystick input so the motors don't sound angry""" if abs(value) < threshold: return 0 return value @@ -295,6 +302,9 @@ def main(args=None): rclpy.spin(node) rclpy.shutdown() -if __name__ == '__main__': - signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly + +if __name__ == "__main__": + signal.signal( + signal.SIGTERM, lambda signum, frame: sys.exit(0) + ) # Catch termination signals and exit cleanly main()