Initial commit, add all current packages

This commit is contained in:
Tristan McGinnis
2025-02-18 11:52:25 -06:00
parent 9d04816994
commit eb466e11b7
63 changed files with 2313 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
#Colcon-created build folders
build/
install/
log/
#Mac OS system files
*.DS_Store
#VSCode settings
.vscode/

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "src/ros2_interfaces_pkg"]
path = src/ros2_interfaces_pkg
url = git@github.com:SHC-ASTRA/ros2_interfaces_pkg.git

View File

@@ -1,2 +1,83 @@
# rover-ros2 # rover-ros2
Submodule which includes all ros2 packages for the rover. These are centrally located for modular rover operation. You can use launch files located here to run any module as standalone or as a full-rover relay. Submodule which includes all ros2 packages for the rover. These are centrally located for modular rover operation. You can use launch files located here to run any module as standalone or as a full-rover relay.
## Software Pre-reqs
The BaseStation computer will need several things.
- ROS2 Humble
- Follow the standard ROS2 humble install process. Linux recommended.
- https://docs.ros.org/en/humble/Installation.html
- Colcon
- `$ sudo apt update`
- `$ sudo apt install python3-colcon-common-extensions`
- Configured Static IP for Ubiquiti bullet
- This process will depend on the system, but you'll need this for using the Ubiquiti network with the M2 bullets.
- Settings:
- Net Mask: 255.255.255.0
- IP Address: 192.168.1.x
- This can be just about anything, just not ending in 1.20, 1.21, 1.69, or 1.0
- Gateway: 192.168.1.0
## Headless Control
To control the rover without a base station you'll need to run the headless control node.
- SSH to the the rover from a computer computer over wifi (or connect directly)
- `ssh clucky@192.168.1.69`
- Password: spaceiscool639
- Run `Ros2 run headless_pkg headless`
- This needs to be done in a separate ssh/shell window from the core rover node
## Running the Rover node
- Ensure you're either connected directly to the rover's network switch over ethernet or connected to an M2 bullet which has connection to the rover (red signal lights on)
- SSH to the the rover from a computer computer over wifi (or connect directly)
- `ssh clucky@192.168.1.69`
- Password: spaceiscool639
- run `ros2 run core_control_pkg core_control`
- This needs to be done in a separate ssh/shell window from the headless control node (if you're running it)
- You should see "MCU found" on a serial line
- If not, you'll get an error it's not found and the program will close. Ensure the teensy is connected via usb to the Nuc
## Connecting the GuliKit Controller (Recommended)
- Connect controller to pc with USB-C
- Select the "X-Input" control mode (Windows logo) on the controller.
- Hold the button next to the symbols (windows, android, switch, etc...)
- You'll need to release the button and press down again to cycle to the next mode

View File

View File

@@ -0,0 +1,275 @@
import rclpy
from rclpy.node import Node
import serial
import sys
import threading
import glob
import time
import atexit
import signal
from std_msgs.msg import String
from ros2_interfaces_pkg.msg import ArmManual
from ros2_interfaces_pkg.msg import ArmIK
from ros2_interfaces_pkg.msg import SocketFeedback
serial_pub = None
thread = None
class SerialRelay(Node):
def __init__(self):
# Initialize node with name
super().__init__("arm_relay")
# Create publishers
#Depricated, temporary for reference
# self.output_publisher = self.create_publisher(String, '/astra/arm/feedback', 10)
# self.state_publisher = self.create_publisher(ArmState, '/astra/arm/state', 10)
# self.faerie_publisher = self.create_publisher(FaerieTelemetry, '/astra/arm/bio/feedback', 10)
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
# Create subscribers
#Depricated, temporary for reference
#self.control_subscriber = self.create_subscription(ControllerState, '/astra/arm/control', self.send_controls, 10)
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
#self.command_subscriber = self.create_subscription(String, '/astra/arm/command', self.send_command, 10)
#self.faerie_subscriber = self.create_subscription(String, "/astra/arm/bio/control", self.send_faerie_controls, 10)
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
print(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None:
print("Unable to find MCU... please make sure it is connected.")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self):
global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
try:
while rclpy.ok():
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
self.cleanup()
#Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
print(f"[MCU] {output}", end="")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
print("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def send_ik(self, msg):
pass
def send_manual(self, msg):
axis0 = msg.axis0
axis1 = msg.axis1
axis2 = msg.axis2
axis3 = msg.axis3
#Send controls for arm
command = "can_relay_tovic,arm,40," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
self.ser.write(bytes(command, "utf8"))
print(f"[Wrote] {command}", end="")
#Not yet finished, needs embedded implementation for new commands
# ef_roll = msg.effector_roll
# ef_yaw = msg.effector_yaw
# gripper = msg.gripper
# actuator = msg.linear_actuator
# laser = msg.laser
# #Send controls for digit
# command = "can_relay_tovic,digit," + str(ef_roll) + "," + str(ef_yaw) + "," + str(gripper) + "," + str(actuator) + "," + str(laser) + "\n"
return
# Depricated functions, kept temporarily for reference
# def send_controls(self, msg):
# command = ""
# ef_cmd = ""
# if(msg.b):
# command = "arm,stop\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# return
# if(msg.a):
# command = "arm,endEffect,act,0"#retract actuator out
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# elif(msg.x):
# command = "arm,endEffect,act,1"#extend actuator in
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# if(msg.plus):
# command = "arm,endEffect,laser,1\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# elif(msg.minus):
# command = "arm,endEffect,laser,0\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# if(msg.rb):
# ef_cmd = "arm,endEffect,ctrl,"
# if(msg.lt >= 0.5):
# ef_cmd += "-1,"
# elif(msg.rt >= 0.5):
# ef_cmd += "1,"
# else:
# ef_cmd += "0,"
# if(msg.rs_x < 0):
# ef_cmd += "-1,"
# elif(msg.rs_x > 0):
# ef_cmd += "1,"
# else:
# ef_cmd += "0,"
# if(msg.ls_x < 0):
# ef_cmd += "-1"
# elif(msg.ls_x > 0):
# ef_cmd += "1"
# else:
# ef_cmd += "0"
# command = ef_cmd + "\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# return
# else:
# ef_cmd = "arm,endEffect,ctrl,"
# if(msg.lt >= 0.5):
# ef_cmd += "-1,0,0"
# elif(msg.rt >= 0.5):
# ef_cmd += "1,0,0"
# else:
# ef_cmd += "0,0,0"
# command = ef_cmd + "\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# if(msg.lb):
# #self.ser.write(bytes("arm,setMode,manual\n", "utf8"))
# command = "arm,man,0.25,"
# else:
# #self.ser.write(bytes("arm,setMode,manual\n", "utf8"))
# command = "arm,man,0.15,"
# if(msg.d_left):
# command += "-1,"
# elif(msg.d_right):
# command += "1,"
# else:
# command += "0,"
# if(msg.ls_x < -0.4):
# command += "1,"
# elif(msg.ls_x > 0.4):
# command += "-1,"
# else:
# command += "0,"
# if(msg.ls_y < -0.4):
# command += "1,"
# elif(msg.ls_y > 0.4):
# command += "-1,"
# else:
# command += "0,"
# if(msg.rs_y < -0.4):
# command += "1"
# elif(msg.rs_y > 0.4):
# command += "-1"
# else:
# command += "0"
# command += "\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# return
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
#return glob.glob("/dev/tty[A-Za-z]*")
def cleanup(self):
print("Cleaning up...")
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -0,0 +1,259 @@
import rclpy
from rclpy.node import Node
import serial
import sys
import threading
import glob
import time
from std_msgs.msg import String
from interfaces_pkg.msg import ControllerState
from interfaces_pkg.msg import ArmState
serial_pub = None
thread = None
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("arm_relay")
# Create a publisher to publish any output the mcu sends
self.output_publisher = self.create_publisher(String, '/astra/arm/feedback', 10)#Random String feedback data
self.state_publisher = self.create_publisher(ArmState, '/astra/arm/state', 10)#Standard state feedback data
# Create a subscriber to listen to any commands sent for the mcu
self.subscriber = self.create_subscription(ControllerState, '/astra/arm/control', self.send_controls, 10)
# Loop through all serial devices on the computer to check for the mcu
self.port = None
ports = SerialRelay.list_serial_ports()
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, timeout=1)
ser.write(b"arm,ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the mcu
if b"pong" in response:
self.port = port
print(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is None:
print("Unable to find MCU... please make sure it is connected.")
#sys.exit(1) TEMPORARY TEST, UNCOMMENT THIS LINE IN THE FUTURE
self.ser = serial.Serial(self.port, 115200)
#self.mutex = threading.Lock()
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon = True)
thread.start()
try:
while rclpy.ok():
# Check the mcu for updates
#rclpy.spin_once(self, timeout_sec=0.1)
#self.mutex.acquire()
if self.ser.in_waiting:
#self.mutex.release()
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt:
#self.mutex.release()
serial_pub.ser.close()
sys.exit(0)
def read_mcu(self):
try:
#self.mutex.acquire()
output = str(self.ser.readline(), "utf8")
if output:
print(f"[MCU] {output}", end="")
# Create a string message object
msg = String()
# Set message data
msg.data = output
# Publish data
self.output_publisher.publish(msg)
#print(f"[MCU] Publishing: {msg}")
except serial.SerialException:
#self.mutex.release()
pass
#finally:
#self.mutex.release()
def send_controls(self, msg):
command = ""
ef_cmd = "" #end effector command to be apended
#self.mutex.acquire()
if(msg.b):#If B button: send ESTOP command
command = "arm,stop\n"
self.ser.write(bytes(command, "utf8"))#Send command to MCU
print(f"[Wrote] {command}", end="")#Echo command to console
#self.mutex.release()#Release mutex lock
return
if(msg.plus):#Turn EF laser on
command = "arm,endEffect,laser,1\n"
self.ser.write(bytes(command, "utf8"))
print(f"[Wrote] {command}", end="")
elif(msg.minus):#Turn EF laser off
command = "arm,endEffect,laser,0\n"
self.ser.write(bytes(command, "utf8"))
print(f"[Wrote] {command}", end="")
if(msg.rb):#If RB button: End Effector control mode
ef_cmd = "arm,endEffect,ctrl,"
if(msg.lt >= 0.5):#If LT button: Open end effector
ef_cmd += "-1,"
elif(msg.rt >= 0.5):#If RT button: Close end effector
ef_cmd += "1,"
else:
ef_cmd += "0,"
if(msg.rs_x < 0): #Check left/stop/right for tilt movement
ef_cmd += "-1,"
elif(msg.rs_x > 0):
ef_cmd += "1,"
else:
ef_cmd += "0,"
pass
if(msg.ls_x < 0): #Check left/stop/right for revolve movement
ef_cmd += "-1"
elif(msg.ls_x > 0):
ef_cmd += "1"
else:
ef_cmd += "0"
command = ef_cmd + "\n"
self.ser.write(bytes(command, "utf8"))
print(f"[Wrote] {command}", end="")
#self.mutex.release()
return
else:
ef_cmd = "arm,endEffect,ctrl,"
if(msg.lt >= 0.5):
ef_cmd += "-1,0,0"
elif(msg.rt >= 0.5):
ef_cmd += "1,0,0"
else:
ef_cmd += "0,0,0"
command = ef_cmd + "\n"
self.ser.write(bytes(command, "utf8"))
print(f"[Wrote] {command}", end="")
if(msg.lb):#If LB button: Manual control mode
#First, ensure control mode is set to manual on the MCU
self.ser.write(bytes("arm,setMode,manual\n", "utf8"))
#print(f"[Wrote] arm,setMode,manual", end="")
command = "arm,man,0.15,"#Set manual control duty cycle statically to 15% for now
if(msg.d_left):#If D-Pad left set axis_1 to -1
command += "-1,"
elif(msg.d_right):#If D-Pad right set axis_0 to 1
command += "1,"
else:
command += "0,"
#Axis_1
if(msg.ls_x < -0.5):#If LS left at least 50% set axis_1 to -1
command += "-1,"
elif(msg.ls_x > 0.5):#If LS right at least 50% set axis_1 to 1
command += "1,"
else:
command += "0,"
#Axis_2
if(msg.ls_y < -0.5):#If LS up at least 50% set axis_2 to 1
command += "1,"
elif(msg.ls_y > 0.5):#If LS down at least 50% set axis_2 to -1
command += "-1,"
else:
command += "0,"
#Axis_3
if(msg.rs_y < -0.5):#If RS up at least 50% set axis_3 to 1
command += "1"
elif(msg.rs_y > 0.5):#If RS down at least 50% set axis_3 to -1
command += "-1"
else:
command += "0"
command += "\n"
self.ser.write(bytes(command, "utf8"))
print(f"[Wrote] {command}", end="")
#self.mutex.release()
return
else:#Else normal (IK) control mode
#First, ensure control mode is set to IK on the MCU
self.ser.write(bytes("arm,setMode,ik\n", "utf8"))
#print(f"[Wrote] arm,setMode,ik", end="")
command = "arm,ik,"
if(msg.d_left):#If D-Pad left set axis zero to -1
command += "-1,"
elif(msg.d_right):#If D-Pad right set axis zero to 1
command += "1,"
else:
command += "0,"
#start with input from controller
coord_x = -1*msg.ls_y #out/in
coord_y = -1*msg.rs_y #up/down
#convert units for ik coordinate offsets
coord_x = round(coord_x * 20.4 * 2, 1) #20.4mm (1 in.) per 1 unit of input. Times 2 for two inches
coord_y = round(coord_y * 20.4 * 2, 1) #
command += f"{coord_x},{coord_y}\n"
self.ser.write(bytes(command, "utf8"))
print(f"[Wrote] {command}", end="")
#self.mutex.release()
return
@staticmethod
def list_serial_ports():
return glob.glob("/dev/tty[A-Za-z]*")
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
serial_pub.ser.close()
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == '__main__':
main()

21
src/arm_pkg/package.xml Normal file
View File

@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>arm_pkg</name>
<version>1.0.0</version>
<description>Core arm package which handles ROS2 commnuication.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>All Rights Reserved</license>
<depend>rclpy</depend>
<depend>ros2_interfaces_pkg</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

4
src/arm_pkg/setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/arm_pkg
[install]
install_scripts=$base/lib/arm_pkg

26
src/arm_pkg/setup.py Normal file
View File

@@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'arm_pkg'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='tristan',
maintainer_email='tristanmcginnis26@gmail.com',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'arm = arm_pkg.arm_relay_node:main'
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

View File

View File

@@ -0,0 +1,198 @@
import rclpy
from rclpy.node import Node
from std_srvs.srv import Empty
import signal
import time
import atexit
import serial
import sys
import threading
import glob
from std_msgs.msg import String
from ros2_interfaces_pkg.msg import CoreFeedback
from ros2_interfaces_pkg.msg import CoreControl
serial_pub = None
thread = None
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("serial_publisher")#previously 'serial_publisher'
# Create publishers for feedback and telemetry
self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
# Create a subscriber to listen to any commands sent for the MCU
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
# Create a service server for pinging the rover
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
print(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None:
print("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
# All output over debug temporarily
print(f"[MCU] {output}", end="")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
return
# Temporary
# packet = output.strip().split(',')
# if len(packet) >= 2 and packet[0] == "core" and packet[1] == "telemetry":
# feedback = CoreFeedback()
# feedback.gpslat = float(packet[2])
# feedback.gpslon = float(packet[3])
# feedback.gpssat = float(packet[4])
# feedback.bnogyr.x = float(packet[5])
# feedback.bnogyr.y = float(packet[6])
# feedback.bnogyr.z = float(packet[7])
# feedback.bnoacc.x = float(packet[8])
# feedback.bnoacc.y = float(packet[9])
# feedback.bnoacc.z = float(packet[10])
# feedback.orient = float(packet[11])
# feedback.bmptemp = float(packet[12])
# feedback.bmppres = float(packet[13])
# feedback.bmpalt = float(packet[14])
# self.telemetry_publisher.publish(feedback)
# else:
# # print(f"[MCU] {output}", end="")
# # msg = String()
# # msg.data = output
# # self.debug_pub.publish(msg)
# return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
def scale_duty(self, value, max_speed):
leftMin = -1
leftMax = 1
rightMin = -max_speed/100.0
rightMax = max_speed/100.0
# Figure out how 'wide' each range is
leftSpan = leftMax - leftMin
rightSpan = rightMax - rightMin
# Convert the left range into a 0-1 range (float)
valueScaled = float(value - leftMin) / float(leftSpan)
# Convert the 0-1 range into a value in the right range.
return str(rightMin + (valueScaled * rightSpan))
def send_controls(self, msg):
#can_relay_tovic,core,19, left_stick, right_stick
left_stick_neg = msg.left_stick * -1
command = "can_relay_tovic,core,19," + self.scale_duty(left_stick_neg, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
#print(f"[Sys] {command}", end="")
self.ser.write(bytes(command, "utf8"))# Send command to MCU
self.get_logger().debug(f"wrote: {command}")
#print(f"[Sys] Relaying: {command}")
def ping_callback(self, request, response):
return response
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

21
src/core_pkg/package.xml Normal file
View File

@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>core_pkg</name>
<version>1.0.0</version>
<description>Core rover control package to handle command interpretation and embedded interfacing.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>All Rights Reserved</license>
<depend>rclpy</depend>
<depend>ros2_interfaces_pkg</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

4
src/core_pkg/setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/core_pkg
[install]
install_scripts=$base/lib/core_pkg

26
src/core_pkg/setup.py Normal file
View File

@@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'core_pkg'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='tristan',
maintainer_email='tristanmcginnis26@gmail.com',
description='Core rover control package to handle command interpretation and embedded interfacing.',
license='All Rights Reserved',
tests_require=['pytest'],
entry_points={
'console_scripts': [
"core_control = core_pkg.core_control_node:main"
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

View File

@@ -0,0 +1,258 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
from std_msgs.msg import String
from ros2_interfaces_pkg.msg import ControllerState
from ros2_interfaces_pkg.msg import ArmManual
from ros2_interfaces_pkg.msg import ArmIK
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init()
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("headless_arm_ctrl")
# Depricated, kept temporarily for reference
# self.create_timer(0.20, self.send_controls)#read and send controls
self.create_timer(0.20, self.send_manual)
# Create a publisher to publish any output the pico sends
# Depricated, kept temporarily for reference
#self.publisher = self.create_publisher(ControllerState, '/astra/arm/control', 10)
self.manual_pub = self.create_publisher(ArmManual, '/arm/control/manual', 10)
# Create a subscriber to listen to any commands sent for the pico
# Depricated, kept temporarily for reference
#self.subscriber = self.create_subscription(String, '/astra/arm/feedback', self.read_feedback, 10)
self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
#self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
# Initialize pygame
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}')
#
#
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
#Check the pico for updates
self.read_feedback()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
#self.send_controls() #depricated, kept for reference temporarily
self.send_manual()
self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt:
sys.exit(0)
def send_manual(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = ArmManual()
dpad_input = self.gamepad.get_hat(0)
input.axis0 = 0
if dpad_input[0] == 1:
input.axis0 = 1
elif dpad_input[0] == -1:
input.axis0 = -1
input.axis1 = round(self.gamepad.get_axis(0))#left x-axis
input.axis2 = round(self.gamepad.get_axis(1))#left y-axis
input.axis3 = round(self.gamepad.get_axis(4))#right y-axis
#Temporary, not controlling digit. Awaiting embedded implementation
input.effector_yaw = 0
input.effector_roll = 0
input.gripper = 0
input.linear_actuator = 0
input.laser = 0
if pygame.joystick.get_count() != 0:
self.get_logger().info(f"[Ctrl] {input.axis0}, {input.axis1}, {input.axis2}, {input.axis3}\n")
self.manual_pub.publish(input)
else:
pass
pass
# Depricated, kept temporarily for reference
# def send_controls(self):
# for event in pygame.event.get():
# if event.type == pygame.QUIT:
# pygame.quit()
# exit()
# input = ControllerState()
# input.lt = self.gamepad.get_axis(2)#left trigger
# input.rt = self.gamepad.get_axis(5)#right trigger
# #input.lb = self.gamepad.get_button(9)#Value must be converted to bool
# if(self.gamepad.get_button(4)):#left bumper
# input.lb = True
# else:
# input.lb = False
# #input.rb = self.gamepad.get_button(10)#Value must be converted to bool
# if(self.gamepad.get_button(5)):#right bumper
# input.rb = True
# else:
# input.rb = False
# #input.plus = self.gamepad.get_button(6)#plus button
# if(self.gamepad.get_button(7)):#plus button
# input.plus = True
# else:
# input.plus = False
# #input.minus = self.gamepad.get_button(4)#minus button
# if(self.gamepad.get_button(6)):#minus button
# input.minus = True
# else:
# input.minus = False
# input.ls_x = round(self.gamepad.get_axis(0),2)#left x-axis
# input.ls_y = round(self.gamepad.get_axis(1),2)#left y-axis
# input.rs_x = round(self.gamepad.get_axis(3),2)#right x-axis
# input.rs_y = round(self.gamepad.get_axis(4),2)#right y-axis
# #input.a = self.gamepad.get_button(1)#A button
# if(self.gamepad.get_button(0)):#A button
# input.a = True
# else:
# input.a = False
# #input.b = self.gamepad.get_button(0)#B button
# if(self.gamepad.get_button(1)):#B button
# input.b = True
# else:
# input.b = False
# #input.x = self.gamepad.get_button(3)#X button
# if(self.gamepad.get_button(2)):#X button
# input.x = True
# else:
# input.x = False
# #input.y = self.gamepad.get_button(2)#Y button
# if(self.gamepad.get_button(3)):#Y button
# input.y = True
# else:
# input.y = False
# dpad_input = self.gamepad.get_hat(0)#D-pad input
# #not using up/down on DPad
# input.d_up = False
# input.d_down = False
# if(dpad_input[0] == 1):#D-pad right
# input.d_right = True
# else:
# input.d_right = False
# if(dpad_input[0] == -1):#D-pad left
# input.d_left = True
# else:
# input.d_left = False
# if pygame.joystick.get_count() != 0:
# self.get_logger().info(f"[Ctrl] Updated Controller State\n")
# self.publisher.publish(input)
# else:
# pass
def read_feedback(self, msg):
# Create a string message object
#msg = String()
# Set message data
#msg.data = output
# Publish data
#self.publisher.publish(msg.data)
print(f"[MCU] {msg.data}", end="")
#print(f"[Pico] Publishing: {msg}")
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
#tb_bs = BaseStation()
#node.run()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>headless_arm_pkg</name>
<version>1.0.0</version>
<description>Package used for headless rover arm control with a controller</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>All Rights Reserved</license>
<depend>rclpy</depend>
<depend>ros2_interfaces_pkg</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/headless_arm_pkg
[install]
install_scripts=$base/lib/headless_arm_pkg

View File

@@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'headless_arm_pkg'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='tristan',
maintainer_email='tristanmcginnis26@gmail.com',
description='Package used for headless rover arm control with a controller',
license='All Rights Reserved',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'headless = headless_arm_pkg.headless_arm_ctrl_node:main'
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

View File

@@ -0,0 +1,132 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
import os
import importlib
from std_msgs.msg import String
from ros2_interfaces_pkg.msg import CoreControl
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init()
max_speed = 75 #Max speed as a duty cycle percentage (1-100)
class Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("headless_ctrl")
self.create_timer(0.20, self.send_controls)
# Create a publisher to publish any output the pico sends
self.publisher = self.create_publisher(CoreControl, '/core/control', 10)
self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}')
#
#
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
#Check the pico for updates
self.send_controls()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
self.send_controls()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
input = CoreControl()
input.max_speed = max_speed
input.right_stick = round(self.gamepad.get_axis(4),2)#right y-axis
if self.gamepad.get_axis(5) > 0:
input.left_stick = input.right_stick
else:
input.left_stick = round(self.gamepad.get_axis(1),2)#lext y-axis
if pygame.joystick.get_count() != 0:
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}' #stop the rover if there is no controller connected
self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input)
else:
input.left_stick = 0
input.right_stick = 0
input.max_speed = 0
self.get_logger().info(f"[Ctrl] Stopping. No Gamepad Connected. ")
self.publisher.publish(input)
def main(args=None):
rclpy.init(args=args)
node = Headless()
rclpy.spin(node)
rclpy.shutdown()
#tb_bs = BaseStation()
#node.run()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>headless_pkg</name>
<version>1.0.0</version>
<description>Package used for headless rover control with a controller</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>All Rights Reserved</license>
<depend>rclpy</depend>
<depend>ros2_interfaces_pkg</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/headless_pkg
[install]
install_scripts=$base/lib/headless_pkg

26
src/headless_pkg/setup.py Normal file
View File

@@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'headless_pkg'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='tristan',
maintainer_email='tristanmcginnis26@gmail.com',
description='Package used for headless rover control with a controller',
license='All Rights Reserved',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'headless = headless_pkg.headless_ctrl_node:main'
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

View File

@@ -0,0 +1,157 @@
import rclpy
from rclpy.node import Node
import pygame
import time
import serial
import sys
import threading
import glob
from std_msgs.msg import String
class Mac_Headless(Node):
def __init__(self):
# Initalize node with name
super().__init__("mac_headless_ctrl")
self.create_timer(0.10, self.send_controls)
# Create a publisher to publish any output the pico sends
self.publisher = self.create_publisher(String, '/astra/core/control', 10)
# Create a subscriber to listen to any commands sent for the pico
self.subscriber = self.create_subscription(String, '/astra/core/feedback', self.read_feedback, 10)
#self.subscriber
self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
# Initialize pygame
pygame.init()
# Initialize the gamepad module
pygame.joystick.init()
# Check if any gamepad is connected
if pygame.joystick.get_count() == 0:
print("No gamepad found.")
pygame.quit()
exit()
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
# Initialize the first gamepad, print name to terminal
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}')
#
#
def run(self):
# This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
#Check the pico for updates
self.send_controls()
self.read_feedback()
if pygame.joystick.get_count() == 0: #if controller disconnected, wait for it to be reconnected
print(f"Gamepad disconnected: {self.gamepad.get_name()}")
while pygame.joystick.get_count() == 0:
self.send_controls()
self.read_feedback()
self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() #re-initialized gamepad
print(f"Gamepad reconnected: {self.gamepad.get_name()}")
except KeyboardInterrupt:
sys.exit(0)
def send_controls(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
exit()
#left_x = self.gamepad.get_axis(0)#left x-axis
left_y = self.gamepad.get_axis(1)#lext y-axis
#left_t = self.gamepad.get_axis(2)#left trigger
#right_x = self.gamepad.get_axis(3)#right x-axis
right_y = self.gamepad.get_axis(3)#right y-axis
right_t = self.gamepad.get_axis(4)#right trigger
if pygame.joystick.get_count() != 0:
if right_t > 0:#single-stick control mode
output = f'ctrl,{round(right_y,2)},{round(right_y,2)}'
else:
output = f'ctrl,{round(left_y,2)},{round(right_y,2)}'
else:
output = 'ctrl,0,0' #stop the rover if there is no controller connected
#print(f"[Controls] {output}", end="")
self.get_logger().info(f"[Ctrl] {output}")
# Create a string message object
msg = String()
# Set message data
msg.data = output
#only publish commands when the values are updated
if self.lastMsg != msg:
# Publish data
self.publisher.publish(msg)
self.lastMsg = msg
#print(f"[Pico] Publishing: {msg}")
def read_feedback(self, msg):
# Create a string message object
#msg = String()
# Set message data
#msg.data = output
# Publish data
#self.publisher.publish(msg.data)
print(f"[MCU] {msg.data}", end="")
#print(f"[Pico] Publishing: {msg}")
def main(args=None):
rclpy.init(args=args)
node = Mac_Headless()
rclpy.spin(node)
rclpy.shutdown()
#tb_bs = BaseStation()
#node.run()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>mac_headless_pkg</name>
<version>1.0.0</version>
<description>(MAC/VM Version) Package used for headless rover control with a controller.</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>All Rights Reserved</license>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/mac_headless_pkg
[install]
install_scripts=$base/lib/mac_headless_pkg

View File

@@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'mac_headless_pkg'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='tristan',
maintainer_email='tristanmcginnis26@gmail.com',
description='Package used for headless rover control with a controller',
license='All Rights Reserved',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'mac_headless = mac_headless_pkg.mac_headless_ctrl_node:main'
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

3
src/ros2_interfaces_pkg/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
build
install
log

View File

@@ -0,0 +1,43 @@
cmake_minimum_required(VERSION 3.8)
project(ros2_interfaces_pkg)
if(POLICY CMP0148)
cmake_policy(SET CMP0148 OLD)
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"action/AutoCommand.action"
"msg/control/ArmIK.msg"
"msg/control/ArmManual.msg"
"msg/control/ControllerState.msg"
"msg/control/CoreControl.msg"
"msg/feedback/AutoFeedback.msg"
"msg/feedback/SocketFeedback.msg"
"msg/feedback/DigitFeedback.msg"
"msg/feedback/FaerieFeedback.msg"
"msg/feedback/CoreFeedback.msg"
DEPENDENCIES std_msgs geometry_msgs
)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()

View File

@@ -0,0 +1,3 @@
# rover-Interfaces
Standard repo for all ROS2 Interface files.
Inteded to be a git submodule within the other ros2-related repos.

View File

@@ -0,0 +1,13 @@
# Goal
int64 mission_type
float64 gps_lat_target
float64 gps_long_target
float64 target_radius
float64 period
---
# Result
int64 final_result
---
# Feeedback
int64 current_status
float64 distance_remaining

View File

@@ -0,0 +1,33 @@
# Topic: /arm/control/ik
# User's input on x, y, z axis described as a vector
geometry_msgs/Vector3 movement_vector # [x, y, z] values for the arm's movement in 3D space
# Gripper control
# -1: Closing
# 0: Stopped
# +1: Opening
int32 gripper
# Linear Actuator movement direction
# -1: Retracting
# 0: Stopped
# +1: Extending
int32 linear_actuator
# Laser state
# 0: Off
# 1: On
int32 laser
# Effector Roll movement direction
# -1: Counterclockwise
# 0: No movement
# +1: Clockwise
int32 effector_roll
# Effector Yaw movement direction
# -1: Counterclockwise
# 0: No movement
# +1: Clockwise
int32 effector_yaw

View File

@@ -0,0 +1,44 @@
# Topic: /arm/control/manual
# Axis control for rotation and speed multiplier
# Direction of rotation:
# -1: Counterclockwise
# 0: No movement
# +1: Clockwise
# Speed multiplier:
# -2: 2x speed CCW
# 0: No movement
# +2: 2x speed CW
int32 axis0 # Axis 0 control
int32 axis1 # Axis 1 control
int32 axis2 # Axis 2 control
int32 axis3 # Axis 3 control
# Effector Roll movement direction
# -1: Counterclockwise
# 0: No movement
# +1: Clockwise
int32 effector_roll
# Effector Yaw movement direction
# -1: Counterclockwise
# 0: No movement
# +1: Clockwise
int32 effector_yaw
# Gripper control
# -1: Closing
# 0: Stopped
# +1: Opening
int32 gripper
# Linear Actuator movement direction
# -1: Retracting
# 0: Stopped
# +1: Extending
int32 linear_actuator
# Laser state
# 0: Off
# 1: On
int32 laser

View File

@@ -0,0 +1,37 @@
# Controller state message to transmit the entire state of the controller
# This is depricated and currently no longer in use. It remains as a backup in case it's needed.
#Triggers
float64 lt
float64 rt
#Bumbers
bool lb
bool rb
#Menu/Pause buttons (plus/minus on the Gulikit)
bool plus
bool minus
#Left stick values
float64 ls_x
float64 ls_y
#Right stick values
float64 rs_x
float64 rs_y
#Buttons
bool a
bool b
bool x
bool y
#D-pad
bool d_up
bool d_down
bool d_left
bool d_right
#Home button
bool home

View File

@@ -0,0 +1,12 @@
# Topic: /core/control
# Left and right stick control
# Percentage of max speed for each stick (range: 0.0 to 1.0)
float32 left_stick # Left stick percentage of max speed
float32 right_stick # Right stick percentage of max speed
# Maximum speed setting
int32 max_speed # Maximum duty cycle (range: 0 to 100)
# Brake mode
bool brake # Is brake mode enabled?

View File

@@ -0,0 +1,20 @@
# Current mission type
int32 mission_type
# Target Latitude
float64 target_lat
# Target Longitude
float64 target_long
# Estimated Distance remaining
float32 distance
# Update
string update
# Current job
string current
# Warning
string warn

View File

@@ -0,0 +1,24 @@
# Topic: /core/feedback
# GPS Data
float64 gps_lat # GPS latitude
float64 gps_long # GPS longitude
int32 gps_sats # Number of active GPS satellites
# BNO055 Sensor Data
geometry_msgs/Vector3 bno_gyro # x, y, z gyroscope readings from BNO055
geometry_msgs/Vector3 bno_accel # x, y, z acceleration readings from BNO055
# Rover Orientation
float64 orientation # Orientation of the rover relative to North (0 - 359 degrees)
# BMP Sensor Data
float64 bmp_temp # Temperature reading in Celsius
float64 bmp_alt # Altitude reading in meters
float64 bmp_pres # Pressure reading in millibars (might be hPa)
# Voltage Readings
float64 bat_voltage # Current voltage of the battery
float64 voltage_12 # Current voltage of the 12V rail
float64 voltage_5 # Current voltage of the 5V rail
float64 voltage_3 # Current voltage of the 3.3V rail

View File

@@ -0,0 +1,13 @@
# Topic: /arm/feedback/digit
# Current wrist yaw angle
float32 wrist_angle
# Current voltage of the battery
float32 bat_voltage
# Current voltage of the 12V rail
float32 voltage_12
# Current voltage of the 5V rail
float32 voltage_5

View File

@@ -0,0 +1,20 @@
# Topic: /arm/feedback/faerie
# Voltage readings
float32 bat_voltage # Current voltage of the battery
float32 voltage_12 # Current voltage of the 12V rail
float32 voltage_5 # Current voltage of the 5V rail
# SHT sensor data
float32 sht_temp # Temperature in Celsius from the SHT sensor on FAERIE
float32 sht_humidity # Humidity in percentage from the SHT sensor on FAERIE
# Lux sensor readings
# (placeholders, these will eventually each associate with a specific color)
float32 lux_1 # Lux reading 1
float32 lux_2 # Lux reading 2
float32 lux_3 # Lux reading 3
float32 lux_4 # Lux reading 4
float32 lux_5 # Lux reading 5
float32 lux_6 # Lux reading 6
float32 lux_7 # Lux reading 7

View File

@@ -0,0 +1,31 @@
# Topic: /arm/feedback/socket
# Axis 0 feedback
float32 axis0_angle # Current joint angle in degrees
float32 axis0_temp # Current motor temperature in Celsius
float32 axis0_voltage # Current motor voltage in volts
float32 axis0_current # Current motor amperage in milliamps
# Axis 1 feedback
float32 axis1_angle # Current joint angle in degrees
float32 axis1_temp # Current motor temperature in Celsius
float32 axis1_voltage # Current motor voltage in volts
float32 axis1_current # Current motor amperage in milliamps
# Axis 2 feedback
float32 axis2_angle # Current joint angle in degrees
float32 axis2_temp # Current motor temperature in Celsius
float32 axis2_voltage # Current motor voltage in volts
float32 axis2_current # Current motor amperage in milliamps
# Axis 3 feedback
float32 axis3_angle # Current joint angle in degrees
float32 axis3_temp # Current motor temperature in Celsius
float32 axis3_voltage # Current motor voltage in volts
float32 axis3_current # Current motor amperage in milliamps
# Voltage feedback
float32 bat_voltage # Current voltage of the battery
float32 voltage_12 # Current voltage of the 12V rail
float32 voltage_5 # Current voltage of the 5V rail
float32 voltage_3 # Current voltage of the 3.3V rail

View File

@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>ros2_interfaces_pkg</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>gemoetry_msgs</depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>