Merge pull request #14 from SHC-ASTRA/main

merge main into ptz-zoom
This commit is contained in:
Tristan McGinnis
2025-05-29 18:30:07 -06:00
committed by GitHub
5 changed files with 65 additions and 89 deletions

View File

@@ -165,7 +165,7 @@ class SerialRelay(Node):
return
def send_cmd(self, msg):
def send_cmd(self, msg: str):
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
@@ -174,7 +174,7 @@ class SerialRelay(Node):
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg):
def anchor_feedback(self, msg: String):
output = msg.data
if output.startswith("can_relay_fromvic,arm,55"):
#pass
@@ -205,10 +205,10 @@ class SerialRelay(Node):
self.socket_pub.publish(self.arm_feedback)
self.digit_pub.publish(self.digit_feedback)
def updateAngleFeedback(self, msg: String):
def updateAngleFeedback(self, msg: str):
# Angle feedbacks,
#split the msg.data by commas
parts = msg.data.split(",")
parts = msg.split(",")
if len(parts) >= 7:
# Extract the angles from the string
@@ -241,9 +241,9 @@ class SerialRelay(Node):
self.get_logger().info("Invalid angle feedback input format")
def updateBusVoltage(self, msg: String):
def updateBusVoltage(self, msg: str):
# Bus Voltage feedbacks
parts = msg.data.split(",")
parts = msg.split(",")
if len(parts) >= 7:
# Extract the voltage from the string
voltages_in = parts[3:7]

View File

@@ -144,10 +144,11 @@ class SerialRelay(Node):
# LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
self.send_cmd(command)
#self.send_cmd(command)
# Vibration Motor
command = "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
self.send_cmd(command)
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
#self.send_cmd(command)
# FAERIE Control Commands
@@ -156,16 +157,15 @@ class SerialRelay(Node):
# To be reviewed before use#
# Laser
command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
#self.send_cmd(command)
# Drill (SCABBARD)
command = f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
print(msg.drill)
self.send_cmd(command)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
#self.send_cmd(command)
# Bio linear actuator
command = "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command)
@@ -182,7 +182,7 @@ class SerialRelay(Node):
def anchor_feedback(self, msg: String):
output = msg.data
parts = str(output.strip()).split(",")
self.get_logger().info(f"[Bio Anchor] {msg.data}")
#self.get_logger().info(f"[Bio Anchor] {msg.data}")
if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0

View File

@@ -20,113 +20,86 @@ os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init()
max_speed = 75 #Max speed as a duty cycle percentage (1-100)
max_speed = 90 #Max speed as a duty cycle percentage (1-100)
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("core_headless")
self.create_timer(0.20, self.send_controls)
# Create a publisher to publish any output the pico sends
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
# Initialize pygame first
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
# Wait for a gamepad to be connected
self.gamepad = None
print("Waiting for gamepad connection...")
while pygame.joystick.get_count() == 0:
# Process any pygame events to keep it responsive
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
time.sleep(1.0) # Check every second
print("No gamepad found. Waiting...")
# Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}')
#
#
# Now initialize the ROS2 node
super().__init__("core_headless")
self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() # Used to ignore sending controls repeatedly when they do not change
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
#Check the pico for updates
self.send_controls()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
self.send_controls()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
time.sleep(0.1) # Small delay to avoid CPU hogging
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = CoreControl()
input.max_speed = max_speed
input.right_stick = round(self.gamepad.get_axis(4),2)#right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = round(self.gamepad.get_axis(1),2)#lext y-axis
if pygame.joystick.get_count() != 0:
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}' #stop the rover if there is no controller connected
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
else:
sys.exit(0)
# Check if controller is still connected
if pygame.joystick.get_count() == 0:
print("Gamepad disconnected. Exiting...")
# Send one last zero control message
input = CoreControl()
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.get_logger().info(f"[Ctrl] Stopping. No Gamepad Connected. ")
self.publisher.publish(input)
self.get_logger().info("Final stop command sent. Shutting down.")
# Clean up
pygame.quit()
sys.exit(0)
input = CoreControl()
input.max_speed = max_speed
input.right_stick = -1 * round(self.gamepad.get_axis(4), 2) # right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}'
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
#tb_bs = BaseStation()
#node.run()
if __name__ == '__main__':
main()

View File

@@ -202,6 +202,9 @@ class SerialRelay(Node):
self.core_feedback.gps_long = float(parts[3])
elif output.startswith("can_relay_fromvic,core,50"):#GNSS Satellite Count
self.core_feedback.gps_sats = round(float(parts[3]))
#if parts length is at least 5 then we should have altitude, this is a temporary check until fully implemented
if len(parts) >= 5:
self.core_feedback.gps_alt = round(float(parts[4]), 2)
elif output.startswith("can_relay_fromvic,core,51"):#Gyro x,y,z
self.core_feedback.bno_gyro.x = float(parts[3])
self.core_feedback.bno_gyro.y = float(parts[4])