reorganize core headless node for usage as a service

This commit is contained in:
Tristan McGinnis
2025-05-28 22:57:38 -06:00
parent b06194053c
commit 80d59cc275

View File

@@ -24,109 +24,82 @@ max_speed = 90 #Max speed as a duty cycle percentage (1-100)
class Headless(Node): class Headless(Node):
def __init__(self): def __init__(self):
# Initalize node with name # Initialize pygame first
super().__init__("core_headless")
self.create_timer(0.20, self.send_controls)
# Create a publisher to publish any output the pico sends
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
pygame.init() pygame.init()
# Initialize the gamepad module
pygame.joystick.init() pygame.joystick.init()
# Check if any gamepad is connected # Wait for a gamepad to be connected
if pygame.joystick.get_count() == 0: self.gamepad = None
print("No gamepad found.") print("Waiting for gamepad connection...")
pygame.quit() while pygame.joystick.get_count() == 0:
exit() # Process any pygame events to keep it responsive
for event in pygame.event.get(): for event in pygame.event.get():
if event.type == pygame.QUIT: if event.type == pygame.QUIT:
pygame.quit() pygame.quit()
exit() sys.exit(0)
time.sleep(1.0) # Check every second
# Initialize the first gamepad, print name to terminal print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0) self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}') print(f'Gamepad Found: {self.gamepad.get_name()}')
#
# # Now initialize the ROS2 node
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() # Used to ignore sending controls repeatedly when they do not change
def run(self): def run(self):
# This thread makes all the update processes run in the background # This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start() thread.start()
try: try:
while rclpy.ok(): while rclpy.ok():
#Check the pico for updates
self.send_controls() self.send_controls()
time.sleep(0.1) # Small delay to avoid CPU hogging
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
self.send_controls()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(0) sys.exit(0)
def send_controls(self): def send_controls(self):
for event in pygame.event.get(): for event in pygame.event.get():
if event.type == pygame.QUIT: if event.type == pygame.QUIT:
pygame.quit() pygame.quit()
exit() sys.exit(0)
input = CoreControl()
# Check if controller is still connected
input.max_speed = max_speed if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
input.right_stick = -1 * round(self.gamepad.get_axis(4),2)#right y-axis # Send one last zero control message
if self.gamepad.get_axis(5) > 0: input = CoreControl()
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1),2)#lext y-axis
if pygame.joystick.get_count() != 0:
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}' #stop the rover if there is no controller connected
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
else:
input.left_stick = 0 input.left_stick = 0
input.right_stick = 0 input.right_stick = 0
input.max_speed = 0 input.max_speed = 0
self.get_logger().info(f"[Ctrl] Stopping. No Gamepad Connected. ")
self.publisher.publish(input) self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl()
input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}'
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
node = Headless() node = Headless()
rclpy.spin(node) rclpy.spin(node)
rclpy.shutdown() rclpy.shutdown()
#tb_bs = BaseStation()
#node.run()
if __name__ == '__main__': if __name__ == '__main__':
main() main()