2 Commits

Author SHA1 Message Date
David
fd12e6c44a feat: (core) make compatible with ros2_control 2026-02-10 16:27:04 -06:00
David
1e8925e135 feat: add ros2 control option to main launch for core 2026-02-08 17:29:04 -06:00
11 changed files with 352 additions and 292 deletions

3
.gitmodules vendored
View File

@@ -1,3 +1,6 @@
[submodule "src/astra_description"] [submodule "src/astra_description"]
path = src/astra_descriptions path = src/astra_descriptions
url = ../astra_descriptions url = ../astra_descriptions
[submodule "src/astra_msgs"]
path = src/astra_msgs
url = git@github.com:SHC-ASTRA/astra_msgs.git

View File

@@ -14,8 +14,12 @@ echo "[INFO] Network interface is up!"
# Your actual ROS node start command goes here # Your actual ROS node start command goes here
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script (if we aren't using nix) # Source ROS 2 Humble setup script
command -v ros2 || source /opt/ros/humble/setup.bash if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,7 +15,11 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..." echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script # Source ROS 2 Humble setup script
command -v ros2 || source /opt/ros/humble/setup.bash if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source your workspace setup script # Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,7 +17,11 @@ done
echo "[INFO] Network interface is up!" echo "[INFO] Network interface is up!"
command -v ros2 || source /opt/ros/humble/setup.bash if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
source $ANCHOR_WS/install/setup.bash source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash [[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

96
flake.lock generated
View File

@@ -1,29 +1,5 @@
{ {
"nodes": { "nodes": {
"astra-msgs": {
"inputs": {
"nix-ros-overlay": "nix-ros-overlay",
"nixpkgs": [
"astra-msgs",
"nix-ros-overlay",
"nixpkgs"
]
},
"locked": {
"lastModified": 1770693861,
"narHash": "sha256-8NBtSsKVYtd+NYt51tJ1EteC0nOTemDUBoRXbDoQAuA=",
"owner": "SHC-ASTRA",
"repo": "astra_msgs",
"rev": "60bbb53085b09fbdb7e848b1dd168d526d9af281",
"type": "github"
},
"original": {
"owner": "SHC-ASTRA",
"ref": "60bbb53085b09fbdb7e848b1dd168d526d9af281",
"repo": "astra_msgs",
"type": "github"
}
},
"flake-utils": { "flake-utils": {
"inputs": { "inputs": {
"systems": "systems" "systems": "systems"
@@ -42,49 +18,11 @@
"type": "github" "type": "github"
} }
}, },
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nix-ros-overlay": { "nix-ros-overlay": {
"inputs": { "inputs": {
"flake-utils": "flake-utils", "flake-utils": "flake-utils",
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
}, },
"locked": {
"lastModified": 1770408708,
"narHash": "sha256-E7ZQRgGrsiuswgXnG7337ZR5s4SdZlheZjxKOQdVoH8=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "e78ba91032c7f8bdd823fbf43937cbf0f4f09747",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "develop",
"repo": "nix-ros-overlay",
"type": "github"
}
},
"nix-ros-overlay_2": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": "nixpkgs_2"
},
"locked": { "locked": {
"lastModified": 1770108954, "lastModified": 1770108954,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=", "narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=",
@@ -116,26 +54,9 @@
"type": "github" "type": "github"
} }
}, },
"nixpkgs_2": {
"locked": {
"lastModified": 1759381078,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "nix-ros",
"repo": "nixpkgs",
"type": "github"
}
},
"root": { "root": {
"inputs": { "inputs": {
"astra-msgs": "astra-msgs", "nix-ros-overlay": "nix-ros-overlay",
"nix-ros-overlay": "nix-ros-overlay_2",
"nixpkgs": [ "nixpkgs": [
"nix-ros-overlay", "nix-ros-overlay",
"nixpkgs" "nixpkgs"
@@ -156,21 +77,6 @@
"repo": "default", "repo": "default",
"type": "github" "type": "github"
} }
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
} }
}, },
"root": "root", "root": "root",

View File

@@ -4,8 +4,6 @@
inputs = { inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop"; nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!! nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
# specify astra_msgs commit hash to the one we support
astra-msgs.url = "github:SHC-ASTRA/astra_msgs?ref=60bbb53085b09fbdb7e848b1dd168d526d9af281";
}; };
outputs = outputs =
@@ -13,7 +11,6 @@
self, self,
nix-ros-overlay, nix-ros-overlay,
nixpkgs, nixpkgs,
astra-msgs,
}: }:
nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem ( nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem (
system: system:
@@ -27,9 +24,8 @@
devShells.default = pkgs.mkShell { devShells.default = pkgs.mkShell {
name = "ASTRA Anchor"; name = "ASTRA Anchor";
packages = with pkgs; [ packages = with pkgs; [
astra-msgs.packages.${system}.astra-msgs
colcon colcon
(python313.withPackages ( (python312.withPackages (
p: with p; [ p: with p; [
pyserial pyserial
pygame pygame
@@ -43,7 +39,6 @@
buildEnv { buildEnv {
paths = [ paths = [
ros-core ros-core
rqt-graph
ros2cli ros2cli
ros2run ros2run
ros2bag ros2bag

View File

@@ -1,13 +1,21 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction, Shutdown from launch.actions import (
DeclareLaunchArgument,
OpaqueFunction,
Shutdown,
IncludeLaunchDescription,
)
from launch.substitutions import ( from launch.substitutions import (
LaunchConfiguration, LaunchConfiguration,
ThisLaunchFileDir, ThisLaunchFileDir,
PathJoinSubstitution, PathJoinSubstitution,
) )
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.conditions import IfCondition
from launch_ros.actions import Node from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
# Prevent making __pycache__ directories # Prevent making __pycache__ directories
@@ -40,7 +48,10 @@ def launch_setup(context, *args, **kwargs):
executable="core", # change as needed executable="core", # change as needed
name="core", name="core",
output="both", output="both",
parameters=[{"launch_mode": mode}], parameters=[
{"launch_mode": mode},
{"use_ros2_control": LaunchConfiguration("use_ros2_control", default=False)},
],
on_exit=Shutdown(), on_exit=Shutdown(),
) )
) )
@@ -133,4 +144,46 @@ def generate_launch_description():
description="Launch mode: arm, core, bio, anchor, or ptz", description="Launch mode: arm, core, bio, anchor, or ptz",
) )
return LaunchDescription([declare_arg, OpaqueFunction(function=launch_setup)]) ros2_control_arg = DeclareLaunchArgument(
"use_ros2_control",
default_value="false",
description="Whether to use DiffDriveController for driving instead of direct Twist",
)
rsp = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"robot_state_publisher.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
controllers = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[
FindPackageShare("core_description"),
"launch",
"spawn_controllers.launch.py",
]
)
),
condition=IfCondition(LaunchConfiguration("use_ros2_control")),
launch_arguments={("hardware_mode", "physical")},
)
return LaunchDescription(
[
declare_arg,
ros2_control_arg,
rsp,
controllers,
OpaqueFunction(function=launch_setup),
]
)

1
src/astra_msgs Submodule

Submodule src/astra_msgs added at 2840bfef34

View File

@@ -1,203 +1,241 @@
import signal import rclpy
from rclpy.node import Node
import serial
import sys import sys
import threading import threading
import os
import glob
import time import time
import atexit
import rclpy import signal
from astra_msgs.action import BioVacuum from std_msgs.msg import String
from astra_msgs.msg import BioControl, BioDistributor, VicCAN from astra_msgs.msg import BioControl
from astra_msgs.srv import BioTestTube from astra_msgs.msg import BioFeedback
from rclpy.action import ActionServer
from rclpy.node import Node
from std_msgs.msg import Header, String
serial_pub = None serial_pub = None
thread = None thread = None
# used to verify the length of an incoming VicCAN feedback message
# key is VicCAN command_id, value is expected length of data list
viccan_citadel_msg_len_dict = {
# empty because not expecting any VicCAN from citadel atm
}
viccan_lance_msg_len_dict = {
# empty because not sure what VicCAN commands LANCE sends
}
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
# Initialize node # Initialize node
super().__init__("bio_node") super().__init__("bio_node")
# Anchor Topics # Get launch mode parameter
self.anchor_fromvic_sub_ = self.create_subscription( self.declare_parameter("launch_mode", "bio")
VicCAN, "/anchor/from_vic/bio", self.relay_fromvic, 20 self.launch_mode = self.get_parameter("launch_mode").value
) self.get_logger().info(f"bio launch_mode is: {self.launch_mode}")
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20 # Create publishers
self.debug_pub = self.create_publisher(String, "/bio/feedback/debug", 10)
self.feedback_pub = self.create_publisher(BioFeedback, "/bio/feedback", 10)
# Create subscribers
self.control_sub = self.create_subscription(
BioControl, "/bio/control", self.send_control, 10
) )
self.anchor_sub = self.create_subscription( # Create a publisher for telemetry
String, "/anchor/bio/feedback", self.anchor_feedback, 10 self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.distributor_sub = self.create_subscription( # Topics used in anchor mode
BioDistributor, if self.launch_mode == "anchor":
"/bio/control/distributor", self.anchor_sub = self.create_subscription(
self.distributor_callback, String, "/anchor/bio/feedback", self.anchor_feedback, 10
10, )
) self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Services self.bio_feedback = BioFeedback()
self.test_tube_service = self.create_service(
BioTestTube, "/bio/control/test_tube", self.test_tube_callback
)
# Actions # Search for ports IF in 'arm' (standalone) and not 'anchor' mode
self._action_server = ActionServer( if self.launch_mode == "bio":
self, BioVacuum, "/bio/actions/vacuum", self.execute_vacuum # Loop through all serial devices on the computer to check for the MCU
) self.port = None
for i in range(2):
try:
# connect and send a ping command
set_port = (
"/dev/ttyACM0" # MCU is controlled through GPIO pins on the PI
)
ser = serial.Serial(set_port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = set_port
self.get_logger().info(f"Found MCU at {set_port}!")
break
except:
pass
if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
)
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
# if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
time.sleep(0.1) if self.launch_mode == "bio":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def send_ik(self, msg):
pass
def send_control(self, msg: BioControl): def send_control(self, msg: BioControl):
print("todo") # CITADEL Control Commands
################
# Chem Pumps, only send if not zero
if msg.pump_id != 0:
command = (
"can_relay_tovic,citadel,27,"
+ str(msg.pump_id)
+ ","
+ str(msg.pump_amount)
+ "\n"
)
self.send_cmd(command)
# Fans, only send if not zero
if msg.fan_id != 0:
command = (
"can_relay_tovic,citadel,40,"
+ str(msg.fan_id)
+ ","
+ str(msg.fan_duration)
+ "\n"
)
self.send_cmd(command)
# Servos, only send if not zero
if msg.servo_id != 0:
command = (
"can_relay_tovic,citadel,25,"
+ str(msg.servo_id)
+ ","
+ str(int(msg.servo_state))
+ "\n"
)
self.send_cmd(command)
# LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
# self.send_cmd(command)
# Vibration Motor
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
# self.send_cmd(command)
# FAERIE Control Commands
################
# To be reviewed before use#
# Laser
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
# self.send_cmd(command)
# Drill (SCABBARD)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
# self.send_cmd(command)
# Bio linear actuator
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command)
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
# send to anchor node to relay if (
output = String() self.launch_mode == "anchor"
output.data = msg ): # if in anchor mode, send to anchor node to relay
self.anchor_pub.publish(output) output = String()
output.data = msg
def relay_fromvic(self, msg: VicCAN): self.anchor_pub.publish(output)
# self.get_logger().info(msg) elif self.launch_mode == "bio": # if in standalone mode, send to MCU directly
if msg.mcu_name == "citadel": self.get_logger().info(f"[Bio to MCU] {msg}")
self.process_fromvic_citadel(msg) self.ser.write(bytes(msg, "utf8"))
def process_fromvic_citadel(self, msg: VicCAN):
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_citadel_msg_len_dict:
expected_len = viccan_citadel_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
self.get_logger().info(f"[Bio Anchor] {msg.data}") # self.get_logger().info(f"[Bio Anchor] {msg.data}")
# no data planned to be received from citadel, not sure about lance
def distributor_callback(self, msg: BioDistributor): if output.startswith(
distributor_arr = msg.distributor_id "can_relay_fromvic,citadel,54"
vic_cmd = VicCAN( ): # bat, 12, 5, Voltage readings * 100
header=Header(stamp=self.get_clock().now().to_msg()), self.bio_feedback.bat_voltage = float(parts[3]) / 100.0
mcu_name="citadel", self.bio_feedback.voltage_12 = float(parts[4]) / 100.0
command_id=40, self.bio_feedback.voltage_5 = float(parts[5]) / 100.0
data=[ elif output.startswith("can_relay_fromvic,digit,57"):
clamp_short(distributor_arr[0]), self.bio_feedback.drill_temp = float(parts[3])
clamp_short(distributor_arr[1]), self.bio_feedback.drill_humidity = float(parts[4])
clamp_short(distributor_arr[2]),
0,
],
)
self.anchor_tovic_pub_.publish(vic_cmd)
def test_tube_callback(self, request, response): def publish_feedback(self):
tubes_cmd = VicCAN( self.feedback_pub.publish(self.bio_feedback)
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[
float(int.from_bytes(request.tube_id)),
float(request.milliliters),
],
)
self.anchor_tovic_pub_.publish(tubes_cmd)
return response
def execute_vacuum(self, goal_handle): @staticmethod
valve_id = int.from_bytes(goal_handle.request.valve_id) def list_serial_ports():
duty = int.from_bytes(goal_handle.request.fan_duty_cycle_percent) return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
total = goal_handle.request.fan_time_ms # return glob.glob("/dev/tty[A-Za-z]*")
# open valve def cleanup(self):
self.anchor_tovic_pub_.publish( print("Cleaning up...")
VicCAN( try:
header=Header(stamp=self.get_clock().now().to_msg()), if self.ser.is_open:
mcu_name="citadel", self.ser.close()
command_id=40, except Exception as e:
data=[float(valve_id)], exit(0)
)
)
# set fan duty cycle
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[float(duty)],
)
)
feedback = BioVacuum.Feedback()
start = time.time()
while True:
elapsed = int((time.time() - start) * 1000)
remaining = max(0, total - elapsed)
feedback.fan_time_remaining_ms = remaining
goal_handle.publish_feedback(feedback)
if remaining == 0:
break
time.sleep(0.1)
# stop fan
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[0.0],
)
)
# close valve
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[-1.0],
)
)
goal_handle.succeed()
return BioVacuum.Result()
def clamp_short(x: int) -> int:
return max(-32768, min(32767, x))
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):

View File

@@ -66,6 +66,10 @@ class SerialRelay(Node):
self.launch_mode = self.get_parameter("launch_mode").value self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"Core launch_mode is: {self.launch_mode}") self.get_logger().info(f"Core launch_mode is: {self.launch_mode}")
self.declare_parameter("use_ros2_control", False)
self.use_ros2_control = self.get_parameter("use_ros2_control").value
self.get_logger().info(f"Use ros2_control: {self.use_ros2_control}")
################################################## ##################################################
# Topics # Topics
@@ -85,24 +89,28 @@ class SerialRelay(Node):
# Control # Control
# autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2 if self.use_ros2_control:
self.cmd_vel_sub_ = self.create_subscription( # Joint state control for topic-based controller
TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1 self.joint_command_sub_ = self.create_subscription(
) JointState, "/core/joint_commands", self.joint_command_callback, 2
# manual twist -- [-1, 1] rather than real units )
self.twist_man_sub_ = self.create_subscription( else:
Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos # autonomy twist -- m/s and rad/s -- for autonomy, in particular Nav2
) self.cmd_vel_sub_ = self.create_subscription(
# manual flags -- brake mode and max duty cycle TwistStamped, "/cmd_vel", self.cmd_vel_callback, 1
self.control_state_sub_ = self.create_subscription( )
CoreCtrlState, # manual twist -- [-1, 1] rather than real units
"/core/control/state", self.twist_man_sub_ = self.create_subscription(
self.control_state_callback, Twist, "/core/twist", self.twist_man_callback, qos_profile=control_qos
qos_profile=control_qos, )
) # manual flags -- brake mode and max duty cycle
self.twist_max_duty = ( self.control_state_sub_ = self.create_subscription(
0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5 CoreCtrlState,
) "/core/control/state",
self.control_state_callback,
qos_profile=control_qos,
)
self.twist_max_duty = 0.5 # max duty cycle for twist commands (0.0 - 1.0); walking speed is 0.5
# Feedback # Feedback
@@ -122,7 +130,7 @@ class SerialRelay(Node):
) # TODO: not sure about this ) # TODO: not sure about this
# Joint states for topic-based controller # Joint states for topic-based controller
self.joint_state_pub_ = self.create_publisher( self.joint_state_pub_ = self.create_publisher(
JointState, "/core/joint_states", qos_profile=qos.qos_profile_sensor_data JointState, "/joint_states", qos_profile=qos.qos_profile_sensor_data
) )
# IMU (embedded BNO-055) # IMU (embedded BNO-055)
self.imu_pub_ = self.create_publisher( self.imu_pub_ = self.create_publisher(
@@ -148,10 +156,11 @@ class SerialRelay(Node):
# Old # Old
# /core/control if not self.use_ros2_control:
self.control_sub = self.create_subscription( # /core/control
CoreControl, "/core/control", self.send_controls, 10 self.control_sub = self.create_subscription(
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff CoreControl, "/core/control", self.send_controls, 10
) # old control method -- left_stick, right_stick, max_speed, brake, and some other random autonomy stuff
# /core/feedback # /core/feedback
self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10) self.feedback_pub = self.create_publisher(CoreFeedback, "/core/feedback", 10)
self.core_feedback = CoreFeedback() self.core_feedback = CoreFeedback()
@@ -283,6 +292,40 @@ class SerialRelay(Node):
# print(f"[Sys] Relaying: {command}") # print(f"[Sys] Relaying: {command}")
def joint_command_callback(self, msg: JointState):
# So... topic based control node publishes JointState messages over /joint_commands
# with len(msg.name) == 5 and len(msg.velocity) == 4... all 5 non-fixed joints
# are included in msg.name, but ig it is implied that msg.velocity only
# includes velocities for the commanded joints (ros__parameters.joints).
# So, this will be much more hacky and less adaptable than I would like it to be.
if len(msg.name) != 5 or len(msg.velocity) != 4 or len(msg.position) != 0:
self.get_logger().warning(
f"Received joint control message with unexpected number of joints. Ignoring."
)
return
if msg.name != [
"left_suspension_joint",
"bl_wheel_joint",
"br_wheel_joint",
"fl_wheel_joint",
"fr_wheel_joint",
]:
self.get_logger().warning(
f"Received joint control message with unexpected name[]. Ignoring."
)
return
(bl_vel, br_vel, fl_vel, fr_vel) = msg.velocity
bl_rpm = radps_to_rpm(bl_vel) * CORE_GEAR_RATIO
br_rpm = radps_to_rpm(br_vel) * CORE_GEAR_RATIO
fl_rpm = radps_to_rpm(fl_vel) * CORE_GEAR_RATIO
fr_rpm = radps_to_rpm(fr_vel) * CORE_GEAR_RATIO
self.send_viccan(
20, [fl_rpm, bl_rpm, fr_rpm, br_rpm]
) # order expected by embedded
def cmd_vel_callback(self, msg: TwistStamped): def cmd_vel_callback(self, msg: TwistStamped):
linear = msg.twist.linear.x linear = msg.twist.linear.x
angular = -msg.twist.angular.z angular = -msg.twist.angular.z
@@ -528,22 +571,27 @@ class SerialRelay(Node):
match motorId: match motorId:
case 1: case 1:
motor = self.feedback_new_state.fl_motor motor = self.feedback_new_state.fl_motor
joint_state_msg.name = ["fl_motor_joint"] joint_state_msg.name = ["fl_wheel_joint"]
case 2: case 2:
motor = self.feedback_new_state.bl_motor motor = self.feedback_new_state.bl_motor
joint_state_msg.name = ["bl_motor_joint"] joint_state_msg.name = ["bl_wheel_joint"]
case 3: case 3:
motor = self.feedback_new_state.fr_motor motor = self.feedback_new_state.fr_motor
joint_state_msg.name = ["fr_motor_joint"] joint_state_msg.name = ["fr_wheel_joint"]
case 4: case 4:
motor = self.feedback_new_state.br_motor motor = self.feedback_new_state.br_motor
joint_state_msg.name = ["br_motor_joint"] joint_state_msg.name = ["br_wheel_joint"]
case _: case _:
self.get_logger().warning( self.get_logger().warning(
f"Ignoring REV motor feedback 58 with invalid motorId {motorId}" f"Ignoring REV motor feedback 58 with invalid motorId {motorId}"
) )
return return
# make the fucking shit work
joint_state_msg.name.append("left_suspension_joint")
joint_state_msg.position.append(0.0)
joint_state_msg.velocity.append(0.0)
joint_state_msg.header.stamp = msg.header.stamp joint_state_msg.header.stamp = msg.header.stamp
self.joint_state_pub_.publish(joint_state_msg) self.joint_state_pub_.publish(joint_state_msg)
case _: case _:
@@ -581,6 +629,10 @@ def map_range(
return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min return (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
def radps_to_rpm(radps: float):
return radps * 60 / (2 * pi)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
sys.excepthook = myexcepthook sys.excepthook = myexcepthook