mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
reformat with black
This commit is contained in:
@@ -38,10 +38,12 @@ Subscribers:
|
||||
* /anchor/to_vic/relay_string
|
||||
- Publish raw strings to this topic to send directly to the MCU for debugging
|
||||
"""
|
||||
|
||||
|
||||
class SerialRelay(Node):
|
||||
def __init__(self):
|
||||
# Initalize node with name
|
||||
super().__init__("anchor_node")#previously 'serial_publisher'
|
||||
super().__init__("anchor_node") # previously 'serial_publisher'
|
||||
|
||||
# Loop through all serial devices on the computer to check for the MCU
|
||||
self.port = None
|
||||
@@ -55,7 +57,7 @@ class SerialRelay(Node):
|
||||
try:
|
||||
# connect and send a ping command
|
||||
ser = serial.Serial(port, 115200, timeout=1)
|
||||
#(f"Checking port {port}...")
|
||||
# (f"Checking port {port}...")
|
||||
ser.write(b"ping\n")
|
||||
response = ser.read_until(bytes("\n", "utf8"))
|
||||
|
||||
@@ -78,56 +80,74 @@ class SerialRelay(Node):
|
||||
atexit.register(self.cleanup)
|
||||
|
||||
# New pub/sub with VicCAN
|
||||
self.fromvic_debug_pub_ = self.create_publisher(String, '/anchor/from_vic/debug', 20)
|
||||
self.fromvic_core_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/core', 20)
|
||||
self.fromvic_arm_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/arm', 20)
|
||||
self.fromvic_bio_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/bio', 20)
|
||||
self.fromvic_debug_pub_ = self.create_publisher(
|
||||
String, "/anchor/from_vic/debug", 20
|
||||
)
|
||||
self.fromvic_core_pub_ = self.create_publisher(
|
||||
VicCAN, "/anchor/from_vic/core", 20
|
||||
)
|
||||
self.fromvic_arm_pub_ = self.create_publisher(
|
||||
VicCAN, "/anchor/from_vic/arm", 20
|
||||
)
|
||||
self.fromvic_bio_pub_ = self.create_publisher(
|
||||
VicCAN, "/anchor/from_vic/bio", 20
|
||||
)
|
||||
|
||||
self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_vic/mock_mcu', self.on_mock_fromvic, 20)
|
||||
self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.on_relay_tovic_viccan, 20)
|
||||
self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/relay_string', self.on_relay_tovic_string, 20)
|
||||
self.mock_mcu_sub_ = self.create_subscription(
|
||||
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
|
||||
)
|
||||
self.tovic_sub_ = self.create_subscription(
|
||||
VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20
|
||||
)
|
||||
self.tovic_debug_sub_ = self.create_subscription(
|
||||
String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20
|
||||
)
|
||||
|
||||
# Create publishers
|
||||
self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10)
|
||||
self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10)
|
||||
self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10)
|
||||
|
||||
# Create publishers
|
||||
self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10)
|
||||
self.core_pub = self.create_publisher(String, '/anchor/core/feedback', 10)
|
||||
self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10)
|
||||
|
||||
self.debug_pub = self.create_publisher(String, '/anchor/debug', 10)
|
||||
|
||||
# Create a subscriber
|
||||
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.on_relay_tovic_string, 10)
|
||||
self.debug_pub = self.create_publisher(String, "/anchor/debug", 10)
|
||||
|
||||
# Create a subscriber
|
||||
self.relay_sub = self.create_subscription(
|
||||
String, "/anchor/relay", self.on_relay_tovic_string, 10
|
||||
)
|
||||
|
||||
def run(self):
|
||||
# This thread makes all the update processes run in the background
|
||||
global thread
|
||||
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
|
||||
thread.start()
|
||||
|
||||
|
||||
try:
|
||||
while rclpy.ok():
|
||||
self.read_MCU() # Check the MCU for updates
|
||||
self.read_MCU() # Check the MCU for updates
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(0)
|
||||
|
||||
def read_MCU(self):
|
||||
""" Check the USB serial port for new data from the MCU, and publish string to appropriate topics """
|
||||
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
|
||||
try:
|
||||
output = str(self.ser.readline(), "utf8")
|
||||
|
||||
|
||||
if output:
|
||||
self.relay_fromvic(output)
|
||||
# All output over debug temporarily
|
||||
#self.get_logger().info(f"[MCU] {output}")
|
||||
# self.get_logger().info(f"[MCU] {output}")
|
||||
msg = String()
|
||||
msg.data = output
|
||||
self.debug_pub.publish(msg)
|
||||
if output.startswith("can_relay_fromvic,core"):
|
||||
self.core_pub.publish(msg)
|
||||
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
|
||||
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
|
||||
"can_relay_fromvic,digit"
|
||||
): # digit for voltage readings
|
||||
self.arm_pub.publish(msg)
|
||||
if output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor
|
||||
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
|
||||
"can_relay_fromvic,digit"
|
||||
): # digit for SHT sensor
|
||||
self.bio_pub.publish(msg)
|
||||
# msg = String()
|
||||
# msg.data = output
|
||||
@@ -152,15 +172,13 @@ class SerialRelay(Node):
|
||||
# self.ser.close()
|
||||
# exit(1)
|
||||
|
||||
|
||||
def on_mock_fromvic(self, msg: String):
|
||||
""" For testing without an actual MCU, publish strings here as if they came from an MCU """
|
||||
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
|
||||
# self.get_logger().info(f"Got command from mock MCU: {msg}")
|
||||
self.relay_fromvic(msg.data)
|
||||
|
||||
|
||||
def on_relay_tovic_viccan(self, msg: VicCAN):
|
||||
""" Relay a VicCAN message to the MCU """
|
||||
"""Relay a VicCAN message to the MCU"""
|
||||
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
|
||||
for num in msg.data:
|
||||
output += f",{round(num, 7)}" # limit to 7 decimal places
|
||||
@@ -169,7 +187,7 @@ class SerialRelay(Node):
|
||||
self.ser.write(bytes(output, "utf8"))
|
||||
|
||||
def relay_fromvic(self, msg: str):
|
||||
""" Relay a string message from the MCU to the appropriate VicCAN topic """
|
||||
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
|
||||
self.fromvic_debug_pub_.publish(String(data=msg))
|
||||
parts = msg.strip().split(",")
|
||||
if len(parts) > 0 and parts[0] != "can_relay_fromvic":
|
||||
@@ -181,11 +199,13 @@ class SerialRelay(Node):
|
||||
malformed_reason: str = ""
|
||||
if len(parts) < 3 or len(parts) > 7:
|
||||
malformed = True
|
||||
malformed_reason = f"invalid argument count (expected [3,7], got {len(parts)})"
|
||||
malformed_reason = (
|
||||
f"invalid argument count (expected [3,7], got {len(parts)})"
|
||||
)
|
||||
elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]:
|
||||
malformed = True
|
||||
malformed_reason = f"invalid mcu_name '{parts[1]}'"
|
||||
elif not(parts[2].isnumeric()) or int(parts[2]) < 0:
|
||||
elif not (parts[2].isnumeric()) or int(parts[2]) < 0:
|
||||
malformed = True
|
||||
malformed_reason = f"command_id '{parts[2]}' is not a non-negative integer"
|
||||
else:
|
||||
@@ -198,7 +218,9 @@ class SerialRelay(Node):
|
||||
break
|
||||
|
||||
if malformed:
|
||||
self.get_logger().warning(f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}")
|
||||
self.get_logger().warning(
|
||||
f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}"
|
||||
)
|
||||
return
|
||||
|
||||
# Have valid VicCAN message
|
||||
@@ -208,7 +230,9 @@ class SerialRelay(Node):
|
||||
output.command_id = int(parts[2])
|
||||
if len(parts) > 3:
|
||||
output.data = [float(x) for x in parts[3:]]
|
||||
output.header = Header(stamp=self.get_clock().now().to_msg(), frame_id="from_vic")
|
||||
output.header = Header(
|
||||
stamp=self.get_clock().now().to_msg(), frame_id="from_vic"
|
||||
)
|
||||
|
||||
# self.get_logger().info(f"Relaying from MCU: {output}")
|
||||
if output.mcu_name == "core":
|
||||
@@ -218,11 +242,10 @@ class SerialRelay(Node):
|
||||
elif output.mcu_name == "citadel" or output.mcu_name == "digit":
|
||||
self.fromvic_bio_pub_.publish(output)
|
||||
|
||||
|
||||
def on_relay_tovic_string(self, msg: String):
|
||||
""" Relay a raw string message to the MCU for debugging """
|
||||
"""Relay a raw string message to the MCU for debugging"""
|
||||
message = msg.data
|
||||
#self.get_logger().info(f"Sending command to MCU: {msg}")
|
||||
# self.get_logger().info(f"Sending command to MCU: {msg}")
|
||||
self.ser.write(bytes(message, "utf8"))
|
||||
|
||||
@staticmethod
|
||||
@@ -234,6 +257,7 @@ class SerialRelay(Node):
|
||||
if self.ser.is_open:
|
||||
self.ser.close()
|
||||
|
||||
|
||||
def myexcepthook(type, value, tb):
|
||||
print("Uncaught exception:", type, value)
|
||||
if serial_pub:
|
||||
@@ -249,7 +273,10 @@ def main(args=None):
|
||||
serial_pub = SerialRelay()
|
||||
serial_pub.run()
|
||||
|
||||
if __name__ == '__main__':
|
||||
#signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
||||
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly
|
||||
|
||||
if __name__ == "__main__":
|
||||
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
||||
signal.signal(
|
||||
signal.SIGTERM, lambda signum, frame: sys.exit(0)
|
||||
) # Catch termination signals and exit cleanly
|
||||
main()
|
||||
|
||||
@@ -2,115 +2,121 @@
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown
|
||||
from launch.substitutions import LaunchConfiguration, ThisLaunchFileDir, PathJoinSubstitution
|
||||
from launch.substitutions import (
|
||||
LaunchConfiguration,
|
||||
ThisLaunchFileDir,
|
||||
PathJoinSubstitution,
|
||||
)
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
#Prevent making __pycache__ directories
|
||||
# Prevent making __pycache__ directories
|
||||
from sys import dont_write_bytecode
|
||||
|
||||
dont_write_bytecode = True
|
||||
|
||||
|
||||
def launch_setup(context, *args, **kwargs):
|
||||
# Retrieve the resolved value of the launch argument 'mode'
|
||||
mode = LaunchConfiguration('mode').perform(context)
|
||||
mode = LaunchConfiguration("mode").perform(context)
|
||||
nodes = []
|
||||
|
||||
if mode == 'anchor':
|
||||
if mode == "anchor":
|
||||
# Launch every node and pass "anchor" as the parameter
|
||||
|
||||
nodes.append(
|
||||
Node(
|
||||
package='arm_pkg',
|
||||
executable='arm', # change as needed
|
||||
name='arm',
|
||||
output='both',
|
||||
parameters=[{'launch_mode': mode}],
|
||||
on_exit=Shutdown()
|
||||
package="arm_pkg",
|
||||
executable="arm", # change as needed
|
||||
name="arm",
|
||||
output="both",
|
||||
parameters=[{"launch_mode": mode}],
|
||||
on_exit=Shutdown(),
|
||||
)
|
||||
)
|
||||
nodes.append(
|
||||
Node(
|
||||
package='core_pkg',
|
||||
executable='core', # change as needed
|
||||
name='core',
|
||||
output='both',
|
||||
parameters=[{'launch_mode': mode}],
|
||||
on_exit=Shutdown()
|
||||
package="core_pkg",
|
||||
executable="core", # change as needed
|
||||
name="core",
|
||||
output="both",
|
||||
parameters=[{"launch_mode": mode}],
|
||||
on_exit=Shutdown(),
|
||||
)
|
||||
)
|
||||
nodes.append(
|
||||
Node(
|
||||
package='core_pkg',
|
||||
executable='ptz', # change as needed
|
||||
name='ptz',
|
||||
output='both'
|
||||
package="core_pkg",
|
||||
executable="ptz", # change as needed
|
||||
name="ptz",
|
||||
output="both",
|
||||
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
|
||||
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
|
||||
)
|
||||
)
|
||||
nodes.append(
|
||||
Node(
|
||||
package='bio_pkg',
|
||||
executable='bio', # change as needed
|
||||
name='bio',
|
||||
output='both',
|
||||
parameters=[{'launch_mode': mode}],
|
||||
on_exit=Shutdown()
|
||||
package="bio_pkg",
|
||||
executable="bio", # change as needed
|
||||
name="bio",
|
||||
output="both",
|
||||
parameters=[{"launch_mode": mode}],
|
||||
on_exit=Shutdown(),
|
||||
)
|
||||
)
|
||||
nodes.append(
|
||||
Node(
|
||||
package='anchor_pkg',
|
||||
executable='anchor', # change as needed
|
||||
name='anchor',
|
||||
output='both',
|
||||
parameters=[{'launch_mode': mode}],
|
||||
on_exit=Shutdown()
|
||||
package="anchor_pkg",
|
||||
executable="anchor", # change as needed
|
||||
name="anchor",
|
||||
output="both",
|
||||
parameters=[{"launch_mode": mode}],
|
||||
on_exit=Shutdown(),
|
||||
)
|
||||
)
|
||||
elif mode in ['arm', 'core', 'bio', 'ptz']:
|
||||
elif mode in ["arm", "core", "bio", "ptz"]:
|
||||
# Only launch the node corresponding to the provided mode.
|
||||
if mode == 'arm':
|
||||
if mode == "arm":
|
||||
nodes.append(
|
||||
Node(
|
||||
package='arm_pkg',
|
||||
executable='arm',
|
||||
name='arm',
|
||||
output='both',
|
||||
parameters=[{'launch_mode': mode}],
|
||||
on_exit=Shutdown()
|
||||
package="arm_pkg",
|
||||
executable="arm",
|
||||
name="arm",
|
||||
output="both",
|
||||
parameters=[{"launch_mode": mode}],
|
||||
on_exit=Shutdown(),
|
||||
)
|
||||
)
|
||||
elif mode == 'core':
|
||||
elif mode == "core":
|
||||
nodes.append(
|
||||
Node(
|
||||
package='core_pkg',
|
||||
executable='core',
|
||||
name='core',
|
||||
output='both',
|
||||
parameters=[{'launch_mode': mode}],
|
||||
on_exit=Shutdown()
|
||||
package="core_pkg",
|
||||
executable="core",
|
||||
name="core",
|
||||
output="both",
|
||||
parameters=[{"launch_mode": mode}],
|
||||
on_exit=Shutdown(),
|
||||
)
|
||||
)
|
||||
elif mode == 'bio':
|
||||
elif mode == "bio":
|
||||
nodes.append(
|
||||
Node(
|
||||
package='bio_pkg',
|
||||
executable='bio',
|
||||
name='bio',
|
||||
output='both',
|
||||
parameters=[{'launch_mode': mode}],
|
||||
on_exit=Shutdown()
|
||||
package="bio_pkg",
|
||||
executable="bio",
|
||||
name="bio",
|
||||
output="both",
|
||||
parameters=[{"launch_mode": mode}],
|
||||
on_exit=Shutdown(),
|
||||
)
|
||||
)
|
||||
elif mode == 'ptz':
|
||||
elif mode == "ptz":
|
||||
nodes.append(
|
||||
Node(
|
||||
package='core_pkg',
|
||||
executable='ptz',
|
||||
name='ptz',
|
||||
output='both',
|
||||
on_exit=Shutdown(), #on fail, shutdown if this was the only node to be launched
|
||||
package="core_pkg",
|
||||
executable="ptz",
|
||||
name="ptz",
|
||||
output="both",
|
||||
on_exit=Shutdown(), # on fail, shutdown if this was the only node to be launched
|
||||
)
|
||||
)
|
||||
else:
|
||||
@@ -119,14 +125,12 @@ def launch_setup(context, *args, **kwargs):
|
||||
|
||||
return nodes
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
declare_arg = DeclareLaunchArgument(
|
||||
'mode',
|
||||
default_value='anchor',
|
||||
description='Launch mode: arm, core, bio, anchor, or ptz'
|
||||
"mode",
|
||||
default_value="anchor",
|
||||
description="Launch mode: arm, core, bio, anchor, or ptz",
|
||||
)
|
||||
|
||||
return LaunchDescription([
|
||||
declare_arg,
|
||||
OpaqueFunction(function=launch_setup)
|
||||
])
|
||||
return LaunchDescription([declare_arg, OpaqueFunction(function=launch_setup)])
|
||||
|
||||
@@ -2,27 +2,24 @@ from setuptools import find_packages, setup
|
||||
from os import path
|
||||
from glob import glob
|
||||
|
||||
package_name = 'anchor_pkg'
|
||||
package_name = "anchor_pkg"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.0.0',
|
||||
packages=find_packages(exclude=['test']),
|
||||
version="0.0.0",
|
||||
packages=find_packages(exclude=["test"]),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
(path.join("share", package_name), ['package.xml']),
|
||||
(path.join("share", package_name, "launch"), glob("launch/*"))
|
||||
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
|
||||
(path.join("share", package_name), ["package.xml"]),
|
||||
(path.join("share", package_name, "launch"), glob("launch/*")),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer='tristan',
|
||||
maintainer_email='tristanmcginnis26@gmail.com',
|
||||
description='Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN',
|
||||
license='All Rights Reserved',
|
||||
maintainer="tristan",
|
||||
maintainer_email="tristanmcginnis26@gmail.com",
|
||||
description="Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN",
|
||||
license="All Rights Reserved",
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
"anchor = anchor_pkg.anchor_node:main"
|
||||
],
|
||||
"console_scripts": ["anchor = anchor_pkg.anchor_node:main"],
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user