From 7a0d73394d103cb798bd15540f0dff58fd610b27 Mon Sep 17 00:00:00 2001 From: Tristan McGinnis Date: Wed, 19 Feb 2025 22:49:15 -0600 Subject: [PATCH] add launch mode logic --- launch/rover_launch.py | 14 +- src/arm_pkg/arm_pkg/arm_node.py | 228 +++++++++-------------------- src/core_pkg/core_pkg/core_node.py | 82 +++++++---- 3 files changed, 125 insertions(+), 199 deletions(-) diff --git a/launch/rover_launch.py b/launch/rover_launch.py index 2e2a759..95bf004 100644 --- a/launch/rover_launch.py +++ b/launch/rover_launch.py @@ -18,7 +18,7 @@ def launch_setup(context, *args, **kwargs): executable='arm', # change as needed name='arm', output='screen', - parameters=[{'launch_arg': mode}] + parameters=[{'launch_mode': mode}] ) ) nodes.append( @@ -27,7 +27,7 @@ def launch_setup(context, *args, **kwargs): executable='core', # change as needed name='core', output='screen', - parameters=[{'launch_arg': mode}] + parameters=[{'launch_mode': mode}] ) ) # nodes.append( @@ -36,7 +36,7 @@ def launch_setup(context, *args, **kwargs): # executable='bio', # change as needed # name='bio', # output='screen', - # parameters=[{'launch_arg': mode}] + # parameters=[{'launch_mode': mode}] # ) # ) nodes.append( @@ -45,7 +45,7 @@ def launch_setup(context, *args, **kwargs): executable='anchor', # change as needed name='anchor', output='screen', - parameters=[{'launch_arg': mode}] + parameters=[{'launch_mode': mode}] ) ) elif mode in ['arm', 'core', 'bio']: @@ -57,7 +57,7 @@ def launch_setup(context, *args, **kwargs): executable='arm', name='arm', output='screen', - parameters=[{'launch_arg': mode}] + parameters=[{'launch_mode': mode}] ) ) elif mode == 'core': @@ -67,7 +67,7 @@ def launch_setup(context, *args, **kwargs): executable='core', name='core', output='screen', - parameters=[{'launch_arg': mode}] + parameters=[{'launch_mode': mode}] ) ) # elif mode == 'bio': @@ -77,7 +77,7 @@ def launch_setup(context, *args, **kwargs): # executable='bio', # name='bio', # output='screen', - # parameters=[{'launch_arg': mode}] + # parameters=[{'launch_mode': mode}] # ) # ) else: diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py index 45d60e2..8d70e06 100644 --- a/src/arm_pkg/arm_pkg/arm_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -17,76 +17,76 @@ thread = None class SerialRelay(Node): def __init__(self): - # Initialize node with name + # Initialize node super().__init__("arm_node") + # Get launch mode parameter + self.declare_parameter('launch_mode', 'arm') + launch_mode = self.get_parameter('launch_mode').value + self.get_logger().info(f"arm launch_mode is: {launch_mode}") + # Create publishers - - #Depricated, temporary for reference - # self.output_publisher = self.create_publisher(String, '/astra/arm/feedback', 10) - # self.state_publisher = self.create_publisher(ArmState, '/astra/arm/state', 10) - # self.faerie_publisher = self.create_publisher(FaerieTelemetry, '/astra/arm/bio/feedback', 10) - self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10) self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10) - - # Create subscribers - - #Depricated, temporary for reference - #self.control_subscriber = self.create_subscription(ControllerState, '/astra/arm/control', self.send_controls, 10) - self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10) self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10) - #self.command_subscriber = self.create_subscription(String, '/astra/arm/command', self.send_command, 10) - #self.faerie_subscriber = self.create_subscription(String, "/astra/arm/bio/control", self.send_faerie_controls, 10) + # Topics used in anchor mode + if launch_mode == 'anchor': + self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10) + self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) - # Loop through all serial devices on the computer to check for the MCU - self.port = None - ports = SerialRelay.list_serial_ports() - for i in range(2): - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, 115200, timeout=1) - print(f"Checking port {port}...") - ser.write(b"ping\n") - response = ser.read_until("\n") - # if pong is in response, then we are talking with the MCU - if b"pong" in response: - self.port = port - print(f"Found MCU at {self.port}!") - break - except: - pass - if self.port is not None: - break - - if self.port is None: - print("Unable to find MCU... please make sure it is connected.") - time.sleep(1) - sys.exit(1) - - self.ser = serial.Serial(self.port, 115200) - atexit.register(self.cleanup) + # Search for ports IF in 'arm' (standalone) and not 'anchor' mode + if launch_mode == 'arm': + # Loop through all serial devices on the computer to check for the MCU + self.port = None + ports = SerialRelay.list_serial_ports() + for i in range(4): + for port in ports: + try: + # connect and send a ping command + ser = serial.Serial(port, 115200, timeout=1) + #print(f"Checking port {port}...") + ser.write(b"ping\n") + response = ser.read_until("\n") + + # if pong is in response, then we are talking with the MCU + if b"pong" in response: + self.port = port + self.get_logger.info(f"Found MCU at {self.port}!") + break + except: + pass + if self.port is not None: + break + + if self.port is None: + self.get_logger.info("Unable to find MCU... please make sure it is connected.") + time.sleep(1) + sys.exit(1) + + self.ser = serial.Serial(self.port, 115200) + atexit.register(self.cleanup) def run(self): global thread thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread.start() - try: - while rclpy.ok(): - if self.ser.in_waiting: - self.read_mcu() - else: - time.sleep(0.1) - except KeyboardInterrupt: - pass - finally: - self.cleanup() + #if in arm mode, will need to read from the MCU + if self.launch_mode == 'arm': + try: + while rclpy.ok(): + if self.ser.in_waiting: + self.read_mcu() + else: + time.sleep(0.1) + except KeyboardInterrupt: + pass + finally: + self.cleanup() #Currently will just spit out all values over the /arm/feedback/debug topic as strings @@ -94,17 +94,17 @@ class SerialRelay(Node): try: output = str(self.ser.readline(), "utf8") if output: - print(f"[MCU] {output}", end="") + self.get_logger.info(f"[MCU] {output}", end="") msg = String() msg.data = output self.debug_pub.publish(msg) except serial.SerialException: - print("SerialException caught... closing serial port.") + self.get_logger.info("SerialException caught... closing serial port.") if self.ser.is_open: self.ser.close() pass except TypeError as e: - print(f"TypeError: {e}") + self.get_logger.info(f"TypeError: {e}") print("Closing serial port.") if self.ser.is_open: self.ser.close() @@ -126,8 +126,9 @@ class SerialRelay(Node): axis3 = msg.axis3 #Send controls for arm command = "can_relay_tovic,arm,40," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") + + self.send_cmd(command) + #print(f"[Wrote] {command}", end="") #Not yet finished, needs embedded implementation for new commands # ef_roll = msg.effector_roll @@ -141,110 +142,17 @@ class SerialRelay(Node): return + def send_cmd(self, cmd): + if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay + self.anchor_pub.publish(cmd) + elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly + self.ser.write(bytes(cmd, "utf8")) + #print(f"[Arm Wrote] {cmd}", end="") - # Depricated functions, kept temporarily for reference + def anchor_feedback(self, msg): + self.get_logger.info(f"[Anchor] {msg.data}", end="") + #self.send_cmd(msg.data) - # def send_controls(self, msg): - # command = "" - # ef_cmd = "" - # if(msg.b): - # command = "arm,stop\n" - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - # return - - # if(msg.a): - # command = "arm,endEffect,act,0"#retract actuator out - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - # elif(msg.x): - # command = "arm,endEffect,act,1"#extend actuator in - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - - # if(msg.plus): - # command = "arm,endEffect,laser,1\n" - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - # elif(msg.minus): - # command = "arm,endEffect,laser,0\n" - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - - # if(msg.rb): - # ef_cmd = "arm,endEffect,ctrl," - # if(msg.lt >= 0.5): - # ef_cmd += "-1," - # elif(msg.rt >= 0.5): - # ef_cmd += "1," - # else: - # ef_cmd += "0," - - # if(msg.rs_x < 0): - # ef_cmd += "-1," - # elif(msg.rs_x > 0): - # ef_cmd += "1," - # else: - # ef_cmd += "0," - # if(msg.ls_x < 0): - # ef_cmd += "-1" - # elif(msg.ls_x > 0): - # ef_cmd += "1" - # else: - # ef_cmd += "0" - - # command = ef_cmd + "\n" - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - # return - # else: - # ef_cmd = "arm,endEffect,ctrl," - # if(msg.lt >= 0.5): - # ef_cmd += "-1,0,0" - # elif(msg.rt >= 0.5): - # ef_cmd += "1,0,0" - # else: - # ef_cmd += "0,0,0" - # command = ef_cmd + "\n" - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - - # if(msg.lb): - # #self.ser.write(bytes("arm,setMode,manual\n", "utf8")) - # command = "arm,man,0.25," - # else: - # #self.ser.write(bytes("arm,setMode,manual\n", "utf8")) - # command = "arm,man,0.15," - - # if(msg.d_left): - # command += "-1," - # elif(msg.d_right): - # command += "1," - # else: - # command += "0," - # if(msg.ls_x < -0.4): - # command += "1," - # elif(msg.ls_x > 0.4): - # command += "-1," - # else: - # command += "0," - # if(msg.ls_y < -0.4): - # command += "1," - # elif(msg.ls_y > 0.4): - # command += "-1," - # else: - # command += "0," - # if(msg.rs_y < -0.4): - # command += "1" - # elif(msg.rs_y > 0.4): - # command += "-1" - # else: - # command += "0" - - # command += "\n" - # self.ser.write(bytes(command, "utf8")) - # print(f"[Wrote] {command}", end="") - # return @staticmethod def list_serial_ports(): diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py index 34e0ba4..b93e764 100644 --- a/src/core_pkg/core_pkg/core_node.py +++ b/src/core_pkg/core_pkg/core_node.py @@ -23,41 +23,50 @@ class SerialRelay(Node): # Initalize node with name super().__init__("core_node")#previously 'serial_publisher' - # Create publishers for feedback and telemetry + # Get launch mode parameter + self.declare_parameter('launch_mode', 'core') + launch_mode = self.get_parameter('launch_mode').value + self.get_logger().info(f"core launch_mode is: {launch_mode}") + + # Create publishers self.debug_pub = self.create_publisher(String, '/core/debug', 10) self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) - - # Create a subscriber to listen to any commands sent for the MCU + # Create a subscriber self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) - # Create a service server for pinging the rover self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) - # Loop through all serial devices on the computer to check for the MCU - self.port = None - ports = SerialRelay.list_serial_ports() - for i in range(2): - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, 115200, timeout=1) - print(f"Checking port {port}...") - ser.write(b"ping\n") - response = ser.read_until("\n") + if launch_mode == 'anchor': + self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) + self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) - # if pong is in response, then we are talking with the MCU - if b"pong" in response: - self.port = port - print(f"Found MCU at {self.port}!") - break - except: - pass - if self.port is not None: - break + + if launch_mode == 'core': + # Loop through all serial devices on the computer to check for the MCU + self.port = None + ports = SerialRelay.list_serial_ports() + for i in range(2): + for port in ports: + try: + # connect and send a ping command + ser = serial.Serial(port, 115200, timeout=1) + #(f"Checking port {port}...") + ser.write(b"ping\n") + response = ser.read_until("\n") + + # if pong is in response, then we are talking with the MCU + if b"pong" in response: + self.port = port + self.get_logger.info(f"Found MCU at {self.port}!") + break + except: + pass + if self.port is not None: + break if self.port is None: - print("Unable to find MCU...") + self.get_logger.info("Unable to find MCU...") time.sleep(1) sys.exit(1) @@ -71,11 +80,12 @@ class SerialRelay(Node): thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread.start() - try: - while rclpy.ok(): - self.read_MCU() # Check the MCU for updates - except KeyboardInterrupt: - sys.exit(0) + if self.launch_mode == 'core': + try: + while rclpy.ok(): + self.read_MCU() # Check the MCU for updates + except KeyboardInterrupt: + sys.exit(0) def read_MCU(self): try: @@ -157,10 +167,18 @@ class SerialRelay(Node): command = "can_relay_tovic,core,19," + self.scale_duty(left_stick_neg, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n' #print(f"[Sys] {command}", end="") - self.ser.write(bytes(command, "utf8"))# Send command to MCU - self.get_logger().debug(f"wrote: {command}") + #self.ser.write(bytes(command, "utf8"))# Send command to MCU + #self.get_logger().debug(f"wrote: {command}") + + self.send_cmd(command) + #print(f"[Sys] Relaying: {command}") + def send_cmd(self, cmd): + if self.launch_mode == 'anchor': + self.anchor_pub.publish(cmd) + elif self.launch_mode == 'core': + self.ser.write(bytes(cmd, "utf8")) def ping_callback(self, request, response): return response