Merge pull request #3 from SHC-ASTRA/anchor_dev

Anchor: Functional and Tested
This commit is contained in:
Tristan McGinnis
2025-02-24 14:41:12 -06:00
committed by GitHub
12 changed files with 574 additions and 272 deletions

3
.gitignore vendored
View File

@@ -8,3 +8,6 @@ log/
#VSCode settings #VSCode settings
.vscode/ .vscode/
#Pycache folder
__pycache__/

140
README.md
View File

@@ -1,83 +1,87 @@
# rover-ros2 # rover-ros2
Submodule which includes all ros2 packages for the rover. These are centrally located for modular rover operation. You can use launch files located here to run any module as standalone or as a full-rover relay.
Submodule which includes all ros2 packages for the rover. These are centrally located for modular rover operation.
You will use this package to launch any module-side ROS2 nodes.
<br>
## Software Pre-reqs ## Software Pre-reqs
An acting base station computer will need several things:
The BaseStation computer will need several things. * ROS2 Humble
* Follow the standard ROS2 humble install process. Linux recommended.
* https://docs.ros.org/en/humble/Installation.html
* Colcon
* `$ sudo apt update`
* `$ sudo apt install python3-colcon-common-extensions`
* Configured Static IP for Ubiquiti bullet (Process varies by OS)
* IP Address: 192.168.1.x
* This can be just about anything not already in use. I recommend something 30-39
* Net Mask: 255.255.255.0
* Gateway: 192.168.1.0
- ROS2 Humble ## Launching with ANCHOR
- Follow the standard ROS2 humble install process. Linux recommended. ANCHOR (Active Node Controller Hub and Operational Relay)
Allows for launching all nodes on the rover simulataneously. Additionally, all controls will run through the core's NUC and MCU.
<br>
1. SSH to core
* Core1: `ssh clucky@192.168.1.69`
* Core2: `ssh clucky@192.168.1.70`
* Password: \<can be found in the rover-Docs repo under documentation>
2. Navigate to rover-ros2 workspace
* `cd rover-ros2`
3. Source the workspace
* `source install/setup.bash`
4. Launch ANCHOR
* `ros2 launch rover_launch.py mode:=anchor`
- https://docs.ros.org/en/humble/Installation.html ## Launching as Standalone
- Colcon For use when running independent modules through their respective computers (pi/NUC) without ANCHOR.
- `$ sudo apt update` 1. SSH to the the module's computer
* Core1: `ssh clucky@192.168.1.69`
* Core2: `ssh clucky@192.168.1.70`
* Arm: `ssh arm@192.168.1.70`
* Bio: \<TBD>
* Password: \<can be found in the rover-Docs repo under documentation>
2. Run the main node (this sends commands to the MCU)
* Navigate to the rover-ros2 workspace (location may vary)
* `cd rover-ros2`
* Source the workspace
* `source install/setup.bash`
* Start the node
* ARM: `ros2 launch rover_launch.py mode:=arm`
* CORE: `ros2 launch rover_launch.py mode:=core`
* BIO: `ros2 launch rover_launch.py mode:=bio`
- `$ sudo apt install python3-colcon-common-extensions` ## Running Headless
- Configured Static IP for Ubiquiti bullet Headless control nodes (for ARM and CORE) allow running of the module on-rover without the operator having ROS2 installed on their machine. You will need a laptop to connect to the pi/NUC in order to launch headless but it can be disconnected after the nodes are spun up.
<br>
1. SSH to the the module's computer
* Core1: `ssh clucky@192.168.1.69`
* Core2: `ssh clucky@192.168.1.70`
* Arm: `ssh arm@192.168.1.70`
* Password: \<can be found in the rover-Docs repo under documentation>
2. Run the  headless node
* You must have ANCHOR or the module's Standalone node running
* Open a new terminal (SSH'd to the module)
* Navigate to rover-ros2 workspace
* `cd rover-ros2`
* Source the workspace
* `source install/setup.bash`
* Run the node (ensure controller is connected and on x-input mode)
* CORE: `ros2 run core_pkg headless`
* ARM: `ros2 run arm_pkg headless`
- This process will depend on the system, but you'll need this for using the Ubiquiti network with the M2 bullets. ## Connecting the GuliKit Controller
- Settings:
- Net Mask: 255.255.255.0
- IP Address: 192.168.1.x
- This can be just about anything, just not ending in 1.20, 1.21, 1.69, or 1.0
- Gateway: 192.168.1.0
## Headless Control
To control the rover without a base station you'll need to run the headless control node.
- SSH to the the rover from a computer computer over wifi (or connect directly)
- `ssh clucky@192.168.1.69`
- Password: spaceiscool639
- Run `Ros2 run headless_pkg headless`
- This needs to be done in a separate ssh/shell window from the core rover node
## Running the Rover node
- Ensure you're either connected directly to the rover's network switch over ethernet or connected to an M2 bullet which has connection to the rover (red signal lights on)
- SSH to the the rover from a computer computer over wifi (or connect directly)
- `ssh clucky@192.168.1.69`
- Password: spaceiscool639
- run `ros2 run core_control_pkg core_control`
- This needs to be done in a separate ssh/shell window from the headless control node (if you're running it)
- You should see "MCU found" on a serial line
- If not, you'll get an error it's not found and the program will close. Ensure the teensy is connected via usb to the Nuc
## Connecting the GuliKit Controller (Recommended)
- Connect controller to pc with USB-C
- Select the "X-Input" control mode (Windows logo) on the controller.
- Hold the button next to the symbols (windows, android, switch, etc...)
- You'll need to release the button and press down again to cycle to the next mode
Connecting the GuliKit Controller (Recommended)
* Connect controller to pc with USB-C
* Select the "X-Input" control mode (Windows logo) on the controller.
* Hold the button next to the symbols (windows, android, switch, etc...)
* You'll need to release the button and press down again to cycle to the next mode

106
rover_launch.py Normal file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
#Prevent making __pycache__ directories
from sys import dont_write_bytecode
dont_write_bytecode = True
def launch_setup(context, *args, **kwargs):
# Retrieve the resolved value of the launch argument 'mode'
mode = LaunchConfiguration('mode').perform(context)
nodes = []
if mode == 'anchor':
# Launch every node and pass "anchor" as the parameter
nodes.append(
Node(
package='arm_pkg',
executable='arm', # change as needed
name='arm',
output='both',
parameters=[{'launch_mode': mode}]
)
)
nodes.append(
Node(
package='core_pkg',
executable='core', # change as needed
name='core',
output='both',
parameters=[{'launch_mode': mode}]
)
)
# nodes.append(
# Node(
# package='bio_pkg',
# executable='bio', # change as needed
# name='bio',
# output='both',
# parameters=[{'launch_mode': mode}]
# )
# )
nodes.append(
Node(
package='anchor_pkg',
executable='anchor', # change as needed
name='anchor',
output='both',
parameters=[{'launch_mode': mode}]
)
)
elif mode in ['arm', 'core', 'bio']:
# Only launch the node corresponding to the provided mode.
if mode == 'arm':
nodes.append(
Node(
package='arm_pkg',
executable='arm',
name='arm',
output='both',
parameters=[{'launch_mode': mode}]
)
)
elif mode == 'core':
nodes.append(
Node(
package='core_pkg',
executable='core',
name='core',
output='both',
parameters=[{'launch_mode': mode}]
)
)
# elif mode == 'bio':
# nodes.append(
# Node(
# package='bio_pkg',
# executable='bio',
# name='bio',
# output='both',
# parameters=[{'launch_mode': mode}]
# )
# )
else:
# If an invalid mode is provided, print an error.
# (You might want to raise an exception or handle it differently.)
print("Invalid mode provided. Choose one of: arm, core, bio, anchor.")
return nodes
def generate_launch_description():
declare_arg = DeclareLaunchArgument(
'mode',
default_value='anchor',
description='Launch mode: arm, core, bio, or anchor'
)
return LaunchDescription([
declare_arg,
OpaqueFunction(function=launch_setup)
])

View File

View File

@@ -0,0 +1,143 @@
import rclpy
from rclpy.node import Node
from std_srvs.srv import Empty
import signal
import time
import atexit
import serial
import sys
import threading
import glob
from std_msgs.msg import String
from ros2_interfaces_pkg.msg import CoreFeedback
from ros2_interfaces_pkg.msg import CoreControl
serial_pub = None
thread = None
class SerialRelay(Node):
def __init__(self):
# Initalize node with name
super().__init__("anchor_node")#previously 'serial_publisher'
# Create publishers
self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10)
self.core_pub = self.create_publisher(String, '/anchor/core/feedback', 10)
self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10)
self.debug_pub = self.create_publisher(String, '/anchor/debug', 10)
# Create a subscriber
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.send_cmd, 10)
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(4):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
#(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info("Unable to find MCU...")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self):
# This thread makes all the update processes run in the background
global thread
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
try:
while rclpy.ok():
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
def read_MCU(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
# All output over debug temporarily
self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
return
except serial.SerialException as e:
print(f"SerialException: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
except TypeError as e:
print(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
self.exit(1)
def send_cmd(self, msg):
message = msg.data
#self.get_logger().info(f"Sending command to MCU: {msg}")
self.ser.write(bytes(message, "utf8"))
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
def cleanup(self):
print("Cleaning up before terminating...")
if self.ser.is_open:
self.ser.close()
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None):
rclpy.init(args=args)
sys.excepthook = myexcepthook
global serial_pub
serial_pub = SerialRelay()
serial_pub.run()
if __name__ == '__main__':
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
main()

View File

@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>anchor_pkg</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="tristanmcginnis26@gmail.com">tristan</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

4
src/anchor_pkg/setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/anchor_pkg
[install]
install_scripts=$base/lib/anchor_pkg

25
src/anchor_pkg/setup.py Normal file
View File

@@ -0,0 +1,25 @@
from setuptools import find_packages, setup
package_name = 'anchor_pkg'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='tristan',
maintainer_email='tristanmcginnis26@gmail.com',
description='Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN',
license='All Rights Reserved',
entry_points={
'console_scripts': [
"anchor = anchor_pkg.anchor_node:main"
],
},
)

View File

@@ -45,8 +45,7 @@ class Headless(Node):
self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10) self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
#self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change self.laser_status = 0
# Initialize pygame # Initialize pygame
pygame.init() pygame.init()
@@ -106,23 +105,69 @@ class Headless(Node):
exit() exit()
input = ArmManual() input = ArmManual()
dpad_input = self.gamepad.get_hat(0)
input.axis0 = 0
if dpad_input[0] == 1:
input.axis0 = 1
elif dpad_input[0] == -1:
input.axis0 = -1
input.axis1 = round(self.gamepad.get_axis(0))#left x-axis # Triggers for gripper control
input.axis2 = round(self.gamepad.get_axis(1))#left y-axis if self.gamepad.get_axis(2) > 0:#left trigger
input.axis3 = round(self.gamepad.get_axis(4))#right y-axis input.gripper = -1
elif self.gamepad.get_axis(5) > 0:#right trigger
input.gripper = 1
# Toggle Laser
if self.gamepad.get_button(7):#Start
self.laser_status = 1
elif self.gamepad.get_button(6):#Back
self.laser_status = 0
input.laser = self.laser_status
if self.gamepad.get_button(5):#right bumper, control effector
# Left stick X-axis for effector yaw
if self.gamepad.get_axis(0) > 0:
input.effector_yaw = 1
elif self.gamepad.get_axis(0) < 0:
input.effector_yaw = -1
# Right stick X-axis for effector roll
if self.gamepad.get_axis(3) > 0:
input.effector_roll = 1
elif self.gamepad.get_axis(3) < 0:
input.effector_roll = -1
else: # Control arm axis
dpad_input = self.gamepad.get_hat(0)
input.axis0 = 0
if dpad_input[0] == 1:
input.axis0 = 1
elif dpad_input[0] == -1:
input.axis0 = -1
if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15:
input.axis1 = -1 * round(self.gamepad.get_axis(0))
if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15:
input.axis2 = -1 * round(self.gamepad.get_axis(1))
if self.gamepad.get_axis(4) > .15 or self.gamepad.get_axis(4) < -.15:
input.axis3 = -1 * round(self.gamepad.get_axis(4))
# input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
# input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
# input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
#Button Mappings
#axis2 -> LT
#axis5 -> RT
#Buttons0 -> A
#Buttons1 -> B
#Buttons2 -> X
#Buttons3 -> Y
#Buttons4 -> LB
#Buttons5 -> RB
#Buttons6 -> Back
#Buttons7 -> Start
#Temporary, not controlling digit. Awaiting embedded implementation
input.effector_yaw = 0
input.effector_roll = 0
input.gripper = 0
input.linear_actuator = 0 input.linear_actuator = 0
input.laser = 0
if pygame.joystick.get_count() != 0: if pygame.joystick.get_count() != 0:

View File

@@ -17,72 +17,73 @@ thread = None
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node with name # Initialize node
super().__init__("arm_node") super().__init__("arm_node")
# Get launch mode parameter
self.declare_parameter('launch_mode', 'arm')
self.launch_mode = self.get_parameter('launch_mode').value
self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
# Create publishers # Create publishers
#Depricated, temporary for reference
# self.output_publisher = self.create_publisher(String, '/astra/arm/feedback', 10)
# self.state_publisher = self.create_publisher(ArmState, '/astra/arm/state', 10)
# self.faerie_publisher = self.create_publisher(FaerieTelemetry, '/astra/arm/bio/feedback', 10)
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10) self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10) self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
# Create subscribers # Create subscribers
#Depricated, temporary for reference
#self.control_subscriber = self.create_subscription(ControllerState, '/astra/arm/control', self.send_controls, 10)
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10) self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10) self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
#self.command_subscriber = self.create_subscription(String, '/astra/arm/command', self.send_command, 10) # Topics used in anchor mode
#self.faerie_subscriber = self.create_subscription(String, "/astra/arm/bio/control", self.send_faerie_controls, 10) if self.launch_mode == 'anchor':
self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
# Loop through all serial devices on the computer to check for the MCU
self.port = None
ports = SerialRelay.list_serial_ports()
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if b"pong" in response: if self.launch_mode == 'arm':
self.port = port # Loop through all serial devices on the computer to check for the MCU
print(f"Found MCU at {self.port}!") self.port = None
break ports = SerialRelay.list_serial_ports()
except: for i in range(4):
pass for port in ports:
if self.port is not None: try:
break # connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
if self.port is None: #print(f"Checking port {port}...")
print("Unable to find MCU... please make sure it is connected.") ser.write(b"ping\n")
time.sleep(1) response = ser.read_until("\n")
sys.exit(1)
# if pong is in response, then we are talking with the MCU
self.ser = serial.Serial(self.port, 115200) if b"pong" in response:
atexit.register(self.cleanup) self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None:
self.get_logger().info("Unable to find MCU... please make sure it is connected.")
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
#if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
if self.ser.in_waiting: if self.launch_mode == 'arm':
self.read_mcu() if self.ser.in_waiting:
else: self.read_mcu()
time.sleep(0.1) else:
time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally: finally:
@@ -94,17 +95,17 @@ class SerialRelay(Node):
try: try:
output = str(self.ser.readline(), "utf8") output = str(self.ser.readline(), "utf8")
if output: if output:
print(f"[MCU] {output}", end="") self.get_logger().info(f"[MCU] {output}")
msg = String() msg = String()
msg.data = output msg.data = output
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
except serial.SerialException: except serial.SerialException:
print("SerialException caught... closing serial port.") self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
pass pass
except TypeError as e: except TypeError as e:
print(f"TypeError: {e}") self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.") print("Closing serial port.")
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
@@ -124,10 +125,27 @@ class SerialRelay(Node):
axis1 = msg.axis1 axis1 = msg.axis1
axis2 = msg.axis2 axis2 = msg.axis2
axis3 = msg.axis3 axis3 = msg.axis3
#Send controls for arm #Send controls for arm
command = "can_relay_tovic,arm,40," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n" command = "can_relay_tovic,arm,40," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
self.ser.write(bytes(command, "utf8")) self.send_cmd(command)
print(f"[Wrote] {command}", end="")
#Send controls for end effector
command = "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
self.send_cmd(command)
command = "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
self.send_cmd(command)
command = "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
self.send_cmd(command)
command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
self.send_cmd(command)
#print(f"[Wrote] {command}", end="")
#Not yet finished, needs embedded implementation for new commands #Not yet finished, needs embedded implementation for new commands
# ef_roll = msg.effector_roll # ef_roll = msg.effector_roll
@@ -141,110 +159,19 @@ class SerialRelay(Node):
return return
def send_cmd(self, msg):
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
self.get_logger().info(f"[Arm to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
# Depricated functions, kept temporarily for reference def anchor_feedback(self, msg):
self.get_logger().info(f"[Arm Anchor] {msg.data}")
#self.send_cmd(msg.data)
# def send_controls(self, msg):
# command = ""
# ef_cmd = ""
# if(msg.b):
# command = "arm,stop\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# return
# if(msg.a):
# command = "arm,endEffect,act,0"#retract actuator out
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# elif(msg.x):
# command = "arm,endEffect,act,1"#extend actuator in
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# if(msg.plus):
# command = "arm,endEffect,laser,1\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# elif(msg.minus):
# command = "arm,endEffect,laser,0\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# if(msg.rb):
# ef_cmd = "arm,endEffect,ctrl,"
# if(msg.lt >= 0.5):
# ef_cmd += "-1,"
# elif(msg.rt >= 0.5):
# ef_cmd += "1,"
# else:
# ef_cmd += "0,"
# if(msg.rs_x < 0):
# ef_cmd += "-1,"
# elif(msg.rs_x > 0):
# ef_cmd += "1,"
# else:
# ef_cmd += "0,"
# if(msg.ls_x < 0):
# ef_cmd += "-1"
# elif(msg.ls_x > 0):
# ef_cmd += "1"
# else:
# ef_cmd += "0"
# command = ef_cmd + "\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# return
# else:
# ef_cmd = "arm,endEffect,ctrl,"
# if(msg.lt >= 0.5):
# ef_cmd += "-1,0,0"
# elif(msg.rt >= 0.5):
# ef_cmd += "1,0,0"
# else:
# ef_cmd += "0,0,0"
# command = ef_cmd + "\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# if(msg.lb):
# #self.ser.write(bytes("arm,setMode,manual\n", "utf8"))
# command = "arm,man,0.25,"
# else:
# #self.ser.write(bytes("arm,setMode,manual\n", "utf8"))
# command = "arm,man,0.15,"
# if(msg.d_left):
# command += "-1,"
# elif(msg.d_right):
# command += "1,"
# else:
# command += "0,"
# if(msg.ls_x < -0.4):
# command += "1,"
# elif(msg.ls_x > 0.4):
# command += "-1,"
# else:
# command += "0,"
# if(msg.ls_y < -0.4):
# command += "1,"
# elif(msg.ls_y > 0.4):
# command += "-1,"
# else:
# command += "0,"
# if(msg.rs_y < -0.4):
# command += "1"
# elif(msg.rs_y > 0.4):
# command += "-1"
# else:
# command += "0"
# command += "\n"
# self.ser.write(bytes(command, "utf8"))
# print(f"[Wrote] {command}", end="")
# return
@staticmethod @staticmethod
def list_serial_ports(): def list_serial_ports():

View File

@@ -23,46 +23,55 @@ class SerialRelay(Node):
# Initalize node with name # Initalize node with name
super().__init__("core_node")#previously 'serial_publisher' super().__init__("core_node")#previously 'serial_publisher'
# Create publishers for feedback and telemetry # Get launch mode parameter
self.declare_parameter('launch_mode', 'core')
self.launch_mode = self.get_parameter('launch_mode').value
self.get_logger().info(f"core launch_mode is: {self.launch_mode}")
# Create publishers
self.debug_pub = self.create_publisher(String, '/core/debug', 10) self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
# Create a subscriber
# Create a subscriber to listen to any commands sent for the MCU
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
# Create a service server for pinging the rover # Create a service server for pinging the rover
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
# Loop through all serial devices on the computer to check for the MCU if self.launch_mode == 'anchor':
self.port = None self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10)
ports = SerialRelay.list_serial_ports() self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
for i in range(2):
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response: if self.launch_mode == 'core':
self.port = port # Loop through all serial devices on the computer to check for the MCU
print(f"Found MCU at {self.port}!") self.port = None
break ports = SerialRelay.list_serial_ports()
except: for i in range(2):
pass for port in ports:
if self.port is not None: try:
break # connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
#(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = port
self.get_logger().info(f"Found MCU at {self.port}!")
break
except:
pass
if self.port is not None:
break
if self.port is None: if self.port is None:
print("Unable to find MCU...") self.get_logger().info("Unable to find MCU...")
time.sleep(1) time.sleep(1)
sys.exit(1) sys.exit(1)
self.ser = serial.Serial(self.port, 115200) self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup) atexit.register(self.cleanup)
def run(self): def run(self):
@@ -71,9 +80,11 @@ class SerialRelay(Node):
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start() thread.start()
try: try:
while rclpy.ok(): while rclpy.ok():
self.read_MCU() # Check the MCU for updates if self.launch_mode == 'core':
self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(0) sys.exit(0)
@@ -83,7 +94,7 @@ class SerialRelay(Node):
if output: if output:
# All output over debug temporarily # All output over debug temporarily
print(f"[MCU] {output}", end="") print(f"[MCU] {output}")
msg = String() msg = String()
msg.data = output msg.data = output
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
@@ -157,10 +168,24 @@ class SerialRelay(Node):
command = "can_relay_tovic,core,19," + self.scale_duty(left_stick_neg, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n' command = "can_relay_tovic,core,19," + self.scale_duty(left_stick_neg, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
#print(f"[Sys] {command}", end="") #print(f"[Sys] {command}", end="")
self.ser.write(bytes(command, "utf8"))# Send command to MCU #self.ser.write(bytes(command, "utf8"))# Send command to MCU
self.get_logger().debug(f"wrote: {command}") #self.get_logger().debug(f"wrote: {command}")
self.send_cmd(command)
#print(f"[Sys] Relaying: {command}") #print(f"[Sys] Relaying: {command}")
def send_cmd(self, msg):
if self.launch_mode == 'anchor':
#self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String()#Convert to std_msg string
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == 'core':
self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
def anchor_feedback(self, msg):
self.get_logger().info(f"[Arm Anchor] {msg}")
def ping_callback(self, request, response): def ping_callback(self, request, response):
return response return response