refactor: (arm) cleanup old standalone serial code

Removed literally 100 lines
This commit is contained in:
David
2026-02-04 04:26:50 -06:00
parent 51d0e747ad
commit 13419e97c9

View File

@@ -25,7 +25,6 @@ import math
# liveliness_lease_duration=5000 # liveliness_lease_duration=5000
# ) # )
serial_pub = None
thread = None thread = None
# Used to verify the length of an incoming VicCAN feedback message # Used to verify the length of an incoming VicCAN feedback message
@@ -44,20 +43,16 @@ viccan_digit_msg_len_dict = {
} }
class SerialRelay(Node): class ArmNode(Node):
def __init__(self): def __init__(self):
super().__init__("arm_node") super().__init__("arm_node")
# Get launch mode -- anchor vs arm self.get_logger().info(f"arm launch_mode is: anchor") # Hey I like the output
self.declare_parameter("launch_mode", "arm")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
################################################## ##################################################
# Topics # Topics
# Anchor topics # Anchor topics
if self.launch_mode == "anchor":
self.anchor_fromvic_sub_ = self.create_subscription( self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20 VicCAN, "/anchor/from_vic/arm", self.relay_fromvic, 20
) )
@@ -110,7 +105,6 @@ class SerialRelay(Node):
# Old # Old
# Create publishers # Create publishers
self.debug_pub = self.create_publisher(String, "/arm/feedback/debug", 10)
self.socket_pub = self.create_publisher( self.socket_pub = self.create_publisher(
SocketFeedback, "/arm/feedback/socket", 10 SocketFeedback, "/arm/feedback/socket", 10
) )
@@ -124,88 +118,16 @@ class SerialRelay(Node):
ArmManual, "/arm/control/manual", self.send_manual, 10 ArmManual, "/arm/control/manual", self.send_manual, 10
) )
##################################################
# Find microcontroller -- non-anchor only
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if self.launch_mode == "arm":
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for _ in range(4):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n") # type: ignore
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
)
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
# if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
if self.launch_mode == "arm": pass
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def joint_command_callback(self, msg: JointState): def joint_command_callback(self, msg: JointState):
if len(msg.position) < 7 and len(msg.velocity) < 7: if len(msg.position) < 7 and len(msg.velocity) < 7:
@@ -256,15 +178,8 @@ class SerialRelay(Node):
return return
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
if ( output = String(data=msg)
self.launch_mode == "anchor"
): # if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == "arm": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
@@ -473,33 +388,13 @@ class SerialRelay(Node):
self.arm_feedback.axis0_voltage = voltage self.arm_feedback.axis0_voltage = voltage
self.arm_feedback.axis0_current = current self.arm_feedback.axis0_current = current
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
# return glob.glob("/dev/tty[A-Za-z]*")
def cleanup(self):
print("Cleaning up...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub arm_node = ArmNode()
serial_pub = SerialRelay() arm_node.run()
serial_pub.run() rclpy.try_shutdown()
if __name__ == "__main__": if __name__ == "__main__":