feat: all the features

* Move rover-ros2/rover_launch.py to src/anchor_pkg/launch/, renamed to rover.launch.py
* Anchor now waits to initialize topics until after it has found a microcontroller.
* Headless now waits for anchor to start before it starts itself
* Add default cases to motor feedback for motorId
* Added black to the flake.nix and package.xml
This commit is contained in:
David
2025-10-23 02:22:31 -05:00
parent 44aa4b0848
commit fe1ae6120f
8 changed files with 43 additions and 25 deletions

View File

@@ -43,27 +43,6 @@ class SerialRelay(Node):
# Initalize node with name
super().__init__("anchor_node")#previously 'serial_publisher'
# New pub/sub with VicCAN
self.fromvic_debug_pub_ = self.create_publisher(String, '/anchor/from_vic/debug', 20)
self.fromvic_core_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/core', 20)
self.fromvic_arm_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/arm', 20)
self.fromvic_bio_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/bio', 20)
self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_vic/mock_mcu', self.on_mock_fromvic, 20)
self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.on_relay_tovic_viccan, 20)
self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/relay_string', self.on_relay_tovic_string, 20)
# Create publishers
self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10)
self.core_pub = self.create_publisher(String, '/anchor/core/feedback', 10)
self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10)
self.debug_pub = self.create_publisher(String, '/anchor/debug', 10)
# Create a subscriber
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.on_relay_tovic_string, 10)
# Loop through all serial devices on the computer to check for the MCU
self.port = None
if port_override := os.getenv("PORT_OVERRIDE"):
@@ -98,6 +77,27 @@ class SerialRelay(Node):
self.ser.write(b"can_relay_mode,on\n")
atexit.register(self.cleanup)
# New pub/sub with VicCAN
self.fromvic_debug_pub_ = self.create_publisher(String, '/anchor/from_vic/debug', 20)
self.fromvic_core_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/core', 20)
self.fromvic_arm_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/arm', 20)
self.fromvic_bio_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/bio', 20)
self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_vic/mock_mcu', self.on_mock_fromvic, 20)
self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.on_relay_tovic_viccan, 20)
self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/relay_string', self.on_relay_tovic_string, 20)
# Create publishers
self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10)
self.core_pub = self.create_publisher(String, '/anchor/core/feedback', 10)
self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10)
self.debug_pub = self.create_publisher(String, '/anchor/debug', 10)
# Create a subscriber
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.on_relay_tovic_string, 10)
def run(self):
# This thread makes all the update processes run in the background

View File

@@ -0,0 +1,132 @@
#!/usr/bin/env python3
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
#Prevent making __pycache__ directories
from sys import dont_write_bytecode
dont_write_bytecode = True
def launch_setup(context, *args, **kwargs):
# Retrieve the resolved value of the launch argument 'mode'
mode = LaunchConfiguration('mode').perform(context)
nodes = []
if mode == 'anchor':
# Launch every node and pass "anchor" as the parameter
nodes.append(
Node(
package='arm_pkg',
executable='arm', # change as needed
name='arm',
output='both',
parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
)
)
nodes.append(
Node(
package='core_pkg',
executable='core', # change as needed
name='core',
output='both',
parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
)
)
nodes.append(
Node(
package='core_pkg',
executable='ptz', # change as needed
name='ptz',
output='both'
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
)
)
nodes.append(
Node(
package='bio_pkg',
executable='bio', # change as needed
name='bio',
output='both',
parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
)
)
nodes.append(
Node(
package='anchor_pkg',
executable='anchor', # change as needed
name='anchor',
output='both',
parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
)
)
elif mode in ['arm', 'core', 'bio', 'ptz']:
# Only launch the node corresponding to the provided mode.
if mode == 'arm':
nodes.append(
Node(
package='arm_pkg',
executable='arm',
name='arm',
output='both',
parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
)
)
elif mode == 'core':
nodes.append(
Node(
package='core_pkg',
executable='core',
name='core',
output='both',
parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
)
)
elif mode == 'bio':
nodes.append(
Node(
package='bio_pkg',
executable='bio',
name='bio',
output='both',
parameters=[{'launch_mode': mode}],
on_exit=Shutdown()
)
)
elif mode == 'ptz':
nodes.append(
Node(
package='core_pkg',
executable='ptz',
name='ptz',
output='both',
on_exit=Shutdown(), #on fail, shutdown if this was the only node to be launched
)
)
else:
# If an invalid mode is provided, print an error.
print("Invalid mode provided. Choose one of: arm, core, bio, anchor, ptz.")
return nodes
def generate_launch_description():
declare_arg = DeclareLaunchArgument(
'mode',
default_value='anchor',
description='Launch mode: arm, core, bio, anchor, or ptz'
)
return LaunchDescription([
declare_arg,
OpaqueFunction(function=launch_setup)
])

View File

@@ -11,6 +11,8 @@
<depend>common_interfaces</depend>
<depend>python3-serial</depend>
<build_depend>black</build_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>

View File

@@ -1,4 +1,6 @@
from setuptools import find_packages, setup
from os import path
from glob import glob
package_name = 'anchor_pkg'
@@ -9,7 +11,8 @@ setup(
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(path.join("share", package_name), ['package.xml']),
(path.join("share", package_name, "launch"), glob("launch/*"))
],
install_requires=['setuptools'],
zip_safe=True,