Merge pull request #19 from SHC-ASTRA/black

reformat with black
This commit is contained in:
David Sharpe
2025-11-07 00:14:35 -06:00
committed by GitHub
16 changed files with 689 additions and 522 deletions

6
flake.lock generated
View File

@@ -24,11 +24,11 @@
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
}, },
"locked": { "locked": {
"lastModified": 1758094726, "lastModified": 1761810010,
"narHash": "sha256-agLnClczRtYY+kQFh5dv4wGNhtFNKK7KFOmypDhsWCs=", "narHash": "sha256-o0wJKW603SiOO373BTgeZaF6nDxegMA/cRrzSC2Cscg=",
"owner": "lopsided98", "owner": "lopsided98",
"repo": "nix-ros-overlay", "repo": "nix-ros-overlay",
"rev": "9d0557032aadb65df065b1972a632572b57234b5", "rev": "e277df39e3bc6b372a5138c0bcf10198857c55ab",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@@ -38,6 +38,8 @@ Subscribers:
* /anchor/to_vic/relay_string * /anchor/to_vic/relay_string
- Publish raw strings to this topic to send directly to the MCU for debugging - Publish raw strings to this topic to send directly to the MCU for debugging
""" """
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initalize node with name # Initalize node with name
@@ -78,26 +80,40 @@ class SerialRelay(Node):
atexit.register(self.cleanup) atexit.register(self.cleanup)
# New pub/sub with VicCAN # New pub/sub with VicCAN
self.fromvic_debug_pub_ = self.create_publisher(String, '/anchor/from_vic/debug', 20) self.fromvic_debug_pub_ = self.create_publisher(
self.fromvic_core_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/core', 20) String, "/anchor/from_vic/debug", 20
self.fromvic_arm_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/arm', 20) )
self.fromvic_bio_pub_ = self.create_publisher(VicCAN, '/anchor/from_vic/bio', 20) self.fromvic_core_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/core", 20
self.mock_mcu_sub_ = self.create_subscription(String, '/anchor/from_vic/mock_mcu', self.on_mock_fromvic, 20) )
self.tovic_sub_ = self.create_subscription(VicCAN, '/anchor/to_vic/relay', self.on_relay_tovic_viccan, 20) self.fromvic_arm_pub_ = self.create_publisher(
self.tovic_debug_sub_ = self.create_subscription(String, '/anchor/to_vic/relay_string', self.on_relay_tovic_string, 20) VicCAN, "/anchor/from_vic/arm", 20
)
self.fromvic_bio_pub_ = self.create_publisher(
VicCAN, "/anchor/from_vic/bio", 20
)
self.mock_mcu_sub_ = self.create_subscription(
String, "/anchor/from_vic/mock_mcu", self.on_mock_fromvic, 20
)
self.tovic_sub_ = self.create_subscription(
VicCAN, "/anchor/to_vic/relay", self.on_relay_tovic_viccan, 20
)
self.tovic_debug_sub_ = self.create_subscription(
String, "/anchor/to_vic/relay_string", self.on_relay_tovic_string, 20
)
# Create publishers # Create publishers
self.arm_pub = self.create_publisher(String, '/anchor/arm/feedback', 10) self.arm_pub = self.create_publisher(String, "/anchor/arm/feedback", 10)
self.core_pub = self.create_publisher(String, '/anchor/core/feedback', 10) self.core_pub = self.create_publisher(String, "/anchor/core/feedback", 10)
self.bio_pub = self.create_publisher(String, '/anchor/bio/feedback', 10) self.bio_pub = self.create_publisher(String, "/anchor/bio/feedback", 10)
self.debug_pub = self.create_publisher(String, '/anchor/debug', 10) self.debug_pub = self.create_publisher(String, "/anchor/debug", 10)
# Create a subscriber # Create a subscriber
self.relay_sub = self.create_subscription(String, '/anchor/relay', self.on_relay_tovic_string, 10) self.relay_sub = self.create_subscription(
String, "/anchor/relay", self.on_relay_tovic_string, 10
)
def run(self): def run(self):
# This thread makes all the update processes run in the background # This thread makes all the update processes run in the background
@@ -125,9 +141,13 @@ class SerialRelay(Node):
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"): if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg) self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg) self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg) self.bio_pub.publish(msg)
# msg = String() # msg = String()
# msg.data = output # msg.data = output
@@ -152,13 +172,11 @@ class SerialRelay(Node):
# self.ser.close() # self.ser.close()
# exit(1) # exit(1)
def on_mock_fromvic(self, msg: String): def on_mock_fromvic(self, msg: String):
"""For testing without an actual MCU, publish strings here as if they came from an MCU""" """For testing without an actual MCU, publish strings here as if they came from an MCU"""
# self.get_logger().info(f"Got command from mock MCU: {msg}") # self.get_logger().info(f"Got command from mock MCU: {msg}")
self.relay_fromvic(msg.data) self.relay_fromvic(msg.data)
def on_relay_tovic_viccan(self, msg: VicCAN): def on_relay_tovic_viccan(self, msg: VicCAN):
"""Relay a VicCAN message to the MCU""" """Relay a VicCAN message to the MCU"""
output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}" output: str = f"can_relay_tovic,{msg.mcu_name},{msg.command_id}"
@@ -181,7 +199,9 @@ class SerialRelay(Node):
malformed_reason: str = "" malformed_reason: str = ""
if len(parts) < 3 or len(parts) > 7: if len(parts) < 3 or len(parts) > 7:
malformed = True malformed = True
malformed_reason = f"invalid argument count (expected [3,7], got {len(parts)})" malformed_reason = (
f"invalid argument count (expected [3,7], got {len(parts)})"
)
elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]: elif parts[1] not in ["core", "arm", "digit", "citadel", "broadcast"]:
malformed = True malformed = True
malformed_reason = f"invalid mcu_name '{parts[1]}'" malformed_reason = f"invalid mcu_name '{parts[1]}'"
@@ -198,7 +218,9 @@ class SerialRelay(Node):
break break
if malformed: if malformed:
self.get_logger().warning(f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}") self.get_logger().warning(
f"Ignoring malformed from_vic message: '{msg.strip()}'; reason: {malformed_reason}"
)
return return
# Have valid VicCAN message # Have valid VicCAN message
@@ -208,7 +230,9 @@ class SerialRelay(Node):
output.command_id = int(parts[2]) output.command_id = int(parts[2])
if len(parts) > 3: if len(parts) > 3:
output.data = [float(x) for x in parts[3:]] output.data = [float(x) for x in parts[3:]]
output.header = Header(stamp=self.get_clock().now().to_msg(), frame_id="from_vic") output.header = Header(
stamp=self.get_clock().now().to_msg(), frame_id="from_vic"
)
# self.get_logger().info(f"Relaying from MCU: {output}") # self.get_logger().info(f"Relaying from MCU: {output}")
if output.mcu_name == "core": if output.mcu_name == "core":
@@ -218,7 +242,6 @@ class SerialRelay(Node):
elif output.mcu_name == "citadel" or output.mcu_name == "digit": elif output.mcu_name == "citadel" or output.mcu_name == "digit":
self.fromvic_bio_pub_.publish(output) self.fromvic_bio_pub_.publish(output)
def on_relay_tovic_string(self, msg: String): def on_relay_tovic_string(self, msg: String):
"""Relay a raw string message to the MCU for debugging""" """Relay a raw string message to the MCU for debugging"""
message = msg.data message = msg.data
@@ -234,6 +257,7 @@ class SerialRelay(Node):
if self.ser.is_open: if self.ser.is_open:
self.ser.close() self.ser.close()
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
if serial_pub: if serial_pub:
@@ -249,7 +273,10 @@ def main(args=None):
serial_pub = SerialRelay() serial_pub = SerialRelay()
serial_pub.run() serial_pub.run()
if __name__ == '__main__':
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -2,114 +2,120 @@
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown
from launch.substitutions import LaunchConfiguration, ThisLaunchFileDir, PathJoinSubstitution from launch.substitutions import (
LaunchConfiguration,
ThisLaunchFileDir,
PathJoinSubstitution,
)
from launch_ros.actions import Node from launch_ros.actions import Node
# Prevent making __pycache__ directories # Prevent making __pycache__ directories
from sys import dont_write_bytecode from sys import dont_write_bytecode
dont_write_bytecode = True dont_write_bytecode = True
def launch_setup(context, *args, **kwargs): def launch_setup(context, *args, **kwargs):
# Retrieve the resolved value of the launch argument 'mode' # Retrieve the resolved value of the launch argument 'mode'
mode = LaunchConfiguration('mode').perform(context) mode = LaunchConfiguration("mode").perform(context)
nodes = [] nodes = []
if mode == 'anchor': if mode == "anchor":
# Launch every node and pass "anchor" as the parameter # Launch every node and pass "anchor" as the parameter
nodes.append( nodes.append(
Node( Node(
package='arm_pkg', package="arm_pkg",
executable='arm', # change as needed executable="arm", # change as needed
name='arm', name="arm",
output='both', output="both",
parameters=[{'launch_mode': mode}], parameters=[{"launch_mode": mode}],
on_exit=Shutdown() on_exit=Shutdown(),
) )
) )
nodes.append( nodes.append(
Node( Node(
package='core_pkg', package="core_pkg",
executable='core', # change as needed executable="core", # change as needed
name='core', name="core",
output='both', output="both",
parameters=[{'launch_mode': mode}], parameters=[{"launch_mode": mode}],
on_exit=Shutdown() on_exit=Shutdown(),
) )
) )
nodes.append( nodes.append(
Node( Node(
package='core_pkg', package="core_pkg",
executable='ptz', # change as needed executable="ptz", # change as needed
name='ptz', name="ptz",
output='both' output="both",
# Currently don't shutdown all nodes if the PTZ node fails, as it is not critical # Currently don't shutdown all nodes if the PTZ node fails, as it is not critical
# on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure # on_exit=Shutdown() # Uncomment if you want to shutdown on PTZ failure
) )
) )
nodes.append( nodes.append(
Node( Node(
package='bio_pkg', package="bio_pkg",
executable='bio', # change as needed executable="bio", # change as needed
name='bio', name="bio",
output='both', output="both",
parameters=[{'launch_mode': mode}], parameters=[{"launch_mode": mode}],
on_exit=Shutdown() on_exit=Shutdown(),
) )
) )
nodes.append( nodes.append(
Node( Node(
package='anchor_pkg', package="anchor_pkg",
executable='anchor', # change as needed executable="anchor", # change as needed
name='anchor', name="anchor",
output='both', output="both",
parameters=[{'launch_mode': mode}], parameters=[{"launch_mode": mode}],
on_exit=Shutdown() on_exit=Shutdown(),
) )
) )
elif mode in ['arm', 'core', 'bio', 'ptz']: elif mode in ["arm", "core", "bio", "ptz"]:
# Only launch the node corresponding to the provided mode. # Only launch the node corresponding to the provided mode.
if mode == 'arm': if mode == "arm":
nodes.append( nodes.append(
Node( Node(
package='arm_pkg', package="arm_pkg",
executable='arm', executable="arm",
name='arm', name="arm",
output='both', output="both",
parameters=[{'launch_mode': mode}], parameters=[{"launch_mode": mode}],
on_exit=Shutdown() on_exit=Shutdown(),
) )
) )
elif mode == 'core': elif mode == "core":
nodes.append( nodes.append(
Node( Node(
package='core_pkg', package="core_pkg",
executable='core', executable="core",
name='core', name="core",
output='both', output="both",
parameters=[{'launch_mode': mode}], parameters=[{"launch_mode": mode}],
on_exit=Shutdown() on_exit=Shutdown(),
) )
) )
elif mode == 'bio': elif mode == "bio":
nodes.append( nodes.append(
Node( Node(
package='bio_pkg', package="bio_pkg",
executable='bio', executable="bio",
name='bio', name="bio",
output='both', output="both",
parameters=[{'launch_mode': mode}], parameters=[{"launch_mode": mode}],
on_exit=Shutdown() on_exit=Shutdown(),
) )
) )
elif mode == 'ptz': elif mode == "ptz":
nodes.append( nodes.append(
Node( Node(
package='core_pkg', package="core_pkg",
executable='ptz', executable="ptz",
name='ptz', name="ptz",
output='both', output="both",
on_exit=Shutdown(), # on fail, shutdown if this was the only node to be launched on_exit=Shutdown(), # on fail, shutdown if this was the only node to be launched
) )
) )
@@ -119,14 +125,12 @@ def launch_setup(context, *args, **kwargs):
return nodes return nodes
def generate_launch_description(): def generate_launch_description():
declare_arg = DeclareLaunchArgument( declare_arg = DeclareLaunchArgument(
'mode', "mode",
default_value='anchor', default_value="anchor",
description='Launch mode: arm, core, bio, anchor, or ptz' description="Launch mode: arm, core, bio, anchor, or ptz",
) )
return LaunchDescription([ return LaunchDescription([declare_arg, OpaqueFunction(function=launch_setup)])
declare_arg,
OpaqueFunction(function=launch_setup)
])

View File

@@ -2,27 +2,24 @@ from setuptools import find_packages, setup
from os import path from os import path
from glob import glob from glob import glob
package_name = 'anchor_pkg' package_name = "anchor_pkg"
setup( setup(
name=package_name, name=package_name,
version='0.0.0', version="0.0.0",
packages=find_packages(exclude=['test']), packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
('share/ament_index/resource_index/packages', ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
['resource/' + package_name]), (path.join("share", package_name), ["package.xml"]),
(path.join("share", package_name), ['package.xml']), (path.join("share", package_name, "launch"), glob("launch/*")),
(path.join("share", package_name, "launch"), glob("launch/*"))
], ],
install_requires=['setuptools'], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer='tristan', maintainer="tristan",
maintainer_email='tristanmcginnis26@gmail.com', maintainer_email="tristanmcginnis26@gmail.com",
description='Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN', description="Anchor node used to run all modules through a single modules MCU/Computer. Commands to all modules will be relayed through CAN",
license='All Rights Reserved', license="All Rights Reserved",
entry_points={ entry_points={
'console_scripts': [ "console_scripts": ["anchor = anchor_pkg.anchor_node:main"],
"anchor = anchor_pkg.anchor_node:main"
],
}, },
) )

View File

@@ -64,7 +64,9 @@ class SerialRelay(Node):
"Wrist-EF_Roll_Joint", "Wrist-EF_Roll_Joint",
"Gripper_Slider_Left", "Gripper_Slider_Left",
] ]
self.joint_state.position = [0.0] * len(self.joint_state.name) # Initialize with zeros self.joint_state.position = [0.0] * len(
self.joint_state.name
) # Initialize with zeros
self.joint_command_sub = self.create_subscription( self.joint_command_sub = self.create_subscription(
JointState, "/joint_commands", self.joint_command_callback, 10 JointState, "/joint_commands", self.joint_command_callback, 10
@@ -233,8 +235,12 @@ class SerialRelay(Node):
if len(parts) >= 4: if len(parts) >= 4:
self.digit_feedback.wrist_angle = float(parts[3]) self.digit_feedback.wrist_angle = float(parts[3])
# self.digit_feedback.wrist_roll = float(parts[4]) # self.digit_feedback.wrist_roll = float(parts[4])
self.joint_state.position[4] = math.radians(float(parts[4])) # Wrist roll self.joint_state.position[4] = math.radians(
self.joint_state.position[5] = math.radians(float(parts[3])) # Wrist yaw float(parts[4])
) # Wrist roll
self.joint_state.position[5] = math.radians(
float(parts[3])
) # Wrist yaw
else: else:
return return

View File

@@ -2,27 +2,26 @@ from setuptools import find_packages, setup
import os import os
from glob import glob from glob import glob
package_name = 'arm_pkg' package_name = "arm_pkg"
setup( setup(
name=package_name, name=package_name,
version='1.0.0', version="1.0.0",
packages=find_packages(exclude=['test']), packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
('share/ament_index/resource_index/packages', ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
['resource/' + package_name]), ("share/" + package_name, ["package.xml"]),
('share/' + package_name, ['package.xml'])
], ],
install_requires=['setuptools'], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer='tristan', maintainer="tristan",
maintainer_email='tristanmcginnis26@gmail.com', maintainer_email="tristanmcginnis26@gmail.com",
description='TODO: Package description', description="TODO: Package description",
license='All Rights Reserved', license="All Rights Reserved",
entry_points={ entry_points={
'console_scripts': [ "console_scripts": [
'arm = arm_pkg.arm_node:main', "arm = arm_pkg.arm_node:main",
'headless = arm_pkg.arm_headless:main' "headless = arm_pkg.arm_headless:main",
], ],
}, },
) )

View File

@@ -15,43 +15,48 @@ from ros2_interfaces_pkg.msg import BioFeedback
serial_pub = None serial_pub = None
thread = None thread = None
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node # Initialize node
super().__init__("bio_node") super().__init__("bio_node")
# Get launch mode parameter # Get launch mode parameter
self.declare_parameter('launch_mode', 'bio') self.declare_parameter("launch_mode", "bio")
self.launch_mode = self.get_parameter('launch_mode').value self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"bio launch_mode is: {self.launch_mode}") self.get_logger().info(f"bio launch_mode is: {self.launch_mode}")
# Create publishers # Create publishers
self.debug_pub = self.create_publisher(String, '/bio/feedback/debug', 10) self.debug_pub = self.create_publisher(String, "/bio/feedback/debug", 10)
self.feedback_pub = self.create_publisher(BioFeedback, '/bio/feedback', 10) self.feedback_pub = self.create_publisher(BioFeedback, "/bio/feedback", 10)
# Create subscribers # Create subscribers
self.control_sub = self.create_subscription(BioControl, '/bio/control', self.send_control, 10) self.control_sub = self.create_subscription(
BioControl, "/bio/control", self.send_control, 10
)
# Create a publisher for telemetry # Create a publisher for telemetry
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
# Topics used in anchor mode # Topics used in anchor mode
if self.launch_mode == 'anchor': if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription(String, '/anchor/bio/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) String, "/anchor/bio/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.bio_feedback = BioFeedback() self.bio_feedback = BioFeedback()
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if self.launch_mode == 'bio': if self.launch_mode == "bio":
# Loop through all serial devices on the computer to check for the MCU # Loop through all serial devices on the computer to check for the MCU
self.port = None self.port = None
for i in range(2): for i in range(2):
try: try:
# connect and send a ping command # connect and send a ping command
set_port = '/dev/ttyACM0' #MCU is controlled through GPIO pins on the PI set_port = (
"/dev/ttyACM0" # MCU is controlled through GPIO pins on the PI
)
ser = serial.Serial(set_port, 115200, timeout=1) ser = serial.Serial(set_port, 115200, timeout=1)
# print(f"Checking port {port}...") # print(f"Checking port {port}...")
ser.write(b"ping\n") ser.write(b"ping\n")
@@ -66,7 +71,9 @@ class SerialRelay(Node):
pass pass
if self.port is None: if self.port is None:
self.get_logger().info("Unable to find MCU... please make sure it is connected.") self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
)
time.sleep(1) time.sleep(1)
sys.exit(1) sys.exit(1)
@@ -82,7 +89,7 @@ class SerialRelay(Node):
try: try:
while rclpy.ok(): while rclpy.ok():
if self.launch_mode == 'bio': if self.launch_mode == "bio":
if self.ser.in_waiting: if self.ser.in_waiting:
self.read_mcu() self.read_mcu()
else: else:
@@ -92,7 +99,6 @@ class SerialRelay(Node):
finally: finally:
self.cleanup() self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings # Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self): def read_mcu(self):
try: try:
@@ -123,26 +129,41 @@ class SerialRelay(Node):
def send_ik(self, msg): def send_ik(self, msg):
pass pass
def send_control(self, msg: BioControl): def send_control(self, msg: BioControl):
# CITADEL Control Commands # CITADEL Control Commands
################ ################
# Chem Pumps, only send if not zero # Chem Pumps, only send if not zero
if msg.pump_id != 0: if msg.pump_id != 0:
command = "can_relay_tovic,citadel,27," + str(msg.pump_id) + "," + str(msg.pump_amount) + "\n" command = (
"can_relay_tovic,citadel,27,"
+ str(msg.pump_id)
+ ","
+ str(msg.pump_amount)
+ "\n"
)
self.send_cmd(command) self.send_cmd(command)
# Fans, only send if not zero # Fans, only send if not zero
if msg.fan_id != 0: if msg.fan_id != 0:
command = "can_relay_tovic,citadel,40," + str(msg.fan_id) + "," + str(msg.fan_duration) + "\n" command = (
"can_relay_tovic,citadel,40,"
+ str(msg.fan_id)
+ ","
+ str(msg.fan_duration)
+ "\n"
)
self.send_cmd(command) self.send_cmd(command)
# Servos, only send if not zero # Servos, only send if not zero
if msg.servo_id != 0: if msg.servo_id != 0:
command = "can_relay_tovic,citadel,25," + str(msg.servo_id) + "," + str(int(msg.servo_state)) + "\n" command = (
"can_relay_tovic,citadel,25,"
+ str(msg.servo_id)
+ ","
+ str(int(msg.servo_state))
+ "\n"
)
self.send_cmd(command) self.send_cmd(command)
# LSS (SCYTHE) # LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n" command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
# self.send_cmd(command) # self.send_cmd(command)
@@ -151,7 +172,6 @@ class SerialRelay(Node):
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n" command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
# self.send_cmd(command) # self.send_cmd(command)
# FAERIE Control Commands # FAERIE Control Commands
################ ################
@@ -169,14 +189,14 @@ class SerialRelay(Node):
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n" command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command) self.send_cmd(command)
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay if (
self.launch_mode == "anchor"
): # if in anchor mode, send to anchor node to relay
output = String() output = String()
output.data = msg output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == 'bio': #if in standalone mode, send to MCU directly elif self.launch_mode == "bio": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Bio to MCU] {msg}") self.get_logger().info(f"[Bio to MCU] {msg}")
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
@@ -185,7 +205,9 @@ class SerialRelay(Node):
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
# self.get_logger().info(f"[Bio Anchor] {msg.data}") # self.get_logger().info(f"[Bio Anchor] {msg.data}")
if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100 if output.startswith(
"can_relay_fromvic,citadel,54"
): # bat, 12, 5, Voltage readings * 100
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0 self.bio_feedback.bat_voltage = float(parts[3]) / 100.0
self.bio_feedback.voltage_12 = float(parts[4]) / 100.0 self.bio_feedback.voltage_12 = float(parts[4]) / 100.0
self.bio_feedback.voltage_5 = float(parts[5]) / 100.0 self.bio_feedback.voltage_5 = float(parts[5]) / 100.0
@@ -209,11 +231,13 @@ class SerialRelay(Node):
except Exception as e: except Exception as e:
exit(0) exit(0)
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
if serial_pub: if serial_pub:
serial_pub.cleanup() serial_pub.cleanup()
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook sys.excepthook = myexcepthook
@@ -222,7 +246,10 @@ def main(args=None):
serial_pub = SerialRelay() serial_pub = SerialRelay()
serial_pub.run() serial_pub.run()
if __name__ == '__main__':
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -1,25 +1,22 @@
from setuptools import find_packages, setup from setuptools import find_packages, setup
package_name = 'bio_pkg' package_name = "bio_pkg"
setup( setup(
name=package_name, name=package_name,
version='0.0.0', version="0.0.0",
packages=find_packages(exclude=['test']), packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
('share/ament_index/resource_index/packages', ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
['resource/' + package_name]), ("share/" + package_name, ["package.xml"]),
('share/' + package_name, ['package.xml']),
], ],
install_requires=['setuptools'], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer='tristan', maintainer="tristan",
maintainer_email='tristanmcginnis26@gmail.com', maintainer_email="tristanmcginnis26@gmail.com",
description='TODO: Package description', description="TODO: Package description",
license='TODO: License declaration', license="TODO: License declaration",
entry_points={ entry_points={
'console_scripts': [ "console_scripts": ["bio = bio_pkg.bio_node:main"],
'bio = bio_pkg.bio_node:main'
],
}, },
) )

View File

@@ -17,11 +17,14 @@ from ros2_interfaces_pkg.msg import CoreControl
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
max_speed = 90 # Max speed as a duty cycle percentage (1-100) max_speed = 90 # Max speed as a duty cycle percentage (1-100)
class Headless(Node): class Headless(Node):
def __init__(self): def __init__(self):
# Initialize pygame first # Initialize pygame first
@@ -43,13 +46,15 @@ class Headless(Node):
# Initialize the gamepad # Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0) self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}') print(f"Gamepad Found: {self.gamepad.get_name()}")
# Now initialize the ROS2 node # Now initialize the ROS2 node
super().__init__("core_headless") super().__init__("core_headless")
self.create_timer(0.15, self.send_controls) self.create_timer(0.15, self.send_controls)
self.publisher = self.create_publisher(CoreControl, '/core/control', 10) self.publisher = self.create_publisher(CoreControl, "/core/control", 10)
self.lastMsg = String() # Used to ignore sending controls repeatedly when they do not change self.lastMsg = (
String()
) # Used to ignore sending controls repeatedly when they do not change
def run(self): def run(self):
# This thread makes all the update processes run in the background # This thread makes all the update processes run in the background
@@ -91,15 +96,17 @@ class Headless(Node):
else: else:
input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis input.left_stick = -1 * round(self.gamepad.get_axis(1), 2) # left y-axis
output = f'L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}' output = f"L: {input.left_stick}, R: {input.right_stick}, M: {max_speed}"
self.get_logger().info(f"[Ctrl] {output}") self.get_logger().info(f"[Ctrl] {output}")
self.publisher.publish(input) self.publisher.publish(input)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
node = Headless() node = Headless()
rclpy.spin(node) rclpy.spin(node)
rclpy.shutdown() rclpy.shutdown()
if __name__ == '__main__':
if __name__ == "__main__":
main() main()

View File

@@ -38,7 +38,7 @@ control_qos = qos.QoSProfile(
deadline=Duration(seconds=1), deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5) liveliness_lease_duration=Duration(seconds=5),
) )
# Used to verify the length of an incoming VicCAN feedback message # Used to verify the length of an incoming VicCAN feedback message
@@ -52,7 +52,7 @@ viccan_msg_len_dict = {
53: 4, 53: 4,
54: 4, 54: 4,
56: 4, # really 3, but viccan 56: 4, # really 3, but viccan
58: 4 # ditto 58: 4, # ditto
} }
@@ -62,77 +62,110 @@ class SerialRelay(Node):
super().__init__("core_node") super().__init__("core_node")
# Launch mode -- anchor vs core # Launch mode -- anchor vs core
self.declare_parameter('launch_mode', 'core') self.declare_parameter("launch_mode", "core")
self.launch_mode = self.get_parameter('launch_mode').value self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
################################################## ##################################################
# Topics # Topics
# Anchor # Anchor
if self.launch_mode == 'anchor': if self.launch_mode == "anchor":
self.anchor_fromvic_sub_ = self.create_subscription(VicCAN, '/anchor/from_vic/core', self.relay_fromvic, 20) self.anchor_fromvic_sub_ = self.create_subscription(
self.anchor_tovic_pub_ = self.create_publisher(VicCAN, '/anchor/to_vic/relay', 20) VicCAN, "/anchor/from_vic/core", self.relay_fromvic, 20
)
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
self.anchor_sub = self.create_subscription(String, '/anchor/core/feedback', self.anchor_feedback, 10) self.anchor_sub = self.create_subscription(
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10) String, "/anchor/core/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Control # Control
# autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2 # autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2
self.cmd_vel_sub_ = self.create_subscription(TwistStamped, '/cmd_vel', self.cmd_vel_callback, 1) self.cmd_vel_sub_ = self.create_subscription(
TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1
)
# manual twist -- [-1, 1] rather than real units # manual twist -- [-1, 1] rather than real units
self.twist_man_sub_ = self.create_subscription(Twist, '/core/twist', self.twist_man_callback, qos_profile=control_qos) self.twist_man_sub_ = self.create_subscription(
Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos
)
# manual flags -- brake mode and max duty cycle # manual flags -- brake mode and max duty cycle
self.control_state_sub_ = self.create_subscription(CoreCtrlState, '/core/control/state', self.control_state_callback, qos_profile=control_qos) self.control_state_sub_ = self.create_subscription(
self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5 CoreCtrlState,
"/core/control/state",
self.control_state_callback,
qos_profile=control_qos,
)
self.twist_max_duty = (
0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
)
# Feedback # Feedback
# Consolidated and organized core feedback # Consolidated and organized core feedback
self.feedback_new_pub_ = self.create_publisher(NewCoreFeedback, '/core/feedback_new', qos_profile=qos.qos_profile_sensor_data) self.feedback_new_pub_ = self.create_publisher(
NewCoreFeedback,
"/core/feedback_new",
qos_profile=qos.qos_profile_sensor_data,
)
self.feedback_new_state = NewCoreFeedback() self.feedback_new_state = NewCoreFeedback()
self.feedback_new_state.fl_motor.id = 1 self.feedback_new_state.fl_motor.id = 1
self.feedback_new_state.bl_motor.id = 2 self.feedback_new_state.bl_motor.id = 2
self.feedback_new_state.fr_motor.id = 3 self.feedback_new_state.fr_motor.id = 3
self.feedback_new_state.br_motor.id = 4 self.feedback_new_state.br_motor.id = 4
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback) # TODO: not sure about this self.telemetry_pub_timer = self.create_timer(
1.0, self.publish_feedback
) # TODO: not sure about this
# Joint states for topic-based controller # Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher(JointState, '/core/joint_states', qos_profile=qos.qos_profile_sensor_data) self.joint_state_pub_ = self.create_publisher(
JointState, "/core/joint_states", qos_profile=qos.qos_profile_sensor_data
)
# IMU (embedded BNO-055) # IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher(Imu, '/core/imu', qos_profile=qos.qos_profile_sensor_data) self.imu_pub_ = self.create_publisher(
Imu, "/core/imu", qos_profile=qos.qos_profile_sensor_data
)
self.imu_state = Imu() self.imu_state = Imu()
self.imu_state.header.frame_id = "core_bno055" self.imu_state.header.frame_id = "core_bno055"
# GPS (embedded u-blox M9N) # GPS (embedded u-blox M9N)
self.gps_pub_ = self.create_publisher(NavSatFix, '/gps/fix', qos_profile=qos.qos_profile_sensor_data) self.gps_pub_ = self.create_publisher(
NavSatFix, "/gps/fix", qos_profile=qos.qos_profile_sensor_data
)
self.gps_state = NavSatFix() self.gps_state = NavSatFix()
self.gps_state.header.frame_id = "core_gps_antenna" self.gps_state.header.frame_id = "core_gps_antenna"
self.gps_state.status.service = NavSatStatus.SERVICE_GPS self.gps_state.status.service = NavSatStatus.SERVICE_GPS
self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX self.gps_state.status.status = NavSatStatus.STATUS_NO_FIX
self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN self.gps_state.position_covariance_type = NavSatFix.COVARIANCE_TYPE_UNKNOWN
# Barometer (embedded BMP-388) # Barometer (embedded BMP-388)
self.baro_pub_ = self.create_publisher(Barometer, '/core/baro', qos_profile=qos.qos_profile_sensor_data) self.baro_pub_ = self.create_publisher(
Barometer, "/core/baro", qos_profile=qos.qos_profile_sensor_data
)
self.baro_state = Barometer() self.baro_state = Barometer()
self.baro_state.header.frame_id = "core_bmp388" self.baro_state.header.frame_id = "core_bmp388"
# Old # Old
# /core/control # /core/control
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff self.control_sub = self.create_subscription(
CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback # /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10) self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback() self.core_feedback = CoreFeedback()
# Debug # Debug
self.debug_pub = self.create_publisher(String, '/core/debug', 10) self.debug_pub = self.create_publisher(String, "/core/debug", 10)
self.ping_service = self.create_service(Empty, '/astra/core/ping', self.ping_callback) self.ping_service = self.create_service(
Empty, "/astra/core/ping", self.ping_callback
)
################################################## ##################################################
# Find microcontroller (Non-anchor only) # Find microcontroller (Non-anchor only)
# Core (non-anchor) specific # Core (non-anchor) specific
if self.launch_mode == 'core': if self.launch_mode == "core":
# Loop through all serial devices on the computer to check for the MCU # Loop through all serial devices on the computer to check for the MCU
self.port = None self.port = None
ports = SerialRelay.list_serial_ports() ports = SerialRelay.list_serial_ports()
@@ -166,7 +199,6 @@ class SerialRelay(Node):
atexit.register(self.cleanup) atexit.register(self.cleanup)
# end __init__() # end __init__()
def run(self): def run(self):
# This thread makes all the update processes run in the background # This thread makes all the update processes run in the background
global thread global thread
@@ -175,7 +207,7 @@ class SerialRelay(Node):
try: try:
while rclpy.ok(): while rclpy.ok():
if self.launch_mode == 'core': if self.launch_mode == "core":
self.read_MCU() # Check the MCU for updates self.read_MCU() # Check the MCU for updates
except KeyboardInterrupt: except KeyboardInterrupt:
sys.exit(0) sys.exit(0)
@@ -227,14 +259,26 @@ class SerialRelay(Node):
return str(rightMin + (valueScaled * rightSpan)) return str(rightMin + (valueScaled * rightSpan))
def send_controls(self, msg: CoreControl): def send_controls(self, msg: CoreControl):
if(msg.turn_to_enable): if msg.turn_to_enable:
command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n' command = (
"can_relay_tovic,core,41,"
+ str(msg.turn_to)
+ ","
+ str(msg.turn_to_timeout)
+ "\n"
)
else: else:
command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n' command = (
"can_relay_tovic,core,19,"
+ self.scale_duty(msg.left_stick, msg.max_speed)
+ ","
+ self.scale_duty(msg.right_stick, msg.max_speed)
+ "\n"
)
self.send_cmd(command) self.send_cmd(command)
# Brake mode # Brake mode
command = "can_relay_tovic,core,18," + str(int(msg.brake)) + '\n' command = "can_relay_tovic,core,18," + str(int(msg.brake)) + "\n"
self.send_cmd(command) self.send_cmd(command)
# print(f"[Sys] Relaying: {command}") # print(f"[Sys] Relaying: {command}")
@@ -255,13 +299,15 @@ class SerialRelay(Node):
linear = msg.linear.x # [-1 1] for forward/back from left stick y linear = msg.linear.x # [-1 1] for forward/back from left stick y
angular = msg.angular.z # [-1 1] for left/right from right stick x angular = msg.angular.z # [-1 1] for left/right from right stick x
if (linear < 0): # reverse turning direction when going backwards (WIP) if linear < 0: # reverse turning direction when going backwards (WIP)
angular *= -1 angular *= -1
if abs(linear) > 1 or abs(angular) > 1: if abs(linear) > 1 or abs(angular) > 1:
# if speed is greater than 1, then there is a problem # if speed is greater than 1, then there is a problem
# make it look like a problem and don't just run away lmao # make it look like a problem and don't just run away lmao
linear = copysign(0.25, linear) # 0.25 duty cycle in direction of control (hopefully slow) linear = copysign(
0.25, linear
) # 0.25 duty cycle in direction of control (hopefully slow)
angular = copysign(0.25, angular) angular = copysign(0.25, angular)
duty_left = linear - angular duty_left = linear - angular
@@ -272,8 +318,12 @@ class SerialRelay(Node):
# Apply max duty cycle # Apply max duty cycle
# Joysticks provide values [-1, 1] rather than real units # Joysticks provide values [-1, 1] rather than real units
duty_left = map_range(duty_left, -1, 1, -self.twist_max_duty, self.twist_max_duty) duty_left = map_range(
duty_right = map_range(duty_right, -1, 1, -self.twist_max_duty, self.twist_max_duty) duty_left, -1, 1, -self.twist_max_duty, self.twist_max_duty
)
duty_right = map_range(
duty_right, -1, 1, -self.twist_max_duty, self.twist_max_duty
)
self.send_viccan(19, [duty_left, duty_right]) self.send_viccan(19, [duty_left, duty_right])
@@ -285,24 +335,24 @@ class SerialRelay(Node):
self.twist_max_duty = msg.max_duty # twist_man_callback will handle this self.twist_max_duty = msg.max_duty # twist_man_callback will handle this
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
if self.launch_mode == 'anchor': if self.launch_mode == "anchor":
# self.get_logger().info(f"[Core to Anchor Relay] {msg}") # self.get_logger().info(f"[Core to Anchor Relay] {msg}")
output = String() # Convert to std_msg string output = String() # Convert to std_msg string
output.data = msg output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == 'core': elif self.launch_mode == "core":
self.get_logger().info(f"[Core to MCU] {msg}") self.get_logger().info(f"[Core to MCU] {msg}")
self.ser.write(bytes(msg, "utf8")) self.ser.write(bytes(msg, "utf8"))
def send_viccan(self, cmd_id: int, data: list[float]): def send_viccan(self, cmd_id: int, data: list[float]):
self.anchor_tovic_pub_.publish(VicCAN( self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg(), frame_id="to_vic"), header=Header(stamp=self.get_clock().now().to_msg(), frame_id="to_vic"),
mcu_name="core", mcu_name="core",
command_id=cmd_id, command_id=cmd_id,
data=data data=data,
)) )
)
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
@@ -376,7 +426,9 @@ class SerialRelay(Node):
if msg.command_id in viccan_msg_len_dict: if msg.command_id in viccan_msg_len_dict:
expected_len = viccan_msg_len_dict[msg.command_id] expected_len = viccan_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len: if len(msg.data) != expected_len:
self.get_logger().warning(f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})") self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return return
match msg.command_id: match msg.command_id:
@@ -386,7 +438,11 @@ class SerialRelay(Node):
case 49: # GNSS Longitude case 49: # GNSS Longitude
self.gps_state.longitude = float(msg.data[0]) self.gps_state.longitude = float(msg.data[0])
case 50: # GNSS Satellite count and altitude case 50: # GNSS Satellite count and altitude
self.gps_state.status.status = NavSatStatus.STATUS_FIX if int(msg.data[0]) >= 3 else NavSatStatus.STATUS_NO_FIX self.gps_state.status.status = (
NavSatStatus.STATUS_FIX
if int(msg.data[0]) >= 3
else NavSatStatus.STATUS_NO_FIX
)
self.gps_state.altitude = float(msg.data[1]) self.gps_state.altitude = float(msg.data[1])
self.gps_state.header.stamp = msg.header.stamp self.gps_state.header.stamp = msg.header.stamp
self.gps_pub_.publish(self.gps_state) self.gps_pub_.publish(self.gps_state)
@@ -402,7 +458,7 @@ class SerialRelay(Node):
self.imu_state.linear_acceleration.y = float(msg.data[1]) self.imu_state.linear_acceleration.y = float(msg.data[1])
self.imu_state.linear_acceleration.z = float(msg.data[2]) self.imu_state.linear_acceleration.z = float(msg.data[2])
# Deal with quaternion # Deal with quaternion
r = Rotation.from_euler('z', float(msg.data[3]), degrees=True) r = Rotation.from_euler("z", float(msg.data[3]), degrees=True)
q = r.as_quat() q = r.as_quat()
self.imu_state.orientation.x = q[0] self.imu_state.orientation.x = q[0]
self.imu_state.orientation.y = q[1] self.imu_state.orientation.y = q[1]
@@ -427,7 +483,9 @@ class SerialRelay(Node):
case 4: case 4:
motor = self.feedback_new_state.br_motor motor = self.feedback_new_state.br_motor
case _: case _:
self.get_logger().warning(f"Ignoring REV motor feedback 53 with invalid motorId {motorId}") self.get_logger().warning(
f"Ignoring REV motor feedback 53 with invalid motorId {motorId}"
)
return return
if motor: if motor:
@@ -455,9 +513,15 @@ class SerialRelay(Node):
motorId = round(float(msg.data[0])) motorId = round(float(msg.data[0]))
position = float(msg.data[1]) position = float(msg.data[1])
velocity = float(msg.data[2]) velocity = float(msg.data[2])
joint_state_msg = JointState() # TODO: not sure if all motors should be in each message or not joint_state_msg = (
joint_state_msg.position = [position * (2 * pi) / CORE_GEAR_RATIO] # revolutions to radians JointState()
joint_state_msg.velocity = [velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO] # RPM to rad/s ) # TODO: not sure if all motors should be in each message or not
joint_state_msg.position = [
position * (2 * pi) / CORE_GEAR_RATIO
] # revolutions to radians
joint_state_msg.velocity = [
velocity * (2 * pi / 60.0) / CORE_GEAR_RATIO
] # RPM to rad/s
motor: RevMotorState | None = None motor: RevMotorState | None = None
@@ -475,7 +539,9 @@ class SerialRelay(Node):
motor = self.feedback_new_state.br_motor motor = self.feedback_new_state.br_motor
joint_state_msg.name = ["br_motor_joint"] joint_state_msg.name = ["br_motor_joint"]
case _: case _:
self.get_logger().warning(f"Ignoring REV motor feedback 58 with invalid motorId {motorId}") self.get_logger().warning(
f"Ignoring REV motor feedback 58 with invalid motorId {motorId}"
)
return return
joint_state_msg.header.stamp = msg.header.stamp joint_state_msg.header.stamp = msg.header.stamp
@@ -483,7 +549,6 @@ class SerialRelay(Node):
case _: case _:
return return
def publish_feedback(self): def publish_feedback(self):
# self.get_logger().info(f"[Core] {self.core_feedback}") # self.get_logger().info(f"[Core] {self.core_feedback}")
self.feedback_pub.publish(self.core_feedback) self.feedback_pub.publish(self.core_feedback)
@@ -491,7 +556,6 @@ class SerialRelay(Node):
def ping_callback(self, request, response): def ping_callback(self, request, response):
return response return response
@staticmethod @staticmethod
def list_serial_ports(): def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
@@ -510,7 +574,10 @@ def myexcepthook(type, value, tb):
if serial_pub: if serial_pub:
serial_pub.cleanup() serial_pub.cleanup()
def map_range(value: float, in_min: float, in_max: float, out_min: float, out_max: float):
def map_range(
value: float, in_min: float, in_max: float, out_min: float, out_max: float
):
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
@@ -523,7 +590,10 @@ def main(args=None):
serial_pub = SerialRelay() serial_pub = SerialRelay()
serial_pub.run() serial_pub.run()
if __name__ == '__main__':
if __name__ == "__main__":
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly # signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()

View File

@@ -18,37 +18,43 @@ from core_pkg.siyi_sdk import (
DataStreamType, DataStreamType,
DataStreamFrequency, DataStreamFrequency,
SingleAxis, SingleAxis,
AttitudeData AttitudeData,
) )
class PtzNode(Node): class PtzNode(Node):
def __init__(self): def __init__(self):
# Initialize node with name # Initialize node with name
super().__init__("core_ptz") super().__init__("core_ptz")
# Declare parameters # Declare parameters
self.declare_parameter('camera_ip', '192.168.1.9') self.declare_parameter("camera_ip", "192.168.1.9")
self.declare_parameter('camera_port', 37260) self.declare_parameter("camera_port", 37260)
# Get parameters # Get parameters
self.camera_ip = self.get_parameter('camera_ip').value self.camera_ip = self.get_parameter("camera_ip").value
self.camera_port = self.get_parameter('camera_port').value self.camera_port = self.get_parameter("camera_port").value
self.get_logger().info(f"PTZ camera IP: {self.camera_ip} Port: {self.camera_port}") self.get_logger().info(
f"PTZ camera IP: {self.camera_ip} Port: {self.camera_port}"
)
# Create a camera instance # Create a camera instance
self.camera = None self.camera = None
self.camera_connected = False # This flag is still managed but not used to gate commands self.camera_connected = (
False # This flag is still managed but not used to gate commands
)
self.loop = None self.loop = None
self.thread_pool = None self.thread_pool = None
# Create publishers # Create publishers
self.feedback_pub = self.create_publisher(PtzFeedback, '/ptz/feedback', 10) self.feedback_pub = self.create_publisher(PtzFeedback, "/ptz/feedback", 10)
self.debug_pub = self.create_publisher(String, '/ptz/debug', 10) self.debug_pub = self.create_publisher(String, "/ptz/debug", 10)
# Create subscribers # Create subscribers
self.control_sub = self.create_subscription( self.control_sub = self.create_subscription(
PtzControl, '/ptz/control', self.handle_control_command, 10) PtzControl, "/ptz/control", self.handle_control_command, 10
)
# Create timers # Create timers
self.connection_timer = self.create_timer(5.0, self.check_camera_connection) self.connection_timer = self.create_timer(5.0, self.check_camera_connection)
@@ -57,7 +63,9 @@ class PtzNode(Node):
# Create feedback message # Create feedback message
self.feedback_msg = PtzFeedback() self.feedback_msg = PtzFeedback()
self.feedback_msg.connected = False # This will reflect the actual connection state self.feedback_msg.connected = (
False # This will reflect the actual connection state
)
self.feedback_msg.error_msg = "Initializing" self.feedback_msg.error_msg = "Initializing"
# Flags for async operations # Flags for async operations
@@ -86,8 +94,7 @@ class PtzNode(Node):
# Request attitude data stream # Request attitude data stream
await self.camera.send_data_stream_request( await self.camera.send_data_stream_request(
DataStreamType.ATTITUDE_DATA, DataStreamType.ATTITUDE_DATA, DataStreamFrequency.HZ_10
DataStreamFrequency.HZ_10
) )
# Update connection status # Update connection status
@@ -108,8 +115,12 @@ class PtzNode(Node):
# Update last_data_time regardless of self.camera_connected, # Update last_data_time regardless of self.camera_connected,
# as data might arrive during a brief reconnect window. # as data might arrive during a brief reconnect window.
self.last_data_time = time.time() self.last_data_time = time.time()
if self.camera_connected: # Only process for feedback if we believe we are connected if (
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): self.camera_connected
): # Only process for feedback if we believe we are connected
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(
data, AttitudeData
):
self.feedback_msg.yaw = data.yaw self.feedback_msg.yaw = data.yaw
self.feedback_msg.pitch = data.pitch self.feedback_msg.pitch = data.pitch
self.feedback_msg.roll = data.roll self.feedback_msg.roll = data.roll
@@ -130,7 +141,6 @@ class PtzNode(Node):
debug_str += str(data) debug_str += str(data)
self.get_logger().debug(debug_str) self.get_logger().debug(debug_str)
def check_camera_connection(self): def check_camera_connection(self):
"""Periodically check camera connection and attempt to reconnect if needed.""" """Periodically check camera connection and attempt to reconnect if needed."""
if not self.camera_connected and not self.shutdown_requested: if not self.camera_connected and not self.shutdown_requested:
@@ -140,7 +150,9 @@ class PtzNode(Node):
if self.camera.is_connected: # SDK's internal connection state if self.camera.is_connected: # SDK's internal connection state
self.run_async_func(self.camera.disconnect()) self.run_async_func(self.camera.disconnect())
except Exception as e: except Exception as e:
self.get_logger().debug(f"Error during pre-reconnect disconnect: {e}") self.get_logger().debug(
f"Error during pre-reconnect disconnect: {e}"
)
# self.camera = None # Don't nullify here, connect_to_camera will re-assign or create new # self.camera = None # Don't nullify here, connect_to_camera will re-assign or create new
self.connect_task = self.thread_pool.submit( self.connect_task = self.thread_pool.submit(
@@ -152,7 +164,9 @@ class PtzNode(Node):
if self.camera_connected: # Only check health if we think we are connected if self.camera_connected: # Only check health if we think we are connected
time_since_last_data = time.time() - self.last_data_time time_since_last_data = time.time() - self.last_data_time
if time_since_last_data > 5.0: if time_since_last_data > 5.0:
self.publish_debug(f"No camera data for {time_since_last_data:.1f}s, marking as disconnected.") self.publish_debug(
f"No camera data for {time_since_last_data:.1f}s, marking as disconnected."
)
self.camera_connected = False self.camera_connected = False
self.feedback_msg.connected = False self.feedback_msg.connected = False
self.feedback_msg.error_msg = "Connection stale (no data)" self.feedback_msg.error_msg = "Connection stale (no data)"
@@ -162,18 +176,19 @@ class PtzNode(Node):
"""Handle incoming control commands.""" """Handle incoming control commands."""
# Removed: if not self.camera_connected # Removed: if not self.camera_connected
if not self.camera: # Still check if camera object exists if not self.camera: # Still check if camera object exists
self.get_logger().warning("Camera object not initialized, ignoring control command") self.get_logger().warning(
"Camera object not initialized, ignoring control command"
)
return return
self.thread_pool.submit( self.thread_pool.submit(self.run_async_func, self.process_control_command(msg))
self.run_async_func,
self.process_control_command(msg)
)
async def process_control_command(self, msg): async def process_control_command(self, msg):
"""Process and send the control command to the camera.""" """Process and send the control command to the camera."""
if not self.camera: if not self.camera:
self.get_logger().error("Process control command called but camera object is None.") self.get_logger().error(
"Process control command called but camera object is None."
)
return return
try: try:
# The SDK's send_... methods will raise RuntimeError if not connected. # The SDK's send_... methods will raise RuntimeError if not connected.
@@ -186,27 +201,35 @@ class PtzNode(Node):
if msg.control_mode == 0: if msg.control_mode == 0:
turn_yaw = max(-100, min(100, int(msg.turn_yaw))) turn_yaw = max(-100, min(100, int(msg.turn_yaw)))
turn_pitch = max(-100, min(100, int(msg.turn_pitch))) turn_pitch = max(-100, min(100, int(msg.turn_pitch)))
self.get_logger().debug(f"Attempting rotation: yaw_speed={turn_yaw}, pitch_speed={turn_pitch}") self.get_logger().debug(
f"Attempting rotation: yaw_speed={turn_yaw}, pitch_speed={turn_pitch}"
)
await self.camera.send_rotation_command(turn_yaw, turn_pitch) await self.camera.send_rotation_command(turn_yaw, turn_pitch)
elif msg.control_mode == 1: elif msg.control_mode == 1:
yaw = max(-135.0, min(135.0, msg.yaw)) yaw = max(-135.0, min(135.0, msg.yaw))
pitch = max(-90.0, min(90.0, msg.pitch)) pitch = max(-90.0, min(90.0, msg.pitch))
self.get_logger().debug(f"Attempting absolute angles: yaw={yaw}, pitch={pitch}") self.get_logger().debug(
f"Attempting absolute angles: yaw={yaw}, pitch={pitch}"
)
await self.camera.send_attitude_angles_command(yaw, pitch) await self.camera.send_attitude_angles_command(yaw, pitch)
elif msg.control_mode == 2: elif msg.control_mode == 2:
axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH axis = SingleAxis.YAW if msg.axis_id == 0 else SingleAxis.PITCH
angle = msg.angle angle = msg.angle
self.get_logger().debug(f"Attempting single axis: axis={axis.name}, angle={angle}") self.get_logger().debug(
f"Attempting single axis: axis={axis.name}, angle={angle}"
)
await self.camera.send_single_axis_attitude_command(angle, axis) await self.camera.send_single_axis_attitude_command(angle, axis)
elif msg.control_mode == 3: elif msg.control_mode == 3:
zoom_level = msg.zoom_level zoom_level = msg.zoom_level
self.get_logger().debug(f"Attempting absolute zoom: level={zoom_level}x") self.get_logger().debug(
f"Attempting absolute zoom: level={zoom_level}x"
)
await self.camera.send_absolute_zoom_command(zoom_level) await self.camera.send_absolute_zoom_command(zoom_level)
if hasattr(msg, 'stream_type') and hasattr(msg, 'stream_freq'): if hasattr(msg, "stream_type") and hasattr(msg, "stream_freq"):
if msg.stream_type > 0 and msg.stream_freq >= 0: if msg.stream_type > 0 and msg.stream_freq >= 0:
try: try:
stream_type = DataStreamType(msg.stream_type) stream_type = DataStreamType(msg.stream_type)
@@ -214,9 +237,13 @@ class PtzNode(Node):
self.get_logger().info( self.get_logger().info(
f"Attempting to set data stream: type={stream_type.name}, freq={stream_freq.name}" f"Attempting to set data stream: type={stream_type.name}, freq={stream_freq.name}"
) )
await self.camera.send_data_stream_request(stream_type, stream_freq) await self.camera.send_data_stream_request(
stream_type, stream_freq
)
except ValueError: except ValueError:
self.get_logger().error("Invalid stream type or frequency values in control message") self.get_logger().error(
"Invalid stream type or frequency values in control message"
)
except RuntimeError as e: # Catch SDK's "not connected" errors except RuntimeError as e: # Catch SDK's "not connected" errors
self.get_logger().warning(f"SDK command failed (likely not connected): {e}") self.get_logger().warning(f"SDK command failed (likely not connected): {e}")
@@ -231,7 +258,9 @@ class PtzNode(Node):
def publish_debug(self, message_text): def publish_debug(self, message_text):
"""Publish debug message.""" """Publish debug message."""
msg = String() msg = String()
msg.data = f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}" msg.data = (
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
)
self.debug_pub.publish(msg) self.debug_pub.publish(msg)
self.get_logger().info(message_text) self.get_logger().info(message_text)
@@ -239,18 +268,23 @@ class PtzNode(Node):
"""Run an async function in the event loop.""" """Run an async function in the event loop."""
if self.loop and self.loop.is_running(): if self.loop and self.loop.is_running():
try: try:
return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout=5.0) # Added timeout return asyncio.run_coroutine_threadsafe(coro, self.loop).result(
timeout=5.0
) # Added timeout
except asyncio.TimeoutError: except asyncio.TimeoutError:
self.get_logger().warning(f"Async function {coro.__name__} timed out.") self.get_logger().warning(f"Async function {coro.__name__} timed out.")
return None return None
except Exception as e: except Exception as e:
self.get_logger().error(f"Exception in run_async_func for {coro.__name__}: {e}") self.get_logger().error(
f"Exception in run_async_func for {coro.__name__}: {e}"
)
return None return None
else: else:
self.get_logger().warning("Asyncio loop not running, cannot execute coroutine.") self.get_logger().warning(
"Asyncio loop not running, cannot execute coroutine."
)
return None return None
async def shutdown_node_async(self): async def shutdown_node_async(self):
"""Perform clean shutdown of camera connection.""" """Perform clean shutdown of camera connection."""
self.shutdown_requested = True self.shutdown_requested = True
@@ -259,8 +293,7 @@ class PtzNode(Node):
try: try:
self.get_logger().info("Disabling data stream...") self.get_logger().info("Disabling data stream...")
await self.camera.send_data_stream_request( await self.camera.send_data_stream_request(
DataStreamType.ATTITUDE_DATA, DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE
DataStreamFrequency.DISABLE
) )
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@@ -287,10 +320,14 @@ class PtzNode(Node):
if self.loop and self.thread_pool: if self.loop and self.thread_pool:
if self.loop.is_running(): if self.loop.is_running():
try: try:
future = asyncio.run_coroutine_threadsafe(self.shutdown_node_async(), self.loop) future = asyncio.run_coroutine_threadsafe(
self.shutdown_node_async(), self.loop
)
future.result(timeout=5) future.result(timeout=5)
except Exception as e: except Exception as e:
self.get_logger().error(f"Error during async shutdown in cleanup: {e}") self.get_logger().error(
f"Error during async shutdown in cleanup: {e}"
)
self.get_logger().info("Shutting down thread pool executor...") self.get_logger().info("Shutting down thread pool executor...")
self.thread_pool.shutdown(wait=True) self.thread_pool.shutdown(wait=True)
@@ -301,7 +338,9 @@ class PtzNode(Node):
self.get_logger().info("PTZ node resources cleaned up.") self.get_logger().info("PTZ node resources cleaned up.")
else: else:
self.get_logger().warning("Loop or thread_pool not initialized, skipping parts of cleanup.") self.get_logger().warning(
"Loop or thread_pool not initialized, skipping parts of cleanup."
)
def main(args=None): def main(args=None):
@@ -312,6 +351,7 @@ def main(args=None):
asyncio_thread = None asyncio_thread = None
if ptz_node.loop: if ptz_node.loop:
def run_event_loop(loop): def run_event_loop(loop):
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
@@ -324,9 +364,7 @@ def main(args=None):
loop.close() loop.close()
asyncio_thread = threading.Thread( asyncio_thread = threading.Thread(
target=run_event_loop, target=run_event_loop, args=(ptz_node.loop,), daemon=True
args=(ptz_node.loop,),
daemon=True
) )
asyncio_thread.start() asyncio_thread.start()
@@ -351,5 +389,5 @@ def main(args=None):
ptz_node.get_logger().info("ROS shutdown complete.") ptz_node.get_logger().info("ROS shutdown complete.")
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View File

@@ -110,9 +110,7 @@ class SiyiGimbalCamera:
MAX_A8_MINI_ZOOM = 6.0 # Maximum zoom for A8 mini MAX_A8_MINI_ZOOM = 6.0 # Maximum zoom for A8 mini
def __init__( def __init__(self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2):
self, ip: str, port: int = 37260, *, heartbeat_interval: int = 2
):
self.ip = ip self.ip = ip
self.port = port self.port = port
self.heartbeat_interval = heartbeat_interval self.heartbeat_interval = heartbeat_interval
@@ -124,9 +122,7 @@ class SiyiGimbalCamera:
async def connect(self) -> None: async def connect(self) -> None:
try: try:
self.reader, self.writer = await asyncio.open_connection( self.reader, self.writer = await asyncio.open_connection(self.ip, self.port)
self.ip, self.port
)
self.is_connected = True self.is_connected = True
asyncio.create_task(self.heartbeat_loop()) asyncio.create_task(self.heartbeat_loop())
asyncio.create_task(self._data_stream_listener()) asyncio.create_task(self._data_stream_listener())
@@ -158,9 +154,7 @@ class SiyiGimbalCamera:
if self.is_connected: if self.is_connected:
await self.disconnect() await self.disconnect()
def _build_packet_header( def _build_packet_header(self, cmd_id: CommandID, data_len: int) -> bytearray:
self, cmd_id: CommandID, data_len: int
) -> bytearray:
"""Helper to build the common packet header.""" """Helper to build the common packet header."""
packet = bytearray() packet = bytearray()
packet.extend(b"\x55\x66") # STX packet.extend(b"\x55\x66") # STX
@@ -179,15 +173,11 @@ class SiyiGimbalCamera:
def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes: def _build_rotation_packet(self, turn_yaw: int, turn_pitch: int) -> bytes:
data_len = 2 data_len = 2
packet = self._build_packet_header( packet = self._build_packet_header(CommandID.ROTATION_CONTROL, data_len)
CommandID.ROTATION_CONTROL, data_len
)
packet.extend(struct.pack("bb", turn_yaw, turn_pitch)) packet.extend(struct.pack("bb", turn_yaw, turn_pitch))
return self._finalize_packet(packet) return self._finalize_packet(packet)
async def send_rotation_command( async def send_rotation_command(self, turn_yaw: int, turn_pitch: int) -> None:
self, turn_yaw: int, turn_pitch: int
) -> None:
if not self.is_connected or not self.writer: if not self.is_connected or not self.writer:
raise RuntimeError( raise RuntimeError(
"Socket is not connected or writer is None, cannot send rotation command." "Socket is not connected or writer is None, cannot send rotation command."
@@ -199,21 +189,15 @@ class SiyiGimbalCamera:
f"Sent rotation command with yaw_speed {turn_yaw} and pitch_speed {turn_pitch}" f"Sent rotation command with yaw_speed {turn_yaw} and pitch_speed {turn_pitch}"
) )
def _build_attitude_angles_packet( def _build_attitude_angles_packet(self, yaw: float, pitch: float) -> bytes:
self, yaw: float, pitch: float
) -> bytes:
data_len = 4 data_len = 4
packet = self._build_packet_header( packet = self._build_packet_header(CommandID.ATTITUDE_ANGLES, data_len)
CommandID.ATTITUDE_ANGLES, data_len
)
yaw_int = int(round(yaw * 10)) yaw_int = int(round(yaw * 10))
pitch_int = int(round(pitch * 10)) pitch_int = int(round(pitch * 10))
packet.extend(struct.pack("<hh", yaw_int, pitch_int)) packet.extend(struct.pack("<hh", yaw_int, pitch_int))
return self._finalize_packet(packet) return self._finalize_packet(packet)
async def send_attitude_angles_command( async def send_attitude_angles_command(self, yaw: float, pitch: float) -> None:
self, yaw: float, pitch: float
) -> None:
if not self.is_connected or not self.writer: if not self.is_connected or not self.writer:
raise RuntimeError( raise RuntimeError(
"Socket is not connected or writer is None, cannot send attitude angles command." "Socket is not connected or writer is None, cannot send attitude angles command."
@@ -221,17 +205,13 @@ class SiyiGimbalCamera:
packet = self._build_attitude_angles_packet(yaw, pitch) packet = self._build_attitude_angles_packet(yaw, pitch)
self.writer.write(packet) self.writer.write(packet)
await self.writer.drain() await self.writer.drain()
logger.debug( logger.debug(f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°")
f"Sent attitude angles command with yaw {yaw}° and pitch {pitch}°"
)
def _build_single_axis_attitude_packet( def _build_single_axis_attitude_packet(
self, angle: float, axis: SingleAxis self, angle: float, axis: SingleAxis
) -> bytes: ) -> bytes:
data_len = 3 data_len = 3
packet = self._build_packet_header( packet = self._build_packet_header(CommandID.SINGLE_AXIS_CONTROL, data_len)
CommandID.SINGLE_AXIS_CONTROL, data_len
)
angle_int = int(round(angle * 10)) angle_int = int(round(angle * 10))
packet.extend(struct.pack("<hB", angle_int, axis.value)) packet.extend(struct.pack("<hB", angle_int, axis.value))
return self._finalize_packet(packet) return self._finalize_packet(packet)
@@ -254,9 +234,7 @@ class SiyiGimbalCamera:
self, data_type: DataStreamType, data_freq: DataStreamFrequency self, data_type: DataStreamType, data_freq: DataStreamFrequency
) -> bytes: ) -> bytes:
data_len = 2 data_len = 2
packet = self._build_packet_header( packet = self._build_packet_header(CommandID.DATA_STREAM_REQUEST, data_len)
CommandID.DATA_STREAM_REQUEST, data_len
)
packet.append(data_type.value) packet.append(data_type.value)
packet.append(data_freq.value) packet.append(data_freq.value)
return self._finalize_packet(packet) return self._finalize_packet(packet)
@@ -279,7 +257,9 @@ class SiyiGimbalCamera:
data_len = 2 data_len = 2
packet = self._build_packet_header(CommandID.ABSOLUTE_ZOOM, data_len) packet = self._build_packet_header(CommandID.ABSOLUTE_ZOOM, data_len)
zoom_packet_value = int(round(zoom_level * 10)) zoom_packet_value = int(round(zoom_level * 10))
if not (0 <= zoom_packet_value <= 65535): # Should be caught by clamping earlier if not (
0 <= zoom_packet_value <= 65535
): # Should be caught by clamping earlier
raise ValueError( raise ValueError(
"Zoom packet value out of uint16_t range after conversion." "Zoom packet value out of uint16_t range after conversion."
) )
@@ -374,10 +354,7 @@ class SiyiGimbalCamera:
self._data_callback(cmd_id_int, data) self._data_callback(cmd_id_int, data)
continue continue
if ( if cmd_id_enum == CommandID.ATTITUDE_DATA_RESPONSE and len(data) == 12:
cmd_id_enum == CommandID.ATTITUDE_DATA_RESPONSE
and len(data) == 12
):
try: try:
parsed = AttitudeData.from_bytes(data) parsed = AttitudeData.from_bytes(data)
if self._data_callback: if self._data_callback:
@@ -385,9 +362,7 @@ class SiyiGimbalCamera:
else: else:
logger.info(f"Received attitude data: {parsed}") logger.info(f"Received attitude data: {parsed}")
except Exception as e: except Exception as e:
logger.exception( logger.exception(f"Failed to parse attitude data: {e}")
f"Failed to parse attitude data: {e}"
)
if self._data_callback: if self._data_callback:
self._data_callback(cmd_id_enum, data) self._data_callback(cmd_id_enum, data)
else: else:
@@ -429,7 +404,9 @@ async def main_sdk_test(): # Renamed to avoid conflict if this file is imported
gimbal = SiyiGimbalCamera(gimbal_ip) gimbal = SiyiGimbalCamera(gimbal_ip)
def my_data_handler(cmd_id, data): def my_data_handler(cmd_id, data):
if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(data, AttitudeData): if cmd_id == CommandID.ATTITUDE_DATA_RESPONSE and isinstance(
data, AttitudeData
):
print( print(
f"Attitude: Yaw={data.yaw:.1f}, Pitch={data.pitch:.1f}, Roll={data.roll:.1f}" f"Attitude: Yaw={data.yaw:.1f}, Pitch={data.pitch:.1f}, Roll={data.roll:.1f}"
) )
@@ -470,16 +447,19 @@ async def main_sdk_test(): # Renamed to avoid conflict if this file is imported
await asyncio.sleep(2) await asyncio.sleep(2)
print("SDK Test: Requesting attitude data stream at 5Hz...") print("SDK Test: Requesting attitude data stream at 5Hz...")
await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.HZ_5) await gimbal.send_data_stream_request(
DataStreamType.ATTITUDE_DATA, DataStreamFrequency.HZ_5
)
print("SDK Test: Listening for data for 10 seconds...") print("SDK Test: Listening for data for 10 seconds...")
await asyncio.sleep(10) await asyncio.sleep(10)
print("SDK Test: Disabling attitude data stream...") print("SDK Test: Disabling attitude data stream...")
await gimbal.send_data_stream_request(DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE) await gimbal.send_data_stream_request(
DataStreamType.ATTITUDE_DATA, DataStreamFrequency.DISABLE
)
await asyncio.sleep(1) await asyncio.sleep(1)
except ConnectionRefusedError: except ConnectionRefusedError:
print( print(
f"SDK Test: Connection to {gimbal_ip} was refused. Is the gimbal on and accessible?" f"SDK Test: Connection to {gimbal_ip} was refused. Is the gimbal on and accessible?"

View File

@@ -24,7 +24,9 @@ async def main():
await asyncio.sleep(2) await asyncio.sleep(2)
# Command 1: Move all the way to the right (using set angles) # Command 1: Move all the way to the right (using set angles)
logger.info("Command 1: Move all the way to the right (using absolute angle control)") logger.info(
"Command 1: Move all the way to the right (using absolute angle control)"
)
await camera.send_attitude_angles_command(135.0, 0.0) await camera.send_attitude_angles_command(135.0, 0.0)
await asyncio.sleep(5) await asyncio.sleep(5)
@@ -35,13 +37,17 @@ async def main():
await asyncio.sleep(5) await asyncio.sleep(5)
# Command 3: Stop looking down, then look up (with the single axis) # Command 3: Stop looking down, then look up (with the single axis)
logger.info("Command 3: Stop looking down and start looking up (single axis control)") logger.info(
"Command 3: Stop looking down and start looking up (single axis control)"
)
await camera.send_rotation_command(0, 0) await camera.send_rotation_command(0, 0)
await camera.send_single_axis_attitude_command(135, SingleAxis.PITCH) await camera.send_single_axis_attitude_command(135, SingleAxis.PITCH)
await asyncio.sleep(5) await asyncio.sleep(5)
# Command 4: Reset and move all the way to the left (Absolute value). # Command 4: Reset and move all the way to the left (Absolute value).
logger.info("Command 4: Move back to the center, and start moving all the way left") logger.info(
"Command 4: Move back to the center, and start moving all the way left"
)
await camera.send_attitude_angles_command(-135.0, 0.0) await camera.send_attitude_angles_command(-135.0, 0.0)
await asyncio.sleep(5) await asyncio.sleep(5)

View File

@@ -1,27 +1,26 @@
from setuptools import find_packages, setup from setuptools import find_packages, setup
package_name = 'core_pkg' package_name = "core_pkg"
setup( setup(
name=package_name, name=package_name,
version='0.0.0', version="0.0.0",
packages=find_packages(exclude=['test']), packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
('share/ament_index/resource_index/packages', ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
['resource/' + package_name]), ("share/" + package_name, ["package.xml"]),
('share/' + package_name, ['package.xml']),
], ],
install_requires=['setuptools'], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer='tristan', maintainer="tristan",
maintainer_email='tristanmcginnis26@gmail.com', maintainer_email="tristanmcginnis26@gmail.com",
description='Core rover control package to handle command interpretation and embedded interfacing.', description="Core rover control package to handle command interpretation and embedded interfacing.",
license='All Rights Reserved', license="All Rights Reserved",
entry_points={ entry_points={
'console_scripts': [ "console_scripts": [
"core = core_pkg.core_node:main", "core = core_pkg.core_node:main",
"headless = core_pkg.core_headless:main", "headless = core_pkg.core_headless:main",
"ptz = core_pkg.core_ptz:main" "ptz = core_pkg.core_ptz:main",
], ],
}, },
) )

View File

@@ -1,23 +1,23 @@
from setuptools import find_packages, setup from setuptools import find_packages, setup
package_name = 'headless_pkg' package_name = "headless_pkg"
setup( setup(
name=package_name, name=package_name,
version='1.0.0', version="1.0.0",
packages=find_packages(exclude=['test']), packages=find_packages(exclude=["test"]),
data_files=[ data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]), ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
('share/' + package_name, ['package.xml']), ("share/" + package_name, ["package.xml"]),
], ],
install_requires=['setuptools'], install_requires=["setuptools"],
zip_safe=True, zip_safe=True,
maintainer='David Sharpe', maintainer="David Sharpe",
maintainer_email='ds0196@uah.edu', maintainer_email="ds0196@uah.edu",
description='Headless rover control package to handle command interpretation and embedded interfacing.', description="Headless rover control package to handle command interpretation and embedded interfacing.",
license='All Rights Reserved', license="All Rights Reserved",
entry_points={ entry_points={
'console_scripts': [ "console_scripts": [
"headless_full = src.headless_node:main", "headless_full = src.headless_node:main",
], ],
}, },

View File

@@ -21,7 +21,9 @@ from ros2_interfaces_pkg.msg import CoreCtrlState
import pygame import pygame
os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display os.environ["SDL_VIDEODRIVER"] = "dummy" # Prevents pygame from trying to open a display
os.environ["SDL_AUDIODRIVER"] = "dummy" # Force pygame to use a dummy audio driver before pygame.init() os.environ["SDL_AUDIODRIVER"] = (
"dummy" # Force pygame to use a dummy audio driver before pygame.init()
)
CORE_STOP_MSG = CoreControl() # All zeros by default CORE_STOP_MSG = CoreControl() # All zeros by default
@@ -37,7 +39,7 @@ control_qos = qos.QoSProfile(
deadline=Duration(seconds=1), deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT, liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5) liveliness_lease_duration=Duration(seconds=5),
) )
CORE_MODE = "twist" # "twist" or "duty" CORE_MODE = "twist" # "twist" or "duty"
@@ -51,11 +53,11 @@ class Headless(Node):
super().__init__("headless") super().__init__("headless")
# Wait for anchor to start # Wait for anchor to start
pub_info = self.get_publishers_info_by_topic('/anchor/from_vic/debug') pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
while len(pub_info) == 0: while len(pub_info) == 0:
self.get_logger().info("Waiting for anchor to start...") self.get_logger().info("Waiting for anchor to start...")
time.sleep(1.0) time.sleep(1.0)
pub_info = self.get_publishers_info_by_topic('/anchor/from_vic/debug') pub_info = self.get_publishers_info_by_topic("/anchor/from_vic/debug")
# Wait for a gamepad to be connected # Wait for a gamepad to be connected
print("Waiting for gamepad connection...") print("Waiting for gamepad connection...")
@@ -71,17 +73,20 @@ class Headless(Node):
# Initialize the gamepad # Initialize the gamepad
self.gamepad = pygame.joystick.Joystick(0) self.gamepad = pygame.joystick.Joystick(0)
self.gamepad.init() self.gamepad.init()
print(f'Gamepad Found: {self.gamepad.get_name()}') print(f"Gamepad Found: {self.gamepad.get_name()}")
self.create_timer(0.15, self.send_controls) self.create_timer(0.15, self.send_controls)
self.core_publisher = self.create_publisher(CoreControl, '/core/control', 2) self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
self.arm_publisher = self.create_publisher(ArmManual, '/arm/control/manual', 2) self.arm_publisher = self.create_publisher(ArmManual, "/arm/control/manual", 2)
self.bio_publisher = self.create_publisher(BioControl, '/bio/control', 2) self.bio_publisher = self.create_publisher(BioControl, "/bio/control", 2)
self.core_twist_pub_ = self.create_publisher(Twist, '/core/twist', qos_profile=control_qos)
self.core_state_pub_ = self.create_publisher(CoreCtrlState, '/core/control/state', qos_profile=control_qos)
self.core_twist_pub_ = self.create_publisher(
Twist, "/core/twist", qos_profile=control_qos
)
self.core_state_pub_ = self.create_publisher(
CoreCtrlState, "/core/control/state", qos_profile=control_qos
)
self.ctrl_mode = "core" # Start in core mode self.ctrl_mode = "core" # Start in core mode
self.core_brake_mode = False self.core_brake_mode = False
@@ -90,7 +95,6 @@ class Headless(Node):
# Rumble when node is ready (returns False if rumble not supported) # Rumble when node is ready (returns False if rumble not supported)
self.gamepad.rumble(0.7, 0.8, 150) self.gamepad.rumble(0.7, 0.8, 150)
def run(self): def run(self):
# This thread makes all the update processes run in the background # This thread makes all the update processes run in the background
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True) thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
@@ -122,7 +126,6 @@ class Headless(Node):
pygame.quit() pygame.quit()
sys.exit(0) sys.exit(0)
new_ctrl_mode = self.ctrl_mode # if "" then inequality will always be true new_ctrl_mode = self.ctrl_mode # if "" then inequality will always be true
# Check for control mode change # Check for control mode change
@@ -137,7 +140,6 @@ class Headless(Node):
self.ctrl_mode = new_ctrl_mode self.ctrl_mode = new_ctrl_mode
self.get_logger().info(f"Switched to {self.ctrl_mode} control mode") self.get_logger().info(f"Switched to {self.ctrl_mode} control mode")
# CORE # CORE
if self.ctrl_mode == "core" and CORE_MODE == "duty": if self.ctrl_mode == "core" and CORE_MODE == "duty":
input = CoreControl() input = CoreControl()
@@ -148,7 +150,6 @@ class Headless(Node):
right_stick_y = deadzone(self.gamepad.get_axis(4)) right_stick_y = deadzone(self.gamepad.get_axis(4))
right_trigger = deadzone(self.gamepad.get_axis(5)) right_trigger = deadzone(self.gamepad.get_axis(5))
# Right wheels # Right wheels
input.right_stick = float(round(-1 * right_stick_y, 2)) input.right_stick = float(round(-1 * right_stick_y, 2))
@@ -158,9 +159,8 @@ class Headless(Node):
else: else:
input.left_stick = float(round(-1 * left_stick_y, 2)) input.left_stick = float(round(-1 * left_stick_y, 2))
# Debug # Debug
output = f'L: {input.left_stick}, R: {input.right_stick}' output = f"L: {input.left_stick}, R: {input.right_stick}"
self.get_logger().info(f"[Ctrl] {output}") self.get_logger().info(f"[Ctrl] {output}")
self.core_publisher.publish(input) self.core_publisher.publish(input)
@@ -179,13 +179,17 @@ class Headless(Node):
# Forward/back and Turn # Forward/back and Turn
input.linear.x = -1.0 * left_stick_y input.linear.x = -1.0 * left_stick_y
input.angular.z = -1.0 * copysign(right_stick_x ** 2, right_stick_x) # Exponent for finer control (curve) input.angular.z = -1.0 * copysign(
right_stick_x**2, right_stick_x
) # Exponent for finer control (curve)
# Publish # Publish
self.core_twist_pub_.publish(input) self.core_twist_pub_.publish(input)
self.arm_publisher.publish(ARM_STOP_MSG) self.arm_publisher.publish(ARM_STOP_MSG)
# self.bio_publisher.publish(BIO_STOP_MSG) # self.bio_publisher.publish(BIO_STOP_MSG)
self.get_logger().info(f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}") self.get_logger().info(
f"[Core Ctrl] Linear: {round(input.linear.x, 2)}, Angular: {round(input.angular.z, 2)}"
)
# Brake mode # Brake mode
new_brake_mode = button_a new_brake_mode = button_a
@@ -198,15 +202,19 @@ class Headless(Node):
new_max_duty = 0.5 new_max_duty = 0.5
# Only publish if needed # Only publish if needed
if new_brake_mode != self.core_brake_mode or new_max_duty != self.core_max_duty: if (
new_brake_mode != self.core_brake_mode
or new_max_duty != self.core_max_duty
):
self.core_brake_mode = new_brake_mode self.core_brake_mode = new_brake_mode
self.core_max_duty = new_max_duty self.core_max_duty = new_max_duty
state_msg = CoreCtrlState() state_msg = CoreCtrlState()
state_msg.brake_mode = bool(self.core_brake_mode) state_msg.brake_mode = bool(self.core_brake_mode)
state_msg.max_duty = float(self.core_max_duty) state_msg.max_duty = float(self.core_max_duty)
self.core_state_pub_.publish(state_msg) self.core_state_pub_.publish(state_msg)
self.get_logger().info(f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}") self.get_logger().info(
f"[Core State] Brake: {self.core_brake_mode}, Max Duty: {self.core_max_duty}"
)
# ARM and BIO # ARM and BIO
if self.ctrl_mode == "arm": if self.ctrl_mode == "arm":
@@ -222,7 +230,6 @@ class Headless(Node):
right_bumper = self.gamepad.get_button(5) right_bumper = self.gamepad.get_button(5)
dpad_input = self.gamepad.get_hat(0) dpad_input = self.gamepad.get_hat(0)
# EF Grippers # EF Grippers
if left_trigger > 0 and right_trigger > 0: if left_trigger > 0 and right_trigger > 0:
arm_input.gripper = 0 arm_input.gripper = 0
@@ -237,7 +244,6 @@ class Headless(Node):
elif dpad_input[0] == -1: elif dpad_input[0] == -1:
arm_input.axis0 = -1 arm_input.axis0 = -1
if right_bumper: # Control end effector if right_bumper: # Control end effector
# Effector yaw # Effector yaw
@@ -255,27 +261,28 @@ class Headless(Node):
else: # Control arm axis else: # Control arm axis
# Axis 1 # Axis 1
if abs(left_stick_x) > .15: if abs(left_stick_x) > 0.15:
arm_input.axis1 = round(left_stick_x) arm_input.axis1 = round(left_stick_x)
# Axis 2 # Axis 2
if abs(left_stick_y) > .15: if abs(left_stick_y) > 0.15:
arm_input.axis2 = -1 * round(left_stick_y) arm_input.axis2 = -1 * round(left_stick_y)
# Axis 3 # Axis 3
if abs(right_stick_y) > .15: if abs(right_stick_y) > 0.15:
arm_input.axis3 = -1 * round(right_stick_y) arm_input.axis3 = -1 * round(right_stick_y)
# BIO # BIO
bio_input = BioControl( bio_input = BioControl(
bio_arm=int(left_stick_y * -100), bio_arm=int(left_stick_y * -100),
drill_arm=int(round(right_stick_y) * -100) drill_arm=int(round(right_stick_y) * -100),
) )
# Drill motor (FAERIE) # Drill motor (FAERIE)
if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0: if deadzone(left_trigger) > 0 or deadzone(right_trigger) > 0:
bio_input.drill = int(30 * (right_trigger - left_trigger)) # Max duty cycle 30% bio_input.drill = int(
30 * (right_trigger - left_trigger)
) # Max duty cycle 30%
self.core_publisher.publish(CORE_STOP_MSG) self.core_publisher.publish(CORE_STOP_MSG)
self.arm_publisher.publish(arm_input) self.arm_publisher.publish(arm_input)
@@ -295,6 +302,9 @@ def main(args=None):
rclpy.spin(node) rclpy.spin(node)
rclpy.shutdown() rclpy.shutdown()
if __name__ == '__main__':
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0)) # Catch termination signals and exit cleanly if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main() main()