15 Commits

Author SHA1 Message Date
SHC-ASTRA
7e6a53b7f1 fixed astra_msgs nixpkg being used incorrectly 2026-02-09 21:31:35 -06:00
SHC-ASTRA
9510270272 Merge branch 'main' into bio-topic-refactor-rebase 2026-02-09 21:17:38 -06:00
Riley M.
3f68052144 Merge pull request #30 from SHC-ASTRA/update-python
-> python 3.13
2026-02-09 21:49:08 -05:00
ryleu
c72f72dc32 -> python 3.13 2026-02-08 17:23:53 -06:00
iggy
d4042f8744 clean up 2026-02-07 19:50:51 -06:00
Riley M.
61f5f1fc3e Merge pull request #29 from SHC-ASTRA/qos-disable
Qos disable
2026-02-07 18:20:04 -06:00
SHC-ASTRA
d46bb81ed1 Merge branch 'main' into bio-topic-refactor-rebase 2026-02-05 21:34:51 -06:00
SHC-ASTRA
87a764e616 change astra_msgs to latest main hash 2026-02-05 21:07:56 -06:00
iggy
f99b9e6f96 created bio node, added rqt-graph to flake 2026-02-05 02:12:21 -06:00
David Sharpe
65aab7f179 Fix nix cache (#27, fix-cache)
Fix cache
2026-02-04 02:34:16 -06:00
ryleu
697efa7b9d add missing packages for moveit 2026-02-04 00:31:36 -05:00
ryleu
b70a0d27c3 uncomment ros2_controllers 2026-02-04 00:04:07 -05:00
ryleu
2d48361b8f update to develop branch of nix-ros-overlay 2026-02-03 23:32:42 -05:00
David
d9355f16e9 fix: EF gripper works again ._. 2026-01-31 18:32:39 -06:00
David
9fc120b09e fix: make QoS compatible with basestation-game 2026-01-31 17:21:52 -06:00
15 changed files with 450 additions and 354 deletions

3
.gitmodules vendored
View File

@@ -1,6 +1,3 @@
[submodule "src/astra_description"]
path = src/astra_descriptions
url = ../astra_descriptions
[submodule "src/astra_msgs"]
path = src/astra_msgs
url = git@github.com:SHC-ASTRA/astra_msgs.git

View File

@@ -65,10 +65,10 @@ You can fake the presence of a Serial device (i.e., MCU) by using the following
$ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT
```
When you go to run anchor, use the `port_override` launch parameter to point it to the fake serial port, like so:
When you go to run anchor, use the `PORT_OVERRIDE` environment variable to point it to the fake serial port, like so:
```bash
$ ros2 launch anchor_pkg rover.launch.py port_override:=/tmp/ttyACM9
$ PORT_OVERRIDE=/tmp/ttyACM9 ros2 launch anchor_pkg rover.launch.py
```
### Connecting the GuliKit Controller
@@ -121,24 +121,23 @@ A: Don't worry about it. If you had the workspace sourced, ROS2 will complain ab
...
```
A: To find a microcontroller to talk to, Anchor filters through your available serial ports to find microcontrollers. If the microcontroller fails
to respond properly, or one is not found, it will abort. There are a few possible fixes:
A: To find a microcontroller to talk to, Anchor sends a ping to every Serial port on your computer. If it does not receive a 'pong' in less than one second, then it aborts. There are a few possible fixes:
- Keep trying to run it until it works
- Run `lsusb` to see if the microcontroller is detected by your computer.
- Run `ls /dev/tty*0` to see if there is a valid Serial port enumerated for the microcontroller.
- Check if you are in the `dialout` group (or whatever group shows up by running `ls -l /dev/tty*`) by using the `groups` command.
- Check if you are in the `dialout` group (or whatever group shows up by running `ls -l /dev/tty*`).
## Packages
- [anchor\_pkg](./src/anchor_pkg) - Handles Serial communication between the various other packages here and the microcontroller.
- [arm\_pkg](./src/arm_pkg) - Relays controls and sensor data for the arm (socket and digit) between anchor and basestation/headless.
- [astra\_descriptions](./src/astra_descriptions) - Submodule with URDF-related packages.
- [astra\_msgs](./src/astra_msgs) - Contains custom message types for communication between basestation and the rover over ROS2.
- [bio\_pkg](./src/bio_pkg) - Like arm_pkg, but for CITADEL and FAERIE
- [core\_pkg](./src/core_pkg) - Like arm_pkg, but for Core
- [headless\_pkg](./src/headless_pkg) - Simple, non-graphical controller node to work in place of basestation when controlling the rover by itself. This is autostarted with anchor to allow for setup-less control of the rover.
- [latency\_tester](./src/latency_tester) - A temporary node to test comms latency over ROS2, Serial, and CAN.
- [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`).
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
## Maintainers

View File

@@ -14,12 +14,8 @@ echo "[INFO] Network interface is up!"
# Your actual ROS node start command goes here
echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
# Source ROS 2 Humble setup script (if we aren't using nix)
command -v ros2 || source /opt/ros/humble/setup.bash
# Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash

View File

@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
echo "[INFO] Starting ROS node..."
# Source ROS 2 Humble setup script
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
command -v ros2 || source /opt/ros/humble/setup.bash
# Source your workspace setup script
source $SCRIPT_DIR/../install/setup.bash

View File

@@ -17,11 +17,7 @@ done
echo "[INFO] Network interface is up!"
if command -v nixos-rebuild; then
echo "[INFO] running on NixOS"
else
source /opt/ros/humble/setup.bash
fi
command -v ros2 || source /opt/ros/humble/setup.bash
source $ANCHOR_WS/install/setup.bash
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash

112
flake.lock generated
View File

@@ -1,5 +1,29 @@
{
"nodes": {
"astra-msgs": {
"inputs": {
"nix-ros-overlay": "nix-ros-overlay",
"nixpkgs": [
"astra-msgs",
"nix-ros-overlay",
"nixpkgs"
]
},
"locked": {
"lastModified": 1770693861,
"narHash": "sha256-8NBtSsKVYtd+NYt51tJ1EteC0nOTemDUBoRXbDoQAuA=",
"owner": "SHC-ASTRA",
"repo": "astra_msgs",
"rev": "60bbb53085b09fbdb7e848b1dd168d526d9af281",
"type": "github"
},
"original": {
"owner": "SHC-ASTRA",
"ref": "60bbb53085b09fbdb7e848b1dd168d526d9af281",
"repo": "astra_msgs",
"type": "github"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
@@ -18,33 +42,87 @@
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nix-ros-overlay": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1761810010,
"narHash": "sha256-o0wJKW603SiOO373BTgeZaF6nDxegMA/cRrzSC2Cscg=",
"lastModified": 1770408708,
"narHash": "sha256-E7ZQRgGrsiuswgXnG7337ZR5s4SdZlheZjxKOQdVoH8=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "e277df39e3bc6b372a5138c0bcf10198857c55ab",
"rev": "e78ba91032c7f8bdd823fbf43937cbf0f4f09747",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "master",
"ref": "develop",
"repo": "nix-ros-overlay",
"type": "github"
}
},
"nix-ros-overlay_2": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1770108954,
"narHash": "sha256-VBj6bd4LPPSfsZJPa/UPPA92dOs6tmQo0XZKqfz/3W4=",
"owner": "lopsided98",
"repo": "nix-ros-overlay",
"rev": "3d05d46451b376e128a1553e78b8870c75d7753a",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "develop",
"repo": "nix-ros-overlay",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1744849697,
"narHash": "sha256-S9hqvanPSeRu6R4cw0OhvH1rJ+4/s9xIban9C4ocM/0=",
"owner": "lopsided98",
"lastModified": 1759381078,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "6318f538166fef9f5118d8d78b9b43a04bb049e4",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"type": "github"
},
"original": {
"owner": "lopsided98",
"ref": "nix-ros",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1759381078,
"narHash": "sha256-gTrEEp5gEspIcCOx9PD8kMaF1iEmfBcTbO0Jag2QhQs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "7df7ff7d8e00218376575f0acdcc5d66741351ee",
"type": "github"
},
"original": {
@@ -56,7 +134,8 @@
},
"root": {
"inputs": {
"nix-ros-overlay": "nix-ros-overlay",
"astra-msgs": "astra-msgs",
"nix-ros-overlay": "nix-ros-overlay_2",
"nixpkgs": [
"nix-ros-overlay",
"nixpkgs"
@@ -77,6 +156,21 @@
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",

View File

@@ -2,8 +2,10 @@
description = "Development environment for ASTRA Anchor";
inputs = {
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/master";
nix-ros-overlay.url = "github:lopsided98/nix-ros-overlay/develop";
nixpkgs.follows = "nix-ros-overlay/nixpkgs"; # IMPORTANT!!!
# specify astra_msgs commit hash to the one we support
astra-msgs.url = "github:SHC-ASTRA/astra_msgs?ref=60bbb53085b09fbdb7e848b1dd168d526d9af281";
};
outputs =
@@ -11,6 +13,7 @@
self,
nix-ros-overlay,
nixpkgs,
astra-msgs,
}:
nix-ros-overlay.inputs.flake-utils.lib.eachDefaultSystem (
system:
@@ -24,8 +27,9 @@
devShells.default = pkgs.mkShell {
name = "ASTRA Anchor";
packages = with pkgs; [
astra-msgs.packages.${system}.astra-msgs
colcon
(python312.withPackages (
(python313.withPackages (
p: with p; [
pyserial
pygame
@@ -39,6 +43,7 @@
buildEnv {
paths = [
ros-core
rqt-graph
ros2cli
ros2run
ros2bag
@@ -56,10 +61,12 @@
control-msgs
control-toolbox
moveit-core
moveit-planners
moveit-common
moveit-msgs
moveit-ros-planning
moveit-ros-planning-interface
moveit-ros-visualization
moveit-configs-utils
moveit-ros-move-group
moveit-servo
@@ -68,9 +75,9 @@
pilz-industrial-motion-planner
pick-ik
ompl
chomp-motion-planner
joy
# ros2-controllers nixpkg does not build :(
ros2-controllers
chomp-motion-planner
];
}
)

View File

@@ -1,17 +1,20 @@
import atexit
import glob
import signal
import threading
from typing import cast
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_srvs.srv import Empty
import signal
import time
import atexit
import serial
import serial.tools.list_ports
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from std_msgs.msg import Header, String
import os
import sys
import threading
import glob
from std_msgs.msg import String, Header
from astra_msgs.msg import VicCAN
KNOWN_USBS = [
@@ -43,23 +46,15 @@ class Anchor(Node):
- Publish raw strings to this topic to send directly to the MCU for debugging
"""
serial_port: str | None = None # e.g., "/dev/ttyUSB0"
def __init__(self):
# Initalize node with name
super().__init__("anchor_node")
super().__init__("anchor_node") # previously 'serial_publisher'
self.declare_parameter(
"port_override",
"",
ParameterDescriptor(
name="port_override", type=ParameterType.PARAMETER_STRING
),
)
self.serial_port: str | None = None # e.g., "/dev/ttyUSB0"
# Serial port override
if port_override := self.get_parameter("port_override").value:
self.serial_port = cast(str, port_override) # Cast to make the linter happy
if port_override := os.getenv("PORT_OVERRIDE"):
self.serial_port = port_override
##################################################
# Serial MCU Discovery
@@ -67,64 +62,104 @@ class Anchor(Node):
# If there was not a port override, look for a MCU over USB for Serial.
if self.serial_port is None:
comports = serial.tools.list_ports.comports()
recog_ports = list(
real_ports = list(
filter(
# Filter for ports we know that aren't invalid
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
lambda p: p.vid is not None
and p.pid is not None
and p.device is not None,
comports,
)
)
recog_ports = list(filter(lambda p: (p.vid, p.pid) in KNOWN_USBS, comports))
# Guards
if len(recog_ports) > 1: # If we found too many
self.get_logger().fatal(
if len(recog_ports) == 1: # Found singular recognized MCU
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
self.serial_port = found_port.device # String, location of device file; e.g., '/dev/ttyACM0'
elif len(recog_ports) > 1: # Found multiple recognized MCUs
# Kinda jank log message
self.get_logger().error(
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
)
exit(1)
if len(recog_ports) == 0:
self.get_logger().fatal(
f"Found no recognized MCUs: {[p.device for p in recog_ports].__str__()}"
# Don't set self.serial_port; later if-statement will exit()
elif (
len(recog_ports) == 0 and len(real_ports) > 0
): # Found real ports but none recognized; i.e. maybe found an IMU or camera but not a MCU
self.get_logger().error(
f"No recognized MCUs found; instead found {[p.device for p in real_ports].__str__()}."
)
exit(1)
# Don't set self.serial_port; later if-statement will exit()
else: # Found jack shit
self.get_logger().error("No valid Serial ports specified or found.")
# Don't set self.serial_port; later if-statement will exit()
# Everything is ok
found_port = recog_ports[0]
self.get_logger().info(
f"Selecting MCU '{found_port.description}' at {found_port.device}."
)
# String, location of device file; e.g., '/dev/ttyACM0'
self.serial_port = found_port.device
# Found a Serial port, try to open it; above code has not officially opened a Serial port
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
try:
self.serial_interface = serial.Serial(self.serial_port, 115200, timeout=1)
# Attempt to get name of connected MCU
self.serial_interface.write(b"can_relay_mode,on\n") # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4): # Sometimes it takes a sec
response = self.serial_interface.read_until(bytes("\n", "utf8"))
if b"can_relay_ready" in response:
try:
args: list[str] = response.decode("utf8").strip().split(",")
except UnicodeDecodeError:
continue # ignore malformed responses
if len(args) == 2:
mcu_name = args[1]
# We still don't have a serial port; fall back to legacy discovery (Areeb's code)
# Loop through all serial devices on the computer to check for the MCU
if self.serial_port is None:
self.get_logger().warning("Falling back to legacy MCU discovery...")
ports = Anchor.list_serial_ports()
for _ in range(4):
if self.serial_port is not None:
break
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
for port in ports:
try:
# connect and send a ping command
ser = serial.Serial(port, 115200, timeout=1)
# (f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until(bytes("\n", "utf8"))
except serial.SerialException as e:
self.get_logger().fatal(
f"Could not open Serial port '{self.serial_port}' for reason:"
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.serial_port = port
self.get_logger().info(f"Found MCU at {self.serial_port}!")
break
except:
pass
# If port is still None then we ain't finding no mcu
if self.serial_port is None:
self.get_logger().error("Unable to find MCU. Exiting...")
time.sleep(1)
sys.exit(1)
# Found a Serial port, try to open it; above code has not officially opened a Serial port
else:
self.get_logger().debug(
f"Attempting to open Serial port '{self.serial_port}'..."
)
self.get_logger().fatal(e.strerror)
exit(1)
try:
self.serial_interface = serial.Serial(
self.serial_port, 115200, timeout=1
)
# Attempt to get name of connected MCU
self.serial_interface.write(
b"can_relay_mode,on\n"
) # can_relay_ready,[mcu]
mcu_name: str = ""
for _ in range(4):
response = self.serial_interface.read_until(bytes("\n", "utf8"))
try:
if b"can_relay_ready" in response:
args: list[str] = response.decode("utf8").strip().split(",")
if len(args) == 2:
mcu_name = args[1]
break
except UnicodeDecodeError:
pass # ignore malformed responses
self.get_logger().info(
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
)
except serial.SerialException as e:
self.get_logger().error(
f"Could not open Serial port '{self.serial_port}' for reason:"
)
self.get_logger().error(e.strerror)
time.sleep(1)
sys.exit(1)
# Close serial port on exit
atexit.register(self.cleanup)
@@ -170,38 +205,54 @@ class Anchor(Node):
def read_MCU(self):
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
output: str | None = None
try:
output = str(self.serial_interface.readline(), "utf8")
if output:
self.relay_fromvic(output)
# All output over debug temporarily
# self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
# msg = String()
# msg.data = output
# self.debug_pub.publish(msg)
return
except serial.SerialException as e:
self.get_logger().fatal(f"SerialException: {e}")
print(f"SerialException: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except TypeError as e:
self.get_logger().fatal(f"TypeError: {e}")
print(f"TypeError: {e}")
print("Closing serial port.")
try:
if self.serial_interface.is_open:
self.serial_interface.close()
except:
pass
exit(1)
except Exception as e:
self.get_logger().error(f"Exception: {e}")
if output:
self.relay_fromvic(output)
msg = String()
msg.data = output
# All output over debug
self.debug_pub.publish(msg)
# Send the message to the right place
if output.startswith("can_relay_fromvic,core"):
self.core_pub.publish(msg)
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
"can_relay_fromvic,digit"
): # digit for voltage readings
self.arm_pub.publish(msg)
if output.startswith("can_relay_fromvic,citadel") or output.startswith(
"can_relay_fromvic,digit"
): # digit for SHT sensor
self.bio_pub.publish(msg)
return
print(f"Exception: {e}")
# print("Closing serial port.")
# if self.ser.is_open:
# self.ser.close()
# exit(1)
def on_mock_fromvic(self, msg: String):
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
@@ -309,6 +360,6 @@ def main(args=None):
if __name__ == "__main__":
signal.signal(
signal.SIGTERM, lambda signum, frame: exit(0)
signal.SIGTERM, lambda signum, frame: sys.exit(0)
) # Catch termination signals and exit cleanly
main()

View File

@@ -49,7 +49,7 @@ class SerialRelay(Node):
# Create subscribers
self.man_sub = self.create_subscription(
ArmManual, "/arm/control/manual", self.send_manual, 10
ArmManual, "/arm/control/manual", self.send_manual, 2
)
# New messages
@@ -190,7 +190,7 @@ class SerialRelay(Node):
command += f"can_relay_tovic,digit,39,{msg.effector_yaw},{msg.effector_roll}\n"
# command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,26,{msg.gripper}\n" # no hardware rn
command += f"can_relay_tovic,digit,28,{msg.laser}\n"

Submodule src/astra_msgs deleted from 6a57072723

View File

@@ -1,241 +1,203 @@
import rclpy
from rclpy.node import Node
import serial
import signal
import sys
import threading
import os
import glob
import time
import atexit
import signal
from std_msgs.msg import String
from astra_msgs.msg import BioControl
from astra_msgs.msg import BioFeedback
import rclpy
from astra_msgs.action import BioVacuum
from astra_msgs.msg import BioControl, BioDistributor, VicCAN
from astra_msgs.srv import BioTestTube
from rclpy.action import ActionServer
from rclpy.node import Node
from std_msgs.msg import Header, String
serial_pub = None
thread = None
# used to verify the length of an incoming VicCAN feedback message
# key is VicCAN command_id, value is expected length of data list
viccan_citadel_msg_len_dict = {
# empty because not expecting any VicCAN from citadel atm
}
viccan_lance_msg_len_dict = {
# empty because not sure what VicCAN commands LANCE sends
}
class SerialRelay(Node):
def __init__(self):
# Initialize node
super().__init__("bio_node")
# Get launch mode parameter
self.declare_parameter("launch_mode", "bio")
self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"bio launch_mode is: {self.launch_mode}")
# Create publishers
self.debug_pub = self.create_publisher(String, "/bio/feedback/debug", 10)
self.feedback_pub = self.create_publisher(BioFeedback, "/bio/feedback", 10)
# Create subscribers
self.control_sub = self.create_subscription(
BioControl, "/bio/control", self.send_control, 10
# Anchor Topics
self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/bio", self.relay_fromvic, 20
)
self.anchor_tovic_pub_ = self.create_publisher(
VicCAN, "/anchor/to_vic/relay", 20
)
# Create a publisher for telemetry
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
self.anchor_sub = self.create_subscription(
String, "/anchor/bio/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
# Topics used in anchor mode
if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription(
String, "/anchor/bio/feedback", self.anchor_feedback, 10
)
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.distributor_sub = self.create_subscription(
BioDistributor,
"/bio/control/distributor",
self.distributor_callback,
10,
)
self.bio_feedback = BioFeedback()
# Services
self.test_tube_service = self.create_service(
BioTestTube, "/bio/control/test_tube", self.test_tube_callback
)
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode
if self.launch_mode == "bio":
# Loop through all serial devices on the computer to check for the MCU
self.port = None
for i in range(2):
try:
# connect and send a ping command
set_port = (
"/dev/ttyACM0" # MCU is controlled through GPIO pins on the PI
)
ser = serial.Serial(set_port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU
if b"pong" in response:
self.port = set_port
self.get_logger().info(f"Found MCU at {set_port}!")
break
except:
pass
if self.port is None:
self.get_logger().info(
"Unable to find MCU... please make sure it is connected."
)
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200)
atexit.register(self.cleanup)
# Actions
self._action_server = ActionServer(
self, BioVacuum, "/bio/actions/vacuum", self.execute_vacuum
)
def run(self):
global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start()
# if in arm mode, will need to read from the MCU
try:
while rclpy.ok():
if self.launch_mode == "bio":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1)
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def send_ik(self, msg):
pass
def send_control(self, msg: BioControl):
# CITADEL Control Commands
################
# Chem Pumps, only send if not zero
if msg.pump_id != 0:
command = (
"can_relay_tovic,citadel,27,"
+ str(msg.pump_id)
+ ","
+ str(msg.pump_amount)
+ "\n"
)
self.send_cmd(command)
# Fans, only send if not zero
if msg.fan_id != 0:
command = (
"can_relay_tovic,citadel,40,"
+ str(msg.fan_id)
+ ","
+ str(msg.fan_duration)
+ "\n"
)
self.send_cmd(command)
# Servos, only send if not zero
if msg.servo_id != 0:
command = (
"can_relay_tovic,citadel,25,"
+ str(msg.servo_id)
+ ","
+ str(int(msg.servo_state))
+ "\n"
)
self.send_cmd(command)
# LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
# self.send_cmd(command)
# Vibration Motor
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
# self.send_cmd(command)
# FAERIE Control Commands
################
# To be reviewed before use#
# Laser
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
# self.send_cmd(command)
# Drill (SCABBARD)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
# self.send_cmd(command)
# Bio linear actuator
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command)
print("todo")
def send_cmd(self, msg: str):
if (
self.launch_mode == "anchor"
): # if in anchor mode, send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output)
elif self.launch_mode == "bio": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Bio to MCU] {msg}")
self.ser.write(bytes(msg, "utf8"))
# send to anchor node to relay
output = String()
output.data = msg
self.anchor_pub.publish(output)
def relay_fromvic(self, msg: VicCAN):
# self.get_logger().info(msg)
if msg.mcu_name == "citadel":
self.process_fromvic_citadel(msg)
def process_fromvic_citadel(self, msg: VicCAN):
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_citadel_msg_len_dict:
expected_len = viccan_citadel_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
def anchor_feedback(self, msg: String):
output = msg.data
parts = str(output.strip()).split(",")
# self.get_logger().info(f"[Bio Anchor] {msg.data}")
self.get_logger().info(f"[Bio Anchor] {msg.data}")
# no data planned to be received from citadel, not sure about lance
if output.startswith(
"can_relay_fromvic,citadel,54"
): # bat, 12, 5, Voltage readings * 100
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0
self.bio_feedback.voltage_12 = float(parts[4]) / 100.0
self.bio_feedback.voltage_5 = float(parts[5]) / 100.0
elif output.startswith("can_relay_fromvic,digit,57"):
self.bio_feedback.drill_temp = float(parts[3])
self.bio_feedback.drill_humidity = float(parts[4])
def distributor_callback(self, msg: BioDistributor):
distributor_arr = msg.distributor_id
vic_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[
clamp_short(distributor_arr[0]),
clamp_short(distributor_arr[1]),
clamp_short(distributor_arr[2]),
0,
],
)
self.anchor_tovic_pub_.publish(vic_cmd)
def publish_feedback(self):
self.feedback_pub.publish(self.bio_feedback)
def test_tube_callback(self, request, response):
tubes_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[
float(int.from_bytes(request.tube_id)),
float(request.milliliters),
],
)
self.anchor_tovic_pub_.publish(tubes_cmd)
return response
@staticmethod
def list_serial_ports():
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
# return glob.glob("/dev/tty[A-Za-z]*")
def execute_vacuum(self, goal_handle):
valve_id = int.from_bytes(goal_handle.request.valve_id)
duty = int.from_bytes(goal_handle.request.fan_duty_cycle_percent)
total = goal_handle.request.fan_time_ms
def cleanup(self):
print("Cleaning up...")
try:
if self.ser.is_open:
self.ser.close()
except Exception as e:
exit(0)
# open valve
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[float(valve_id)],
)
)
# set fan duty cycle
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[float(duty)],
)
)
feedback = BioVacuum.Feedback()
start = time.time()
while True:
elapsed = int((time.time() - start) * 1000)
remaining = max(0, total - elapsed)
feedback.fan_time_remaining_ms = remaining
goal_handle.publish_feedback(feedback)
if remaining == 0:
break
time.sleep(0.1)
# stop fan
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[0.0],
)
)
# close valve
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[-1.0],
)
)
goal_handle.succeed()
return BioVacuum.Result()
def clamp_short(x: int) -> int:
return max(-32768, min(32767, x))
def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None):

View File

@@ -31,14 +31,14 @@ CORE_WHEEL_RADIUS = 0.171 # meters
CORE_GEAR_RATIO = 100.0 # Clucky: 100:1, Testbed: 64:1
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
# history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
# Used to verify the length of an incoming VicCAN feedback message

View File

@@ -1,25 +1,24 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import asyncio
from concurrent.futures import ThreadPoolExecutor
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from astra_msgs.msg import PtzControl, PtzFeedback
# Import the SIYI SDK
from core_pkg.siyi_sdk import (
AttitudeData,
CommandID,
DataStreamFrequency,
DataStreamType,
SingleAxis,
SiyiGimbalCamera,
CommandID,
DataStreamType,
DataStreamFrequency,
SingleAxis,
AttitudeData,
)
@@ -263,7 +262,7 @@ class PtzNode(Node):
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
)
self.debug_pub.publish(msg)
self.get_logger().debug(message_text)
self.get_logger().info(message_text)
def run_async_func(self, coro):
"""Run an async function in the event loop."""

View File

@@ -35,14 +35,14 @@ ARM_STOP_MSG = ArmManual() # "
BIO_STOP_MSG = BioControl() # "
control_qos = qos.QoSProfile(
history=qos.QoSHistoryPolicy.KEEP_LAST,
# history=qos.QoSHistoryPolicy.KEEP_LAST,
depth=2,
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.VOLATILE,
deadline=Duration(seconds=1),
lifespan=Duration(nanoseconds=500_000_000), # 500ms
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
liveliness_lease_duration=Duration(seconds=5),
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
# durability=qos.QoSDurabilityPolicy.VOLATILE,
# deadline=Duration(seconds=1),
# lifespan=Duration(nanoseconds=500_000_000), # 500ms
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
# liveliness_lease_duration=Duration(seconds=5),
)
CORE_MODE = "twist" # "twist" or "duty"