From 328260844bc10ffb28186c238c2563850fd7c0b9 Mon Sep 17 00:00:00 2001 From: Tristan McGinnis Date: Tue, 18 Feb 2025 12:25:23 -0600 Subject: [PATCH 1/2] move headless into arm_pkg also did some renaming of nodes/files --- src/arm_pkg/arm_pkg/arm_headless.py | 258 +++++++++++++++++ .../{arm_relay_node.py => arm_node.py} | 2 +- src/arm_pkg/arm_pkg/arm_relay_node.py.bkp | 259 ------------------ src/arm_pkg/setup.py | 6 +- src/arm_pkg/test/test_copyright.py | 25 -- src/arm_pkg/test/test_flake8.py | 25 -- src/arm_pkg/test/test_pep257.py | 23 -- 7 files changed, 262 insertions(+), 336 deletions(-) create mode 100755 src/arm_pkg/arm_pkg/arm_headless.py rename src/arm_pkg/arm_pkg/{arm_relay_node.py => arm_node.py} (99%) delete mode 100755 src/arm_pkg/arm_pkg/arm_relay_node.py.bkp delete mode 100644 src/arm_pkg/test/test_copyright.py delete mode 100644 src/arm_pkg/test/test_flake8.py delete mode 100644 src/arm_pkg/test/test_pep257.py diff --git a/src/arm_pkg/arm_pkg/arm_headless.py b/src/arm_pkg/arm_pkg/arm_headless.py new file mode 100755 index 0000000..761cbe3 --- /dev/null +++ b/src/arm_pkg/arm_pkg/arm_headless.py @@ -0,0 +1,258 @@ +import rclpy +from rclpy.node import Node + +import pygame + +import time + +import serial +import sys +import threading +import glob +import os + +from std_msgs.msg import String +from ros2_interfaces_pkg.msg import ControllerState +from ros2_interfaces_pkg.msg import ArmManual +from ros2_interfaces_pkg.msg import ArmIK + + +os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() +os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display + +class Headless(Node): + def __init__(self): + # Initalize node with name + super().__init__("arm_headless") + + # Depricated, kept temporarily for reference + # self.create_timer(0.20, self.send_controls)#read and send controls + + self.create_timer(0.20, self.send_manual) + + + # Create a publisher to publish any output the pico sends + + # Depricated, kept temporarily for reference + #self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10) + self.manual_pub = self.create_publisher(ArmManual, '/arm/control/manual', 10) + + # Create a subscriber to listen to any commands sent for the pico + + # Depricated, kept temporarily for reference + #self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10) + + self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) + + + #self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change + + + # Initialize pygame + pygame.init() + + # Initialize the gamepad module + pygame.joystick.init() + + # Check if any gamepad is connected + if pygame.joystick.get_count() == 0: + print("No gamepad found.") + pygame.quit() + exit() + for event in pygame.event.get(): + if event.type == pygame.QUIT: + pygame.quit() + exit() + + # Initialize the first gamepad, print name to terminal + self.gamepad = pygame.joystick.Joystick(0) + self.gamepad.init() + print(f'Gamepad Found: {self.gamepad.get_name()}') + # + # + + + def run(self): + # This thread makes all the update processes run in the background + thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) + thread.start() + + + try: + while rclpy.ok(): + #Check the pico for updates + + self.read_feedback() + if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected + print(f"Gamepad disconnected: {self.gamepad.get_name()}") + + while pygame.joystick.get_count() == 0: + #self.send_controls() #depricated, kept for reference temporarily + self.send_manual() + self.read_feedback() + self.gamepad = pygame.joystick.Joystick(0) + self.gamepad.init() #re-initialized gamepad + print(f"Gamepad reconnected: {self.gamepad.get_name()}") + + + except KeyboardInterrupt: + sys.exit(0) + + + def send_manual(self): + for event in pygame.event.get(): + if event.type == pygame.QUIT: + pygame.quit() + exit() + input = ArmManual() + + dpad_input = self.gamepad.get_hat(0) + input.axis0 = 0 + if dpad_input[0] == 1: + input.axis0 = 1 + elif dpad_input[0] == -1: + input.axis0 = -1 + + input.axis1 = round(self.gamepad.get_axis(0))#left x-axis + input.axis2 = round(self.gamepad.get_axis(1))#left y-axis + input.axis3 = round(self.gamepad.get_axis(4))#right y-axis + + #Temporary, not controlling digit. Awaiting embedded implementation + input.effector_yaw = 0 + input.effector_roll = 0 + input.gripper = 0 + input.linear_actuator = 0 + input.laser = 0 + + + if pygame.joystick.get_count() != 0: + + self.get_logger().info(f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n") + self.manual_pub.publish(input) + else: + pass + + pass + + # Depricated, kept temporarily for reference + # def send_controls(self): + + # for event in pygame.event.get(): + # if event.type == pygame.QUIT: + # pygame.quit() + # exit() + # input = ControllerState() + + # input.lt = self.gamepad.get_axis(2)#left trigger + # input.rt = self.gamepad.get_axis(5)#right trigger + + # #input.lb = self.gamepad.get_button(9)#Value must be converted to bool + # if(self.gamepad.get_button(4)):#left bumper + # input.lb = True + # else: + # input.lb = False + + # #input.rb = self.gamepad.get_button(10)#Value must be converted to bool + # if(self.gamepad.get_button(5)):#right bumper + # input.rb = True + # else: + # input.rb = False + + # #input.plus = self.gamepad.get_button(6)#plus button + # if(self.gamepad.get_button(7)):#plus button + # input.plus = True + # else: + # input.plus = False + + # #input.minus = self.gamepad.get_button(4)#minus button + # if(self.gamepad.get_button(6)):#minus button + # input.minus = True + # else: + # input.minus = False + + # input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis + # input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis + # input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis + # input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis + + # #input.a = self.gamepad.get_button(1)#A button + # if(self.gamepad.get_button(0)):#A button + # input.a = True + # else: + # input.a = False + # #input.b = self.gamepad.get_button(0)#B button + # if(self.gamepad.get_button(1)):#B button + # input.b = True + # else: + # input.b = False + # #input.x = self.gamepad.get_button(3)#X button + # if(self.gamepad.get_button(2)):#X button + # input.x = True + # else: + # input.x = False + # #input.y = self.gamepad.get_button(2)#Y button + # if(self.gamepad.get_button(3)):#Y button + # input.y = True + # else: + # input.y = False + + + # dpad_input = self.gamepad.get_hat(0)#D-pad input + + # #not using up/down on DPad + # input.d_up = False + # input.d_down = False + + + # if(dpad_input[0] == 1):#D-pad right + # input.d_right = True + # else: + # input.d_right = False + # if(dpad_input[0] == -1):#D-pad left + # input.d_left = True + # else: + # input.d_left = False + + + # if pygame.joystick.get_count() != 0: + + # self.get_logger().info(f"[Ctrl] Updated Controller State\n") + + # self.publisher.publish(input) + # else: + # pass + + + + + + def read_feedback(self, msg): + + # Create a string message object + #msg = String() + + # Set message data + #msg.data = output + + # Publish data + #self.publisher.publish(msg.data) + + print(f"[MCU] {msg.data}", end="") + #print(f"[Pico] Publishing: {msg}") + + + +def main(args=None): + rclpy.init(args=args) + + node = Headless() + + rclpy.spin(node) + rclpy.shutdown() + + #tb_bs = BaseStation() + #node.run() + + +if __name__ == '__main__': + main() diff --git a/src/arm_pkg/arm_pkg/arm_relay_node.py b/src/arm_pkg/arm_pkg/arm_node.py similarity index 99% rename from src/arm_pkg/arm_pkg/arm_relay_node.py rename to src/arm_pkg/arm_pkg/arm_node.py index 705ada6..45d60e2 100644 --- a/src/arm_pkg/arm_pkg/arm_relay_node.py +++ b/src/arm_pkg/arm_pkg/arm_node.py @@ -18,7 +18,7 @@ thread = None class SerialRelay(Node): def __init__(self): # Initialize node with name - super().__init__("arm_relay") + super().__init__("arm_node") # Create publishers diff --git a/src/arm_pkg/arm_pkg/arm_relay_node.py.bkp b/src/arm_pkg/arm_pkg/arm_relay_node.py.bkp deleted file mode 100755 index 20d59e5..0000000 --- a/src/arm_pkg/arm_pkg/arm_relay_node.py.bkp +++ /dev/null @@ -1,259 +0,0 @@ -import rclpy -from rclpy.node import Node - -import serial -import sys -import threading -import glob -import time - - -from std_msgs.msg import String -from interfaces_pkg.msg import ControllerState -from interfaces_pkg.msg import ArmState - - - -serial_pub = None -thread = None - - -class SerialRelay(Node): - def __init__(self): - # Initalize node with name - super().__init__("arm_relay") - - # Create a publisher to publish any output the mcu sends - self.output_publisher = self.create_publisher(String, '/astra/arm/feedback', 10)#Random String feedback data - self.state_publisher = self.create_publisher(ArmState, '/astra/arm/state', 10)#Standard state feedback data - - # Create a subscriber to listen to any commands sent for the mcu - self.subscriber = self.create_subscription(ControllerState, '/astra/arm/control', self.send_controls, 10) - - # Loop through all serial devices on the computer to check for the mcu - self.port = None - ports = SerialRelay.list_serial_ports() - for port in ports: - try: - # connect and send a ping command - ser = serial.Serial(port, timeout=1) - ser.write(b"arm,ping\n") - response = ser.read_until("\n") - - # if pong is in response, then we are talking with the mcu - if b"pong" in response: - self.port = port - print(f"Found MCU at {self.port}!") - break - except: - pass - - if self.port is None: - print("Unable to find MCU... please make sure it is connected.") - #sys.exit(1) TEMPORARY TEST, UNCOMMENT THIS LINE IN THE FUTURE - - self.ser = serial.Serial(self.port, 115200) - - #self.mutex = threading.Lock() - - def run(self): - # This thread makes all the update processes run in the background - global thread - thread = threading.Thread(target=rclpy.spin, args={self}, daemon = True) - thread.start() - - try: - while rclpy.ok(): - # Check the mcu for updates - #rclpy.spin_once(self, timeout_sec=0.1) - #self.mutex.acquire() - if self.ser.in_waiting: - #self.mutex.release() - self.read_mcu() - else: - time.sleep(0.1) - - except KeyboardInterrupt: - #self.mutex.release() - serial_pub.ser.close() - sys.exit(0) - - - def read_mcu(self): - try: - #self.mutex.acquire() - output = str(self.ser.readline(), "utf8") - if output: - print(f"[MCU] {output}", end="") - # Create a string message object - msg = String() - - # Set message data - msg.data = output - - # Publish data - self.output_publisher.publish(msg) - #print(f"[MCU] Publishing: {msg}") - - except serial.SerialException: - #self.mutex.release() - pass - #finally: - #self.mutex.release() - - def send_controls(self, msg): - command = "" - ef_cmd = "" #end effector command to be apended - #self.mutex.acquire() - if(msg.b):#If B button: send ESTOP command - command = "arm,stop\n" - self.ser.write(bytes(command, "utf8"))#Send command to MCU - print(f"[Wrote] {command}", end="")#Echo command to console - - #self.mutex.release()#Release mutex lock - return - - if(msg.plus):#Turn EF laser on - command = "arm,endEffect,laser,1\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - elif(msg.minus):#Turn EF laser off - command = "arm,endEffect,laser,0\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - - if(msg.rb):#If RB button: End Effector control mode - ef_cmd = "arm,endEffect,ctrl," - - if(msg.lt >= 0.5):#If LT button: Open end effector - ef_cmd += "-1," - elif(msg.rt >= 0.5):#If RT button: Close end effector - ef_cmd += "1," - else: - ef_cmd += "0," - - if(msg.rs_x < 0): #Check left/stop/right for tilt movement - ef_cmd += "-1," - elif(msg.rs_x > 0): - ef_cmd += "1," - else: - ef_cmd += "0," - pass - - if(msg.ls_x < 0): #Check left/stop/right for revolve movement - ef_cmd += "-1" - elif(msg.ls_x > 0): - ef_cmd += "1" - else: - ef_cmd += "0" - - command = ef_cmd + "\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - #self.mutex.release() - return - else: - ef_cmd = "arm,endEffect,ctrl," - if(msg.lt >= 0.5): - ef_cmd += "-1,0,0" - elif(msg.rt >= 0.5): - ef_cmd += "1,0,0" - else: - ef_cmd += "0,0,0" - command = ef_cmd + "\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - - if(msg.lb):#If LB button: Manual control mode - #First, ensure control mode is set to manual on the MCU - self.ser.write(bytes("arm,setMode,manual\n", "utf8")) - #print(f"[Wrote] arm,setMode,manual", end="") - - command = "arm,man,0.15,"#Set manual control duty cycle statically to 15% for now - if(msg.d_left):#If D-Pad left set axis_1 to -1 - command += "-1," - elif(msg.d_right):#If D-Pad right set axis_0 to 1 - command += "1," - else: - command += "0," - - #Axis_1 - if(msg.ls_x < -0.5):#If LS left at least 50% set axis_1 to -1 - command += "-1," - elif(msg.ls_x > 0.5):#If LS right at least 50% set axis_1 to 1 - command += "1," - else: - command += "0," - - #Axis_2 - if(msg.ls_y < -0.5):#If LS up at least 50% set axis_2 to 1 - command += "1," - elif(msg.ls_y > 0.5):#If LS down at least 50% set axis_2 to -1 - command += "-1," - else: - command += "0," - - #Axis_3 - if(msg.rs_y < -0.5):#If RS up at least 50% set axis_3 to 1 - command += "1" - elif(msg.rs_y > 0.5):#If RS down at least 50% set axis_3 to -1 - command += "-1" - else: - command += "0" - - command += "\n" - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - - #self.mutex.release() - return - else:#Else normal (IK) control mode - #First, ensure control mode is set to IK on the MCU - self.ser.write(bytes("arm,setMode,ik\n", "utf8")) - #print(f"[Wrote] arm,setMode,ik", end="") - - command = "arm,ik," - if(msg.d_left):#If D-Pad left set axis zero to -1 - command += "-1," - elif(msg.d_right):#If D-Pad right set axis zero to 1 - command += "1," - else: - command += "0," - - #start with input from controller - coord_x = -1*msg.ls_y #out/in - coord_y = -1*msg.rs_y #up/down - - #convert units for ik coordinate offsets - coord_x = round(coord_x * 20.4 * 2, 1) #20.4mm (1 in.) per 1 unit of input. Times 2 for two inches - coord_y = round(coord_y * 20.4 * 2, 1) # - - command += f"{coord_x},{coord_y}\n" - - self.ser.write(bytes(command, "utf8")) - print(f"[Wrote] {command}", end="") - #self.mutex.release() - return - - - @staticmethod - def list_serial_ports(): - return glob.glob("/dev/tty[A-Za-z]*") - - -def myexcepthook(type, value, tb): - print("Uncaught exception:", type, value) - serial_pub.ser.close() - - -def main(args=None): - rclpy.init(args=args) - sys.excepthook = myexcepthook - - global serial_pub - serial_pub = SerialRelay() - serial_pub.run() - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/src/arm_pkg/setup.py b/src/arm_pkg/setup.py index a12a27f..8c8c3bd 100644 --- a/src/arm_pkg/setup.py +++ b/src/arm_pkg/setup.py @@ -16,11 +16,11 @@ setup( maintainer='tristan', maintainer_email='tristanmcginnis26@gmail.com', description='TODO: Package description', - license='TODO: License declaration', - tests_require=['pytest'], + license='All Rights Reserved', entry_points={ 'console_scripts': [ - 'arm = arm_pkg.arm_relay_node:main' + 'arm = arm_pkg.arm_node:main', + 'headless = arm_pkg.arm_headless:main' ], }, ) diff --git a/src/arm_pkg/test/test_copyright.py b/src/arm_pkg/test/test_copyright.py deleted file mode 100644 index 97a3919..0000000 --- a/src/arm_pkg/test/test_copyright.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_copyright.main import main -import pytest - - -# Remove the `skip` decorator once the source file(s) have a copyright header -@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') -@pytest.mark.copyright -@pytest.mark.linter -def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' diff --git a/src/arm_pkg/test/test_flake8.py b/src/arm_pkg/test/test_flake8.py deleted file mode 100644 index 27ee107..0000000 --- a/src/arm_pkg/test/test_flake8.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_flake8.main import main_with_errors -import pytest - - -@pytest.mark.flake8 -@pytest.mark.linter -def test_flake8(): - rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) diff --git a/src/arm_pkg/test/test_pep257.py b/src/arm_pkg/test/test_pep257.py deleted file mode 100644 index b234a38..0000000 --- a/src/arm_pkg/test/test_pep257.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_pep257.main import main -import pytest - - -@pytest.mark.linter -@pytest.mark.pep257 -def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings' From cc84afb709b3df4713f88253f2e31213119ea803 Mon Sep 17 00:00:00 2001 From: TristanMcGinnis Date: Tue, 18 Feb 2025 18:32:06 +0000 Subject: [PATCH 2/2] Finish merge --- .../headless_arm_pkg/__init__.py | 0 .../headless_arm_ctrl_node.py | 258 ------------------ src/headless_arm_pkg/package.xml | 21 -- .../resource/headless_arm_pkg | 0 src/headless_arm_pkg/setup.cfg | 4 - src/headless_arm_pkg/setup.py | 26 -- src/headless_arm_pkg/test/test_copyright.py | 25 -- src/headless_arm_pkg/test/test_flake8.py | 25 -- src/headless_arm_pkg/test/test_pep257.py | 23 -- 9 files changed, 382 deletions(-) delete mode 100644 src/headless_arm_pkg/headless_arm_pkg/__init__.py delete mode 100755 src/headless_arm_pkg/headless_arm_pkg/headless_arm_ctrl_node.py delete mode 100644 src/headless_arm_pkg/package.xml delete mode 100644 src/headless_arm_pkg/resource/headless_arm_pkg delete mode 100644 src/headless_arm_pkg/setup.cfg delete mode 100644 src/headless_arm_pkg/setup.py delete mode 100644 src/headless_arm_pkg/test/test_copyright.py delete mode 100644 src/headless_arm_pkg/test/test_flake8.py delete mode 100644 src/headless_arm_pkg/test/test_pep257.py diff --git a/src/headless_arm_pkg/headless_arm_pkg/__init__.py b/src/headless_arm_pkg/headless_arm_pkg/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/headless_arm_pkg/headless_arm_pkg/headless_arm_ctrl_node.py b/src/headless_arm_pkg/headless_arm_pkg/headless_arm_ctrl_node.py deleted file mode 100755 index c10a73d..0000000 --- a/src/headless_arm_pkg/headless_arm_pkg/headless_arm_ctrl_node.py +++ /dev/null @@ -1,258 +0,0 @@ -import rclpy -from rclpy.node import Node - -import pygame - -import time - -import serial -import sys -import threading -import glob -import os - -from std_msgs.msg import String -from ros2_interfaces_pkg.msg import ControllerState -from ros2_interfaces_pkg.msg import ArmManual -from ros2_interfaces_pkg.msg import ArmIK - - -os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() -os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display - -class Headless(Node): - def __init__(self): - # Initalize node with name - super().__init__("headless_arm_ctrl") - - # Depricated, kept temporarily for reference - # self.create_timer(0.20, self.send_controls)#read and send controls - - self.create_timer(0.20, self.send_manual) - - - # Create a publisher to publish any output the pico sends - - # Depricated, kept temporarily for reference - #self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10) - self.manual_pub = self.create_publisher(ArmManual, '/arm/control/manual', 10) - - # Create a subscriber to listen to any commands sent for the pico - - # Depricated, kept temporarily for reference - #self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10) - - self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) - - - #self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change - - - # Initialize pygame - pygame.init() - - # Initialize the gamepad module - pygame.joystick.init() - - # Check if any gamepad is connected - if pygame.joystick.get_count() == 0: - print("No gamepad found.") - pygame.quit() - exit() - for event in pygame.event.get(): - if event.type == pygame.QUIT: - pygame.quit() - exit() - - # Initialize the first gamepad, print name to terminal - self.gamepad = pygame.joystick.Joystick(0) - self.gamepad.init() - print(f'Gamepad Found: {self.gamepad.get_name()}') - # - # - - - def run(self): - # This thread makes all the update processes run in the background - thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) - thread.start() - - - try: - while rclpy.ok(): - #Check the pico for updates - - self.read_feedback() - if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected - print(f"Gamepad disconnected: {self.gamepad.get_name()}") - - while pygame.joystick.get_count() == 0: - #self.send_controls() #depricated, kept for reference temporarily - self.send_manual() - self.read_feedback() - self.gamepad = pygame.joystick.Joystick(0) - self.gamepad.init() #re-initialized gamepad - print(f"Gamepad reconnected: {self.gamepad.get_name()}") - - - except KeyboardInterrupt: - sys.exit(0) - - - def send_manual(self): - for event in pygame.event.get(): - if event.type == pygame.QUIT: - pygame.quit() - exit() - input = ArmManual() - - dpad_input = self.gamepad.get_hat(0) - input.axis0 = 0 - if dpad_input[0] == 1: - input.axis0 = 1 - elif dpad_input[0] == -1: - input.axis0 = -1 - - input.axis1 = round(self.gamepad.get_axis(0))#left x-axis - input.axis2 = round(self.gamepad.get_axis(1))#left y-axis - input.axis3 = round(self.gamepad.get_axis(4))#right y-axis - - #Temporary, not controlling digit. Awaiting embedded implementation - input.effector_yaw = 0 - input.effector_roll = 0 - input.gripper = 0 - input.linear_actuator = 0 - input.laser = 0 - - - if pygame.joystick.get_count() != 0: - - self.get_logger().info(f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n") - self.manual_pub.publish(input) - else: - pass - - pass - - # Depricated, kept temporarily for reference - # def send_controls(self): - - # for event in pygame.event.get(): - # if event.type == pygame.QUIT: - # pygame.quit() - # exit() - # input = ControllerState() - - # input.lt = self.gamepad.get_axis(2)#left trigger - # input.rt = self.gamepad.get_axis(5)#right trigger - - # #input.lb = self.gamepad.get_button(9)#Value must be converted to bool - # if(self.gamepad.get_button(4)):#left bumper - # input.lb = True - # else: - # input.lb = False - - # #input.rb = self.gamepad.get_button(10)#Value must be converted to bool - # if(self.gamepad.get_button(5)):#right bumper - # input.rb = True - # else: - # input.rb = False - - # #input.plus = self.gamepad.get_button(6)#plus button - # if(self.gamepad.get_button(7)):#plus button - # input.plus = True - # else: - # input.plus = False - - # #input.minus = self.gamepad.get_button(4)#minus button - # if(self.gamepad.get_button(6)):#minus button - # input.minus = True - # else: - # input.minus = False - - # input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis - # input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis - # input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis - # input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis - - # #input.a = self.gamepad.get_button(1)#A button - # if(self.gamepad.get_button(0)):#A button - # input.a = True - # else: - # input.a = False - # #input.b = self.gamepad.get_button(0)#B button - # if(self.gamepad.get_button(1)):#B button - # input.b = True - # else: - # input.b = False - # #input.x = self.gamepad.get_button(3)#X button - # if(self.gamepad.get_button(2)):#X button - # input.x = True - # else: - # input.x = False - # #input.y = self.gamepad.get_button(2)#Y button - # if(self.gamepad.get_button(3)):#Y button - # input.y = True - # else: - # input.y = False - - - # dpad_input = self.gamepad.get_hat(0)#D-pad input - - # #not using up/down on DPad - # input.d_up = False - # input.d_down = False - - - # if(dpad_input[0] == 1):#D-pad right - # input.d_right = True - # else: - # input.d_right = False - # if(dpad_input[0] == -1):#D-pad left - # input.d_left = True - # else: - # input.d_left = False - - - # if pygame.joystick.get_count() != 0: - - # self.get_logger().info(f"[Ctrl] Updated Controller State\n") - - # self.publisher.publish(input) - # else: - # pass - - - - - - def read_feedback(self, msg): - - # Create a string message object - #msg = String() - - # Set message data - #msg.data = output - - # Publish data - #self.publisher.publish(msg.data) - - print(f"[MCU] {msg.data}", end="") - #print(f"[Pico] Publishing: {msg}") - - - -def main(args=None): - rclpy.init(args=args) - - node = Headless() - - rclpy.spin(node) - rclpy.shutdown() - - #tb_bs = BaseStation() - #node.run() - - -if __name__ == '__main__': - main() diff --git a/src/headless_arm_pkg/package.xml b/src/headless_arm_pkg/package.xml deleted file mode 100644 index 2d5d18a..0000000 --- a/src/headless_arm_pkg/package.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - headless_arm_pkg - 1.0.0 - Package used for headless rover arm control with a controller - tristan - All Rights Reserved - - rclpy - ros2_interfaces_pkg - - ament_copyright - ament_flake8 - ament_pep257 - python3-pytest - - - ament_python - - diff --git a/src/headless_arm_pkg/resource/headless_arm_pkg b/src/headless_arm_pkg/resource/headless_arm_pkg deleted file mode 100644 index e69de29..0000000 diff --git a/src/headless_arm_pkg/setup.cfg b/src/headless_arm_pkg/setup.cfg deleted file mode 100644 index 775bd15..0000000 --- a/src/headless_arm_pkg/setup.cfg +++ /dev/null @@ -1,4 +0,0 @@ -[develop] -script_dir=$base/lib/headless_arm_pkg -[install] -install_scripts=$base/lib/headless_arm_pkg diff --git a/src/headless_arm_pkg/setup.py b/src/headless_arm_pkg/setup.py deleted file mode 100644 index b692426..0000000 --- a/src/headless_arm_pkg/setup.py +++ /dev/null @@ -1,26 +0,0 @@ -from setuptools import find_packages, setup - -package_name = 'headless_arm_pkg' - -setup( - name=package_name, - version='0.0.0', - packages=find_packages(exclude=['test']), - data_files=[ - ('share/ament_index/resource_index/packages', - ['resource/' + package_name]), - ('share/' + package_name, ['package.xml']), - ], - install_requires=['setuptools'], - zip_safe=True, - maintainer='tristan', - maintainer_email='tristanmcginnis26@gmail.com', - description='Package used for headless rover arm control with a controller', - license='All Rights Reserved', - tests_require=['pytest'], - entry_points={ - 'console_scripts': [ - 'headless = headless_arm_pkg.headless_arm_ctrl_node:main' - ], - }, -) diff --git a/src/headless_arm_pkg/test/test_copyright.py b/src/headless_arm_pkg/test/test_copyright.py deleted file mode 100644 index 97a3919..0000000 --- a/src/headless_arm_pkg/test/test_copyright.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_copyright.main import main -import pytest - - -# Remove the `skip` decorator once the source file(s) have a copyright header -@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') -@pytest.mark.copyright -@pytest.mark.linter -def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' diff --git a/src/headless_arm_pkg/test/test_flake8.py b/src/headless_arm_pkg/test/test_flake8.py deleted file mode 100644 index 27ee107..0000000 --- a/src/headless_arm_pkg/test/test_flake8.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_flake8.main import main_with_errors -import pytest - - -@pytest.mark.flake8 -@pytest.mark.linter -def test_flake8(): - rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) diff --git a/src/headless_arm_pkg/test/test_pep257.py b/src/headless_arm_pkg/test/test_pep257.py deleted file mode 100644 index b234a38..0000000 --- a/src/headless_arm_pkg/test/test_pep257.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_pep257.main import main -import pytest - - -@pytest.mark.linter -@pytest.mark.pep257 -def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings'