From 328260844bc10ffb28186c238c2563850fd7c0b9 Mon Sep 17 00:00:00 2001 From: Tristan McGinnis Date: Tue, 18 Feb 2025 12:25:23 -0600 Subject: [PATCH] move headless into arm_pkg also did some renaming of nodes/files --- src/arm_pkg/arm_pkg/arm_headless.py | 258 +++++++++++++++++ .../{arm_relay_node.py => arm_node.py} | 2 +- src/arm_pkg/arm_pkg/arm_relay_node.py.bkp | 259 ------------------ src/arm_pkg/setup.py | 6 +- src/arm_pkg/test/test_copyright.py | 25 -- src/arm_pkg/test/test_flake8.py | 25 -- src/arm_pkg/test/test_pep257.py | 23 -- 7 files changed, 262 insertions(+), 336 deletions(-) create mode 100755 src/arm_pkg/arm_pkg/arm_headless.py rename src/arm_pkg/arm_pkg/{arm_relay_node.py => arm_node.py} (99%) delete mode 100755 src/arm_pkg/arm_pkg/arm_relay_node.py.bkp delete mode 100644 src/arm_pkg/test/test_copyright.py delete mode 100644 src/arm_pkg/test/test_flake8.py delete mode 100644 src/arm_pkg/test/test_pep257.py diff --git a/src/arm_pkg/arm_pkg/arm_headless.py b/src/arm_pkg/arm_pkg/arm_headless.py new file mode 100755 index 0000000..761cbe3 --- /dev/null +++ b/src/arm_pkg/arm_pkg/arm_headless.py @@ -0,0 +1,258 @@ +import rclpy +from rclpy.node import Node + +import pygame + +import time + +import serial +import sys +import threading +import glob +import os + +from std_msgs.msg import String +from ros2_interfaces_pkg.msg import ControllerState +from ros2_interfaces_pkg.msg import ArmManual +from ros2_interfaces_pkg.msg import ArmIK + + +os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() +os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display + +class Headless(Node): + def __init__(self): + # Initalize node with name + super().__init__("arm_headless") + + # Depricated, kept temporarily for reference + # self.create_timer(0.20, self.send_controls)#read and send controls + + self.create_timer(0.20, self.send_manual) + + + # Create a publisher to publish any output the pico sends + + # Depricated, kept temporarily for reference + #self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10) + self.manual_pub = self.create_publisher(ArmManual, '/arm/control/manual', 10) + + # Create a subscriber to listen to any commands sent for the pico + + # Depricated, kept temporarily for reference + #self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10) + + self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) + + + #self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change + + + # Initialize pygame + pygame.init() + + # Initialize the gamepad module + pygame.joystick.init() + + # Check if any gamepad is connected + if pygame.joystick.get_count() == 0: + print("No gamepad found.") + pygame.quit() + exit() + for event in pygame.event.get(): + if event.type == pygame.QUIT: + pygame.quit() + exit() + + # Initialize the first gamepad, print name to terminal + self.gamepad = pygame.joystick.Joystick(0) + self.gamepad.init() + print(f'Gamepad Found: {self.gamepad.get_name()}') + # + # + + + def run(self): + # This thread makes all the update processes run in the background + thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) + thread.start() + + + try: + while rclpy.ok(): + #Check the pico for updates + + self.read_feedback() + if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected + print(f"Gamepad disconnected: {self.gamepad.get_name()}") + + while pygame.joystick.get_count() == 0: + #self.send_controls() #depricated, kept for reference temporarily + self.send_manual() + self.read_feedback() + self.gamepad = pygame.joystick.Joystick(0) + self.gamepad.init() #re-initialized gamepad + print(f"Gamepad reconnected: {self.gamepad.get_name()}") + + + except KeyboardInterrupt: + sys.exit(0) + + + def send_manual(self): + for event in pygame.event.get(): + if event.type == pygame.QUIT: + pygame.quit() + exit() + input = ArmManual() + + dpad_input = self.gamepad.get_hat(0) + input.axis0 = 0 + if dpad_input[0] == 1: + input.axis0 = 1 + elif dpad_input[0] == -1: + input.axis0 = -1 + + input.axis1 = round(self.gamepad.get_axis(0))#left x-axis + input.axis2 = round(self.gamepad.get_axis(1))#left y-axis + input.axis3 = round(self.gamepad.get_axis(4))#right y-axis + + #Temporary, not controlling digit. Awaiting embedded implementation + input.effector_yaw = 0 + input.effector_roll = 0 + input.gripper = 0 + input.linear_actuator = 0 + input.laser = 0 + + + if pygame.joystick.get_count() != 0: + + self.get_logger().info(f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n") + self.manual_pub.publish(input) + else: + pass + + pass + + # Depricated, kept temporarily for reference + # def send_controls(self): + + # for event in pygame.event.get(): + # if event.type == pygame.QUIT: + # pygame.quit() + # exit() + # input = ControllerState() + + # input.lt = self.gamepad.get_axis(2)#left trigger + # input.rt = self.gamepad.get_axis(5)#right trigger + + # #input.lb = self.gamepad.get_button(9)#Value must be converted to bool + # if(self.gamepad.get_button(4)):#left bumper + # input.lb = True + # else: + # input.lb = False + + # #input.rb = self.gamepad.get_button(10)#Value must be converted to bool + # if(self.gamepad.get_button(5)):#right bumper + # input.rb = True + # else: + # input.rb = False + + # #input.plus = self.gamepad.get_button(6)#plus button + # if(self.gamepad.get_button(7)):#plus button + # input.plus = True + # else: + # input.plus = False + + # #input.minus = self.gamepad.get_button(4)#minus button + # if(self.gamepad.get_button(6)):#minus button + # input.minus = True + # else: + # input.minus = False + + # input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis + # input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis + # input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis + # input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis + + # #input.a = self.gamepad.get_button(1)#A button + # if(self.gamepad.get_button(0)):#A button + # input.a = True + # else: + # input.a = False + # #input.b = self.gamepad.get_button(0)#B button + # if(self.gamepad.get_button(1)):#B button + # input.b = True + # else: + # input.b = False + # #input.x = self.gamepad.get_button(3)#X button + # if(self.gamepad.get_button(2)):#X button + # input.x = True + # else: + # input.x = False + # #input.y = self.gamepad.get_button(2)#Y button + # if(self.gamepad.get_button(3)):#Y button + # input.y = True + # else: + # input.y = False + + + # dpad_input = self.gamepad.get_hat(0)#D-pad input + + # #not using up/down on DPad + # input.d_up = False + # input.d_down = False + + + # if(dpad_input[0] == 1):#D-pad right + # input.d_right = True + # else: + # input.d_right = False + # if(dpad_input[0] == -1):#D-pad left + # input.d_left = True + # else: + # input.d_left = False + + + # if pygame.joystick.get_count() != 0: + + # self.get_logger().info(f"[Ctrl] Updated Controller State\n") + + # self.publisher.publish(input) + # else: + # pass + + + + + + def read_feedback(self, msg): + + # Create a string message object + #msg = String() + + # Set message data + #msg.data = output + + # Publish data + #self.publisher.publish(msg.data) + + print(f"[MCU] {msg.data}", end="") + #print(f"[Pico] Publishing: {msg}") + + + +def main(args=None): + rclpy.init(args=args) + + node = Headless() + + rclpy.spin(node) + rclpy.shutdown() + + #tb_bs = BaseStation() + #node.run() + + +if __name__ == '__main__': + main() diff --git a/src/arm_pkg/arm_pkg/arm_relay_node.py b/src/arm_pkg/arm_pkg/arm_node.py similarity index 99% rename from src/arm_pkg/arm_pkg/arm_relay_node.py rename to src/arm_pkg/arm_pkg/arm_node.py index 705ada6..45d60e2 100644 --- a/src/arm_pkg/arm_pkg/arm_relay_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -18,7 +18,7 @@ thread = None class SerialRelay(Node): def __init__(self): # Initialize node with name - super().__init__("arm_relay") + super().__init__("arm_node") # Create publishers diff --git a/src/arm_pkg/arm_pkg/arm_relay_node.py.bkp b/src/arm_pkg/arm_pkg/arm_relay_node.py.bkp deleted file mode 100755 index 20d59e5..0000000 --- a/src/arm_pkg/arm_pkg/arm_relay_node.py.bkp +++ /dev/null @@ -1,259 +0,0 @@ -import rclpy -from rclpy.node import Node - -import serial -import sys -import threading -import glob -import time - - -from std_msgs.msg import String -from interfaces_pkg.msg import ControllerState -from interfaces_pkg.msg import ArmState - - - -serial_pub = None -thread = None - - -class SerialRelay(Node): - def __init__(self): - # Initalize node with name - super().__init__("arm_relay") - - # Create a publisher to publish any output the mcu sends - self.output_publisher = self.create_publisher(String, '/astra/arm/feedback', 10)#Random String feedback data - self.state_publisher = self.create_publisher(ArmState, '/astra/arm/state', 10)#Standard state feedback data - - # Create a subscriber to listen to any commands sent for the mcu - self.subscriber = self.create_subscription(ControllerState, '/astra/arm/control', self.send_controls, 10) - - # Loop through all serial devices on the computer to check for the mcu - self.port = None - ports = SerialRelay.list_serial_ports() - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, timeout=1) - ser.write(b"arm,ping\n") - response = ser.read_until("\n") - - # if pong is in response, then we are talking with the mcu - if b"pong" in response: - self.port = port - print(f"Found MCU at {self.port}!") - break - except: - pass - - if self.port is None: - print("Unable to find MCU... please make sure it is connected.") - #sys.exit(1) TEMPORARY TEST, UNCOMMENT THIS LINE IN THE FUTURE - - self.ser = serial.Serial(self.port, 115200) - - #self.mutex = threading.Lock() - - def run(self): - # This thread makes all the update processes run in the background - global thread - thread = threading.Thread(target=rclpy.spin, args={self}, daemon = True) - thread.start() - - try: - while rclpy.ok(): - # Check the mcu for updates - #rclpy.spin_once(self, timeout_sec=0.1) - #self.mutex.acquire() - if self.ser.in_waiting: - #self.mutex.release() - self.read_mcu() - else: - time.sleep(0.1) - - except KeyboardInterrupt: - #self.mutex.release() - serial_pub.ser.close() - sys.exit(0) - - - def read_mcu(self): - try: - #self.mutex.acquire() - output = str(self.ser.readline(), "utf8") - if output: - print(f"[MCU] {output}", end="") - # Create a string message object - msg = String() - - # Set message data - msg.data = output - - # Publish data - self.output_publisher.publish(msg) - #print(f"[MCU] Publishing: {msg}") - - except serial.SerialException: - #self.mutex.release() - pass - #finally: - #self.mutex.release() - - def send_controls(self, msg): - command = "" - ef_cmd = "" #end effector command to be apended - #self.mutex.acquire() - if(msg.b):#If B button: send ESTOP command - command = "arm,stop\n" - self.ser.write(bytes(command, "utf8"))#Send command to MCU - print(f"[Wrote] {command}", end="")#Echo command to console - - #self.mutex.release()#Release mutex lock - return - - if(msg.plus):#Turn EF laser on - command = "arm,endEffect,laser,1\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - elif(msg.minus):#Turn EF laser off - command = "arm,endEffect,laser,0\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - - if(msg.rb):#If RB button: End Effector control mode - ef_cmd = "arm,endEffect,ctrl," - - if(msg.lt >= 0.5):#If LT button: Open end effector - ef_cmd += "-1," - elif(msg.rt >= 0.5):#If RT button: Close end effector - ef_cmd += "1," - else: - ef_cmd += "0," - - if(msg.rs_x < 0): #Check left/stop/right for tilt movement - ef_cmd += "-1," - elif(msg.rs_x > 0): - ef_cmd += "1," - else: - ef_cmd += "0," - pass - - if(msg.ls_x < 0): #Check left/stop/right for revolve movement - ef_cmd += "-1" - elif(msg.ls_x > 0): - ef_cmd += "1" - else: - ef_cmd += "0" - - command = ef_cmd + "\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - #self.mutex.release() - return - else: - ef_cmd = "arm,endEffect,ctrl," - if(msg.lt >= 0.5): - ef_cmd += "-1,0,0" - elif(msg.rt >= 0.5): - ef_cmd += "1,0,0" - else: - ef_cmd += "0,0,0" - command = ef_cmd + "\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - - if(msg.lb):#If LB button: Manual control mode - #First, ensure control mode is set to manual on the MCU - self.ser.write(bytes("arm,setMode,manual\n", "utf8")) - #print(f"[Wrote] arm,setMode,manual", end="") - - command = "arm,man,0.15,"#Set manual control duty cycle statically to 15% for now - if(msg.d_left):#If D-Pad left set axis_1 to -1 - command += "-1," - elif(msg.d_right):#If D-Pad right set axis_0 to 1 - command += "1," - else: - command += "0," - - #Axis_1 - if(msg.ls_x < -0.5):#If LS left at least 50% set axis_1 to -1 - command += "-1," - elif(msg.ls_x > 0.5):#If LS right at least 50% set axis_1 to 1 - command += "1," - else: - command += "0," - - #Axis_2 - if(msg.ls_y < -0.5):#If LS up at least 50% set axis_2 to 1 - command += "1," - elif(msg.ls_y > 0.5):#If LS down at least 50% set axis_2 to -1 - command += "-1," - else: - command += "0," - - #Axis_3 - if(msg.rs_y < -0.5):#If RS up at least 50% set axis_3 to 1 - command += "1" - elif(msg.rs_y > 0.5):#If RS down at least 50% set axis_3 to -1 - command += "-1" - else: - command += "0" - - command += "\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - - #self.mutex.release() - return - else:#Else normal (IK) control mode - #First, ensure control mode is set to IK on the MCU - self.ser.write(bytes("arm,setMode,ik\n", "utf8")) - #print(f"[Wrote] arm,setMode,ik", end="") - - command = "arm,ik," - if(msg.d_left):#If D-Pad left set axis zero to -1 - command += "-1," - elif(msg.d_right):#If D-Pad right set axis zero to 1 - command += "1," - else: - command += "0," - - #start with input from controller - coord_x = -1*msg.ls_y #out/in - coord_y = -1*msg.rs_y #up/down - - #convert units for ik coordinate offsets - coord_x = round(coord_x * 20.4 * 2, 1) #20.4mm (1 in.) per 1 unit of input. Times 2 for two inches - coord_y = round(coord_y * 20.4 * 2, 1) # - - command += f"{coord_x},{coord_y}\n" - - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - #self.mutex.release() - return - - - @staticmethod - def list_serial_ports(): - return glob.glob("/dev/tty[A-Za-z]*") - - -def myexcepthook(type, value, tb): - print("Uncaught exception:", type, value) - serial_pub.ser.close() - - -def main(args=None): - rclpy.init(args=args) - sys.excepthook = myexcepthook - - global serial_pub - serial_pub = SerialRelay() - serial_pub.run() - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/src/arm_pkg/setup.py b/src/arm_pkg/setup.py index a12a27f..8c8c3bd 100644 --- a/src/arm_pkg/setup.py +++ b/src/arm_pkg/setup.py @@ -16,11 +16,11 @@ setup( maintainer='tristan', maintainer_email='tristanmcginnis26@gmail.com', description='TODO: Package description', - license='TODO: License declaration', - tests_require=['pytest'], + license='All Rights Reserved', entry_points={ 'console_scripts': [ - 'arm = arm_pkg.arm_relay_node:main' + 'arm = arm_pkg.arm_node:main', + 'headless = arm_pkg.arm_headless:main' ], }, ) diff --git a/src/arm_pkg/test/test_copyright.py b/src/arm_pkg/test/test_copyright.py deleted file mode 100644 index 97a3919..0000000 --- a/src/arm_pkg/test/test_copyright.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_copyright.main import main -import pytest - - -# Remove the `skip` decorator once the source file(s) have a copyright header -@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') -@pytest.mark.copyright -@pytest.mark.linter -def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' diff --git a/src/arm_pkg/test/test_flake8.py b/src/arm_pkg/test/test_flake8.py deleted file mode 100644 index 27ee107..0000000 --- a/src/arm_pkg/test/test_flake8.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_flake8.main import main_with_errors -import pytest - - -@pytest.mark.flake8 -@pytest.mark.linter -def test_flake8(): - rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) diff --git a/src/arm_pkg/test/test_pep257.py b/src/arm_pkg/test/test_pep257.py deleted file mode 100644 index b234a38..0000000 --- a/src/arm_pkg/test/test_pep257.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_pep257.main import main -import pytest - - -@pytest.mark.linter -@pytest.mark.pep257 -def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings'