diff --git a/.gitignore b/.gitignore
index 69b791c..960675a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,6 @@ log/
#VSCode settings
.vscode/
+
+#Pycache folder
+__pycache__/
\ No newline at end of file
diff --git a/README.md b/README.md
index 6925ce2..fceca4f 100644
--- a/README.md
+++ b/README.md
@@ -1,83 +1,87 @@
# rover-ros2
-Submodule which includes all ros2 packages for the rover. These are centrally located for modular rover operation. You can use launch files located here to run any module as standalone or as a full-rover relay.
+
+Submodule which includes all ros2 packages for the rover. These are centrally located for modular rover operation.
+
+You will use this package to launch any module-side ROS2 nodes.
+
## Software Pre-reqs
+An acting base station computer will need several things:
-The BaseStation computer will need several things.
+* ROS2 Humble
+ * Follow the standard ROS2 humble install process. Linux recommended.
+ * https://docs.ros.org/en/humble/Installation.html
+* Colcon
+ * `$ sudo apt update`
+ * `$ sudo apt install python3-colcon-common-extensions`
+* Configured Static IP for Ubiquiti bullet (Process varies by OS)
+ * IP Address: 192.168.1.x
+ * This can be just about anything not already in use. I recommend something 30-39
+ * Net Mask: 255.255.255.0
+ * Gateway: 192.168.1.0
-- ROS2 Humble
+## Launching with ANCHOR
-- Follow the standard ROS2 humble install process. Linux recommended.
+ANCHOR (Active Node Controller Hub and Operational Relay)
+Allows for launching all nodes on the rover simulataneously. Additionally, all controls will run through the core's NUC and MCU.
+
+1. SSH to core
+ * Core1: `ssh clucky@192.168.1.69`
+ * Core2: `ssh clucky@192.168.1.70`
+ * Password: \
+2. Navigate to rover-ros2 workspace
+ * `cd rover-ros2`
+3. Source the workspace
+ * `source install/setup.bash`
+4. Launch ANCHOR
+ * `ros2 launch rover_launch.py mode:=anchor`
-- https://docs.ros.org/en/humble/Installation.html
+## Launching as Standalone
-- Colcon
+For use when running independent modules through their respective computers (pi/NUC) without ANCHOR.
-- `$ sudo apt update`
+1. SSH to the the module's computer
+ * Core1: `ssh clucky@192.168.1.69`
+ * Core2: `ssh clucky@192.168.1.70`
+ * Arm: `ssh arm@192.168.1.70`
+ * Bio: \
+ * Password: \
+2. Run the main node (this sends commands to the MCU)
+ * Navigate to the rover-ros2 workspace (location may vary)
+ * `cd rover-ros2`
+ * Source the workspace
+ * `source install/setup.bash`
+ * Start the node
+ * ARM: `ros2 launch rover_launch.py mode:=arm`
+ * CORE: `ros2 launch rover_launch.py mode:=core`
+ * BIO: `ros2 launch rover_launch.py mode:=bio`
-- `$ sudo apt install python3-colcon-common-extensions`
+## Running Headless
-- Configured Static IP for Ubiquiti bullet
+Headless control nodes (for ARM and CORE) allow running of the module on-rover without the operator having ROS2 installed on their machine. You will need a laptop to connect to the pi/NUC in order to launch headless but it can be disconnected after the nodes are spun up.
+
+1. SSH to the the module's computer
+ * Core1: `ssh clucky@192.168.1.69`
+ * Core2: `ssh clucky@192.168.1.70`
+ * Arm: `ssh arm@192.168.1.70`
+ * Password: \
+2. Run the headless node
+ * You must have ANCHOR or the module's Standalone node running
+ * Open a new terminal (SSH'd to the module)
+ * Navigate to rover-ros2 workspace
+ * `cd rover-ros2`
+ * Source the workspace
+ * `source install/setup.bash`
+ * Run the node (ensure controller is connected and on x-input mode)
+ * CORE: `ros2 run core_pkg headless`
+ * ARM: `ros2 run arm_pkg headless`
-- This process will depend on the system, but you'll need this for using the Ubiquiti network with the M2 bullets.
-
-- Settings:
-
-- Net Mask: 255.255.255.0
-
-- IP Address: 192.168.1.x
-
-- This can be just about anything, just not ending in 1.20, 1.21, 1.69, or 1.0
-
-- Gateway: 192.168.1.0
-
-
-## Headless Control
-
- To control the rover without a base station you'll need to run the headless control node.
-
-- SSH to the the rover from a computer computer over wifi (or connect directly)
-
-- `ssh clucky@192.168.1.69`
-
-- Password: spaceiscool639
-
-- Run `Ros2 run headless_pkg headless`
- - This needs to be done in a separate ssh/shell window from the core rover node
-
-
-
-## Running the Rover node
-
-
-- Ensure you're either connected directly to the rover's network switch over ethernet or connected to an M2 bullet which has connection to the rover (red signal lights on)
-
-- SSH to the the rover from a computer computer over wifi (or connect directly)
-
-- `ssh clucky@192.168.1.69`
-
-- Password: spaceiscool639
-
-- run `ros2 run core_control_pkg core_control`
- - This needs to be done in a separate ssh/shell window from the headless control node (if you're running it)
-
-- You should see "MCU found" on a serial line
-
-- If not, you'll get an error it's not found and the program will close. Ensure the teensy is connected via usb to the Nuc
-
-
-
-## Connecting the GuliKit Controller (Recommended)
-
-
-
-- Connect controller to pc with USB-C
-
-- Select the "X-Input" control mode (Windows logo) on the controller.
-
-- Hold the button next to the symbols (windows, android, switch, etc...)
-
-- You'll need to release the button and press down again to cycle to the next mode
+## Connecting the GuliKit Controller
+Connecting the GuliKit Controller (Recommended)
+* Connect controller to pc with USB-C
+* Select the "X-Input" control mode (Windows logo) on the controller.
+* Hold the button next to the symbols (windows, android, switch, etc...)
+* You'll need to release the button and press down again to cycle to the next mode
diff --git a/rover_launch.py b/rover_launch.py
new file mode 100644
index 0000000..e34a34d
--- /dev/null
+++ b/rover_launch.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python3
+
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument, OpaqueFunction
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+
+#Prevent making __pycache__ directories
+from sys import dont_write_bytecode
+dont_write_bytecode = True
+
+def launch_setup(context, *args, **kwargs):
+ # Retrieve the resolved value of the launch argument 'mode'
+ mode = LaunchConfiguration('mode').perform(context)
+ nodes = []
+
+ if mode == 'anchor':
+ # Launch every node and pass "anchor" as the parameter
+ nodes.append(
+ Node(
+ package='arm_pkg',
+ executable='arm', # change as needed
+ name='arm',
+ output='both',
+ parameters=[{'launch_mode': mode}]
+ )
+ )
+ nodes.append(
+ Node(
+ package='core_pkg',
+ executable='core', # change as needed
+ name='core',
+ output='both',
+ parameters=[{'launch_mode': mode}]
+ )
+ )
+ # nodes.append(
+ # Node(
+ # package='bio_pkg',
+ # executable='bio', # change as needed
+ # name='bio',
+ # output='both',
+ # parameters=[{'launch_mode': mode}]
+ # )
+ # )
+ nodes.append(
+ Node(
+ package='anchor_pkg',
+ executable='anchor', # change as needed
+ name='anchor',
+ output='both',
+ parameters=[{'launch_mode': mode}]
+ )
+ )
+ elif mode in ['arm', 'core', 'bio']:
+ # Only launch the node corresponding to the provided mode.
+ if mode == 'arm':
+ nodes.append(
+ Node(
+ package='arm_pkg',
+ executable='arm',
+ name='arm',
+ output='both',
+ parameters=[{'launch_mode': mode}]
+ )
+ )
+ elif mode == 'core':
+ nodes.append(
+ Node(
+ package='core_pkg',
+ executable='core',
+ name='core',
+ output='both',
+ parameters=[{'launch_mode': mode}]
+ )
+ )
+ # elif mode == 'bio':
+ # nodes.append(
+ # Node(
+ # package='bio_pkg',
+ # executable='bio',
+ # name='bio',
+ # output='both',
+ # parameters=[{'launch_mode': mode}]
+ # )
+ # )
+ else:
+ # If an invalid mode is provided, print an error.
+ # (You might want to raise an exception or handle it differently.)
+ print("Invalid mode provided. Choose one of: arm, core, bio, anchor.")
+
+ return nodes
+
+def generate_launch_description():
+ declare_arg = DeclareLaunchArgument(
+ 'mode',
+ default_value='anchor',
+ description='Launch mode: arm, core, bio, or anchor'
+ )
+
+ return LaunchDescription([
+ declare_arg,
+ OpaqueFunction(function=launch_setup)
+ ])
diff --git a/src/anchor_pkg/anchor_pkg/__init__.py b/src/anchor_pkg/anchor_pkg/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/anchor_pkg/anchor_pkg/anchor_node.py b/src/anchor_pkg/anchor_pkg/anchor_node.py
new file mode 100644
index 0000000..859fff0
--- /dev/null
+++ b/src/anchor_pkg/anchor_pkg/anchor_node.py
@@ -0,0 +1,143 @@
+import rclpy
+from rclpy.node import Node
+from std_srvs.srv import Empty
+
+import signal
+import time
+import atexit
+
+import serial
+import sys
+import threading
+import glob
+
+from std_msgs.msg import String
+from ros2_interfaces_pkg.msg import CoreFeedback
+from ros2_interfaces_pkg.msg import CoreControl
+
+serial_pub = None
+thread = None
+
+class SerialRelay(Node):
+ def __init__(self):
+ # Initalize node with name
+ super().__init__("anchor_node")#previously 'serial_publisher'
+
+
+ # Create publishers
+ self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10)
+ self.core_pub = self.create_publisher(String, '/anchor/core/feedback', 10)
+ self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10)
+
+ self.debug_pub = self.create_publisher(String, '/anchor/debug', 10)
+
+ # Create a subscriber
+ self.relay_sub = self.create_subscription(String, '/anchor/relay', self.send_cmd, 10)
+
+ # Loop through all serial devices on the computer to check for the MCU
+ self.port = None
+ ports = SerialRelay.list_serial_ports()
+ for i in range(4):
+ for port in ports:
+ try:
+ # connect and send a ping command
+ ser = serial.Serial(port, 115200, timeout=1)
+ #(f"Checking port {port}...")
+ ser.write(b"ping\n")
+ response = ser.read_until("\n")
+
+ # if pong is in response, then we are talking with the MCU
+ if b"pong" in response:
+ self.port = port
+ self.get_logger().info(f"Found MCU at {self.port}!")
+ break
+ except:
+ pass
+ if self.port is not None:
+ break
+
+ if self.port is None:
+ self.get_logger().info("Unable to find MCU...")
+ time.sleep(1)
+ sys.exit(1)
+
+ self.ser = serial.Serial(self.port, 115200)
+ atexit.register(self.cleanup)
+
+
+ def run(self):
+ # This thread makes all the update processes run in the background
+ global thread
+ thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
+ thread.start()
+
+ try:
+ while rclpy.ok():
+ self.read_MCU() # Check the MCU for updates
+ except KeyboardInterrupt:
+ sys.exit(0)
+
+ def read_MCU(self):
+ try:
+ output = str(self.ser.readline(), "utf8")
+
+ if output:
+ # All output over debug temporarily
+ self.get_logger().info(f"[MCU] {output}")
+ msg = String()
+ msg.data = output
+ self.debug_pub.publish(msg)
+ return
+ except serial.SerialException as e:
+ print(f"SerialException: {e}")
+ print("Closing serial port.")
+ if self.ser.is_open:
+ self.ser.close()
+ self.exit(1)
+ except TypeError as e:
+ print(f"TypeError: {e}")
+ print("Closing serial port.")
+ if self.ser.is_open:
+ self.ser.close()
+ self.exit(1)
+ except Exception as e:
+ print(f"Exception: {e}")
+ print("Closing serial port.")
+ if self.ser.is_open:
+ self.ser.close()
+ self.exit(1)
+
+ def send_cmd(self, msg):
+ message = msg.data
+ #self.get_logger().info(f"Sending command to MCU: {msg}")
+ self.ser.write(bytes(message, "utf8"))
+
+ @staticmethod
+ def list_serial_ports():
+ return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
+
+ def cleanup(self):
+ print("Cleaning up before terminating...")
+ if self.ser.is_open:
+ self.ser.close()
+
+def myexcepthook(type, value, tb):
+ print("Uncaught exception:", type, value)
+ if serial_pub:
+ serial_pub.cleanup()
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ sys.excepthook = myexcepthook
+
+ global serial_pub
+
+ serial_pub = SerialRelay()
+ serial_pub.run()
+
+if __name__ == '__main__':
+ signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
+ signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
+ main()
+
diff --git a/src/anchor_pkg/package.xml b/src/anchor_pkg/package.xml
new file mode 100644
index 0000000..8366b03
--- /dev/null
+++ b/src/anchor_pkg/package.xml
@@ -0,0 +1,20 @@
+
+
+
+ anchor_pkg
+ 0.0.0
+ TODO: Package description
+ tristan
+ TODO: License declaration
+
+ rclpy
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+
+
+ ament_python
+
+
diff --git a/src/anchor_pkg/resource/anchor_pkg b/src/anchor_pkg/resource/anchor_pkg
new file mode 100644
index 0000000..e69de29
diff --git a/src/anchor_pkg/setup.cfg b/src/anchor_pkg/setup.cfg
new file mode 100644
index 0000000..9a77caa
--- /dev/null
+++ b/src/anchor_pkg/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/anchor_pkg
+[install]
+install_scripts=$base/lib/anchor_pkg
diff --git a/src/anchor_pkg/setup.py b/src/anchor_pkg/setup.py
new file mode 100644
index 0000000..e2844e7
--- /dev/null
+++ b/src/anchor_pkg/setup.py
@@ -0,0 +1,25 @@
+from setuptools import find_packages, setup
+
+package_name = 'anchor_pkg'
+
+setup(
+ name=package_name,
+ version='0.0.0',
+ packages=find_packages(exclude=['test']),
+ data_files=[
+ ('share/ament_index/resource_index/packages',
+ ['resource/' + package_name]),
+ ('share/' + package_name, ['package.xml']),
+ ],
+ install_requires=['setuptools'],
+ zip_safe=True,
+ maintainer='tristan',
+ maintainer_email='tristanmcginnis26@gmail.com',
+ description='Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN',
+ license='All Rights Reserved',
+ entry_points={
+ 'console_scripts': [
+ "anchor = anchor_pkg.anchor_node:main"
+ ],
+ },
+)
diff --git a/src/arm_pkg/arm_pkg/arm_headless.py b/src/arm_pkg/arm_pkg/arm_headless.py
index 761cbe3..cbe0108 100755
--- a/src/arm_pkg/arm_pkg/arm_headless.py
+++ b/src/arm_pkg/arm_pkg/arm_headless.py
@@ -45,8 +45,7 @@ class Headless(Node):
self.debug_sub = self.create_subscription(String, '/arm/feedback/debug', self.read_feedback, 10)
- #self.lastMsg = String() #Used to ignore sending controls repeatedly when they do not change
-
+ self.laser_status = 0
# Initialize pygame
pygame.init()
@@ -106,23 +105,69 @@ class Headless(Node):
exit()
input = ArmManual()
- dpad_input = self.gamepad.get_hat(0)
- input.axis0 = 0
- if dpad_input[0] == 1:
- input.axis0 = 1
- elif dpad_input[0] == -1:
- input.axis0 = -1
- input.axis1 = round(self.gamepad.get_axis(0))#left x-axis
- input.axis2 = round(self.gamepad.get_axis(1))#left y-axis
- input.axis3 = round(self.gamepad.get_axis(4))#right y-axis
+ # Triggers for gripper control
+ if self.gamepad.get_axis(2) > 0:#left trigger
+ input.gripper = -1
+ elif self.gamepad.get_axis(5) > 0:#right trigger
+ input.gripper = 1
+
+ # Toggle Laser
+ if self.gamepad.get_button(7):#Start
+ self.laser_status = 1
+ elif self.gamepad.get_button(6):#Back
+ self.laser_status = 0
+ input.laser = self.laser_status
+
+ if self.gamepad.get_button(5):#right bumper, control effector
+
+ # Left stick X-axis for effector yaw
+ if self.gamepad.get_axis(0) > 0:
+ input.effector_yaw = 1
+ elif self.gamepad.get_axis(0) < 0:
+ input.effector_yaw = -1
+
+ # Right stick X-axis for effector roll
+ if self.gamepad.get_axis(3) > 0:
+ input.effector_roll = 1
+ elif self.gamepad.get_axis(3) < 0:
+ input.effector_roll = -1
+
+ else: # Control arm axis
+ dpad_input = self.gamepad.get_hat(0)
+ input.axis0 = 0
+ if dpad_input[0] == 1:
+ input.axis0 = 1
+ elif dpad_input[0] == -1:
+ input.axis0 = -1
+
+ if self.gamepad.get_axis(0) > .15 or self.gamepad.get_axis(0) < -.15:
+ input.axis1 = -1 * round(self.gamepad.get_axis(0))
+
+ if self.gamepad.get_axis(1) > .15 or self.gamepad.get_axis(1) < -.15:
+ input.axis2 = -1 * round(self.gamepad.get_axis(1))
+
+ if self.gamepad.get_axis(4) > .15 or self.gamepad.get_axis(4) < -.15:
+ input.axis3 = -1 * round(self.gamepad.get_axis(4))
+
+ # input.axis1 = -1 * round(self.gamepad.get_axis(0))#left x-axis
+ # input.axis2 = -1 * round(self.gamepad.get_axis(1))#left y-axis
+ # input.axis3 = -1 * round(self.gamepad.get_axis(4))#right y-axis
+
+
+ #Button Mappings
+ #axis2 -> LT
+ #axis5 -> RT
+ #Buttons0 -> A
+ #Buttons1 -> B
+ #Buttons2 -> X
+ #Buttons3 -> Y
+ #Buttons4 -> LB
+ #Buttons5 -> RB
+ #Buttons6 -> Back
+ #Buttons7 -> Start
- #Temporary, not controlling digit. Awaiting embedded implementation
- input.effector_yaw = 0
- input.effector_roll = 0
- input.gripper = 0
input.linear_actuator = 0
- input.laser = 0
if pygame.joystick.get_count() != 0:
diff --git a/src/arm_pkg/arm_pkg/arm_node.py b/src/arm_pkg/arm_pkg/arm_node.py
index 45d60e2..63c5a57 100644
--- a/src/arm_pkg/arm_pkg/arm_node.py
+++ b/src/arm_pkg/arm_pkg/arm_node.py
@@ -17,72 +17,73 @@ thread = None
class SerialRelay(Node):
def __init__(self):
- # Initialize node with name
+ # Initialize node
super().__init__("arm_node")
+ # Get launch mode parameter
+ self.declare_parameter('launch_mode', 'arm')
+ self.launch_mode = self.get_parameter('launch_mode').value
+ self.get_logger().info(f"arm launch_mode is: {self.launch_mode}")
+
# Create publishers
-
- #Depricated, temporary for reference
- # self.output_publisher = self.create_publisher(String, '/astra/arm/feedback', 10)
- # self.state_publisher = self.create_publisher(ArmState, '/astra/arm/state', 10)
- # self.faerie_publisher = self.create_publisher(FaerieTelemetry, '/astra/arm/bio/feedback', 10)
-
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
-
-
# Create subscribers
-
- #Depricated, temporary for reference
- #self.control_subscriber = self.create_subscription(ControllerState, '/astra/arm/control', self.send_controls, 10)
-
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
- #self.command_subscriber = self.create_subscription(String, '/astra/arm/command', self.send_command, 10)
- #self.faerie_subscriber = self.create_subscription(String, "/astra/arm/bio/control", self.send_faerie_controls, 10)
+ # Topics used in anchor mode
+ if self.launch_mode == 'anchor':
+ self.anchor_sub = self.create_subscription(String, '/anchor/arm/feedback', self.anchor_feedback, 10)
+ self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
- # Loop through all serial devices on the computer to check for the MCU
- self.port = None
- ports = SerialRelay.list_serial_ports()
- for i in range(2):
- for port in ports:
- try:
- # connect and send a ping command
- ser = serial.Serial(port, 115200, timeout=1)
- print(f"Checking port {port}...")
- ser.write(b"ping\n")
- response = ser.read_until("\n")
- # if pong is in response, then we are talking with the MCU
- if b"pong" in response:
- self.port = port
- print(f"Found MCU at {self.port}!")
- break
- except:
- pass
- if self.port is not None:
- break
-
- if self.port is None:
- print("Unable to find MCU... please make sure it is connected.")
- time.sleep(1)
- sys.exit(1)
-
- self.ser = serial.Serial(self.port, 115200)
- atexit.register(self.cleanup)
+ # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
+ if self.launch_mode == 'arm':
+ # Loop through all serial devices on the computer to check for the MCU
+ self.port = None
+ ports = SerialRelay.list_serial_ports()
+ for i in range(4):
+ for port in ports:
+ try:
+ # connect and send a ping command
+ ser = serial.Serial(port, 115200, timeout=1)
+ #print(f"Checking port {port}...")
+ ser.write(b"ping\n")
+ response = ser.read_until("\n")
+
+ # if pong is in response, then we are talking with the MCU
+ if b"pong" in response:
+ self.port = port
+ self.get_logger().info(f"Found MCU at {self.port}!")
+ break
+ except:
+ pass
+ if self.port is not None:
+ break
+
+ if self.port is None:
+ self.get_logger().info("Unable to find MCU... please make sure it is connected.")
+ time.sleep(1)
+ sys.exit(1)
+
+ self.ser = serial.Serial(self.port, 115200)
+ atexit.register(self.cleanup)
def run(self):
global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
+ #if in arm mode, will need to read from the MCU
+
try:
while rclpy.ok():
- if self.ser.in_waiting:
- self.read_mcu()
- else:
- time.sleep(0.1)
+ if self.launch_mode == 'arm':
+ if self.ser.in_waiting:
+ self.read_mcu()
+ else:
+ time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
@@ -94,17 +95,17 @@ class SerialRelay(Node):
try:
output = str(self.ser.readline(), "utf8")
if output:
- print(f"[MCU] {output}", end="")
+ self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
- print("SerialException caught... closing serial port.")
+ self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
- print(f"TypeError: {e}")
+ self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
@@ -124,10 +125,27 @@ class SerialRelay(Node):
axis1 = msg.axis1
axis2 = msg.axis2
axis3 = msg.axis3
+
#Send controls for arm
command = "can_relay_tovic,arm,40," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
- self.ser.write(bytes(command, "utf8"))
- print(f"[Wrote] {command}", end="")
+ self.send_cmd(command)
+
+ #Send controls for end effector
+ command = "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
+ self.send_cmd(command)
+
+ command = "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
+ self.send_cmd(command)
+
+ command = "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
+ self.send_cmd(command)
+
+ command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
+ self.send_cmd(command)
+
+
+
+ #print(f"[Wrote] {command}", end="")
#Not yet finished, needs embedded implementation for new commands
# ef_roll = msg.effector_roll
@@ -141,110 +159,19 @@ class SerialRelay(Node):
return
+ def send_cmd(self, msg):
+ if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
+ output = String()
+ output.data = msg
+ self.anchor_pub.publish(output)
+ elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
+ self.get_logger().info(f"[Arm to MCU] {msg}")
+ self.ser.write(bytes(msg, "utf8"))
- # Depricated functions, kept temporarily for reference
+ def anchor_feedback(self, msg):
+ self.get_logger().info(f"[Arm Anchor] {msg.data}")
+ #self.send_cmd(msg.data)
- # def send_controls(self, msg):
- # command = ""
- # ef_cmd = ""
- # if(msg.b):
- # command = "arm,stop\n"
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
- # return
-
- # if(msg.a):
- # command = "arm,endEffect,act,0"#retract actuator out
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
- # elif(msg.x):
- # command = "arm,endEffect,act,1"#extend actuator in
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
-
- # if(msg.plus):
- # command = "arm,endEffect,laser,1\n"
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
- # elif(msg.minus):
- # command = "arm,endEffect,laser,0\n"
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
-
- # if(msg.rb):
- # ef_cmd = "arm,endEffect,ctrl,"
- # if(msg.lt >= 0.5):
- # ef_cmd += "-1,"
- # elif(msg.rt >= 0.5):
- # ef_cmd += "1,"
- # else:
- # ef_cmd += "0,"
-
- # if(msg.rs_x < 0):
- # ef_cmd += "-1,"
- # elif(msg.rs_x > 0):
- # ef_cmd += "1,"
- # else:
- # ef_cmd += "0,"
- # if(msg.ls_x < 0):
- # ef_cmd += "-1"
- # elif(msg.ls_x > 0):
- # ef_cmd += "1"
- # else:
- # ef_cmd += "0"
-
- # command = ef_cmd + "\n"
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
- # return
- # else:
- # ef_cmd = "arm,endEffect,ctrl,"
- # if(msg.lt >= 0.5):
- # ef_cmd += "-1,0,0"
- # elif(msg.rt >= 0.5):
- # ef_cmd += "1,0,0"
- # else:
- # ef_cmd += "0,0,0"
- # command = ef_cmd + "\n"
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
-
- # if(msg.lb):
- # #self.ser.write(bytes("arm,setMode,manual\n", "utf8"))
- # command = "arm,man,0.25,"
- # else:
- # #self.ser.write(bytes("arm,setMode,manual\n", "utf8"))
- # command = "arm,man,0.15,"
-
- # if(msg.d_left):
- # command += "-1,"
- # elif(msg.d_right):
- # command += "1,"
- # else:
- # command += "0,"
- # if(msg.ls_x < -0.4):
- # command += "1,"
- # elif(msg.ls_x > 0.4):
- # command += "-1,"
- # else:
- # command += "0,"
- # if(msg.ls_y < -0.4):
- # command += "1,"
- # elif(msg.ls_y > 0.4):
- # command += "-1,"
- # else:
- # command += "0,"
- # if(msg.rs_y < -0.4):
- # command += "1"
- # elif(msg.rs_y > 0.4):
- # command += "-1"
- # else:
- # command += "0"
-
- # command += "\n"
- # self.ser.write(bytes(command, "utf8"))
- # print(f"[Wrote] {command}", end="")
- # return
@staticmethod
def list_serial_ports():
diff --git a/src/core_pkg/core_pkg/core_node.py b/src/core_pkg/core_pkg/core_node.py
index 34e0ba4..8b4aeff 100644
--- a/src/core_pkg/core_pkg/core_node.py
+++ b/src/core_pkg/core_pkg/core_node.py
@@ -23,46 +23,55 @@ class SerialRelay(Node):
# Initalize node with name
super().__init__("core_node")#previously 'serial_publisher'
- # Create publishers for feedback and telemetry
+ # Get launch mode parameter
+ self.declare_parameter('launch_mode', 'core')
+ self.launch_mode = self.get_parameter('launch_mode').value
+ self.get_logger().info(f"core launch_mode is: {self.launch_mode}")
+
+ # Create publishers
self.debug_pub = self.create_publisher(String, '/core/debug', 10)
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
-
- # Create a subscriber to listen to any commands sent for the MCU
+ # Create a subscriber
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
-
# Create a service server for pinging the rover
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback)
- # Loop through all serial devices on the computer to check for the MCU
- self.port = None
- ports = SerialRelay.list_serial_ports()
- for i in range(2):
- for port in ports:
- try:
- # connect and send a ping command
- ser = serial.Serial(port, 115200, timeout=1)
- print(f"Checking port {port}...")
- ser.write(b"ping\n")
- response = ser.read_until("\n")
+ if self.launch_mode == 'anchor':
+ self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10)
+ self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
- # if pong is in response, then we are talking with the MCU
- if b"pong" in response:
- self.port = port
- print(f"Found MCU at {self.port}!")
- break
- except:
- pass
- if self.port is not None:
- break
+
+ if self.launch_mode == 'core':
+ # Loop through all serial devices on the computer to check for the MCU
+ self.port = None
+ ports = SerialRelay.list_serial_ports()
+ for i in range(2):
+ for port in ports:
+ try:
+ # connect and send a ping command
+ ser = serial.Serial(port, 115200, timeout=1)
+ #(f"Checking port {port}...")
+ ser.write(b"ping\n")
+ response = ser.read_until("\n")
+
+ # if pong is in response, then we are talking with the MCU
+ if b"pong" in response:
+ self.port = port
+ self.get_logger().info(f"Found MCU at {self.port}!")
+ break
+ except:
+ pass
+ if self.port is not None:
+ break
- if self.port is None:
- print("Unable to find MCU...")
- time.sleep(1)
- sys.exit(1)
+ if self.port is None:
+ self.get_logger().info("Unable to find MCU...")
+ time.sleep(1)
+ sys.exit(1)
- self.ser = serial.Serial(self.port, 115200)
- atexit.register(self.cleanup)
+ self.ser = serial.Serial(self.port, 115200)
+ atexit.register(self.cleanup)
def run(self):
@@ -71,9 +80,11 @@ class SerialRelay(Node):
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
thread.start()
+
try:
while rclpy.ok():
- self.read_MCU() # Check the MCU for updates
+ if self.launch_mode == 'core':
+ self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt:
sys.exit(0)
@@ -83,7 +94,7 @@ class SerialRelay(Node):
if output:
# All output over debug temporarily
- print(f"[MCU] {output}", end="")
+ print(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
@@ -157,10 +168,24 @@ class SerialRelay(Node):
command = "can_relay_tovic,core,19," + self.scale_duty(left_stick_neg, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
#print(f"[Sys] {command}", end="")
- self.ser.write(bytes(command, "utf8"))# Send command to MCU
- self.get_logger().debug(f"wrote: {command}")
+ #self.ser.write(bytes(command, "utf8"))# Send command to MCU
+ #self.get_logger().debug(f"wrote: {command}")
+
+ self.send_cmd(command)
+
#print(f"[Sys] Relaying: {command}")
+ def send_cmd(self, msg):
+ if self.launch_mode == 'anchor':
+ #self.get_logger().info(f"[Core to Anchor Relay] {msg}")
+ output = String()#Convert to std_msg string
+ output.data = msg
+ self.anchor_pub.publish(output)
+ elif self.launch_mode == 'core':
+ self.get_logger().info(f"[Core to MCU] {msg}")
+ self.ser.write(bytes(msg, "utf8"))
+ def anchor_feedback(self, msg):
+ self.get_logger().info(f"[Arm Anchor] {msg}")
def ping_callback(self, request, response):
return response