created bio node, added rqt-graph to flake

This commit is contained in:
iggy
2026-02-05 02:12:21 -06:00
parent 4a49069c2a
commit f99b9e6f96
3 changed files with 157 additions and 181 deletions

View File

@@ -39,6 +39,7 @@
buildEnv { buildEnv {
paths = [ paths = [
ros-core ros-core
rqt-graph
ros2cli ros2cli
ros2run ros2run
ros2bag ros2bag

View File

@@ -1,20 +1,29 @@
import rclpy import signal
from rclpy.node import Node
import serial
import sys import sys
import threading import threading
import os
import glob
import time import time
import atexit
import signal import rclpy
from std_msgs.msg import String from rclpy.action import ActionServer
from astra_msgs.msg import BioControl from rclpy.node import Node
from astra_msgs.msg import BioFeedback from std_msgs.msg import Header, String
from astra_msgs.action import BioVacuum
from astra_msgs.msg import BioControl, BioDistributor, BioFeedback, VicCAN
from astra_msgs.srv import BioTestTube
serial_pub = None serial_pub = None
thread = None thread = None
# used to verify the length of an incoming VicCAN feedback message
# key is VicCAN command_id, value is expected length of data list
viccan_citadel_msg_len_dict = {
# empty because not expecting any VicCAN from citadel atm
}
viccan_lance_msg_len_dict = {
# empty because not sure what VicCAN commands LANCE sends
}
class SerialRelay(Node): class SerialRelay(Node):
def __init__(self): def __init__(self):
@@ -22,172 +31,55 @@ class SerialRelay(Node):
super().__init__("bio_node") super().__init__("bio_node")
# Get launch mode parameter # Get launch mode parameter
self.declare_parameter("launch_mode", "bio") self.declare_parameter("launch_mode")
self.launch_mode = self.get_parameter("launch_mode").value self.launch_mode = self.get_parameter("launch_mode").value
self.get_logger().info(f"bio launch_mode is: {self.launch_mode}") self.get_logger().info(f"bio launch_mode is: {self.launch_mode}")
# Create publishers # Anchor Topics
self.debug_pub = self.create_publisher(String, "/bio/feedback/debug", 10) if self.launch_mode == "anchor":
self.feedback_pub = self.create_publisher(BioFeedback, "/bio/feedback", 10) self.anchor_fromvic_sub_ = self.create_subscription(
VicCAN, "/anchor/from_vic/bio", self.relay_fromvic, 20
# Create subscribers )
self.control_sub = self.create_subscription( self.anchor_tovic_pub_ = self.create_publisher(
BioControl, "/bio/control", self.send_control, 10 VicCAN, "/anchor/to_vic/relay", 20
) )
# Create a publisher for telemetry
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
# Topics used in anchor mode
if self.launch_mode == "anchor":
self.anchor_sub = self.create_subscription( self.anchor_sub = self.create_subscription(
String, "/anchor/bio/feedback", self.anchor_feedback, 10 String, "/anchor/bio/feedback", self.anchor_feedback, 10
) )
self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10) self.anchor_pub = self.create_publisher(String, "/anchor/relay", 10)
self.bio_feedback = BioFeedback() self.distributor_sub = self.create_subscription(
BioDistributor,
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode "/bio/control/distributor",
if self.launch_mode == "bio": self.distributor_callback,
# Loop through all serial devices on the computer to check for the MCU 10,
self.port = None
for i in range(2):
try:
# connect and send a ping command
set_port = (
"/dev/ttyACM0" # MCU is controlled through GPIO pins on the PI
) )
ser = serial.Serial(set_port, 115200, timeout=1)
# print(f"Checking port {port}...")
ser.write(b"ping\n")
response = ser.read_until("\n")
# if pong is in response, then we are talking with the MCU # Services
if b"pong" in response:
self.port = set_port
self.get_logger().info(f"Found MCU at {set_port}!")
break
except:
pass
if self.port is None: self.test_tube_service = self.create_service(
self.get_logger().info( BioTestTube, "/bio/control/test_tube", self.test_tube_callback
"Unable to find MCU... please make sure it is connected."
) )
time.sleep(1)
sys.exit(1)
self.ser = serial.Serial(self.port, 115200) # Actions
atexit.register(self.cleanup) self._action_server = ActionServer(
self, BioVacuum, "/bio/actions/vacuum", self.execute_vacuum
)
def run(self): def run(self):
global thread global thread
thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True) thread = threading.Thread(target=rclpy.spin, args=(self,), daemon=True)
thread.start() thread.start()
# if in arm mode, will need to read from the MCU
try: try:
while rclpy.ok(): while rclpy.ok():
if self.launch_mode == "bio":
if self.ser.in_waiting:
self.read_mcu()
else:
time.sleep(0.1) time.sleep(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally:
self.cleanup()
# Currently will just spit out all values over the /arm/feedback/debug topic as strings
def read_mcu(self):
try:
output = str(self.ser.readline(), "utf8")
if output:
self.get_logger().info(f"[MCU] {output}")
msg = String()
msg.data = output
self.debug_pub.publish(msg)
except serial.SerialException:
self.get_logger().info("SerialException caught... closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except TypeError as e:
self.get_logger().info(f"TypeError: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
except Exception as e:
print(f"Exception: {e}")
print("Closing serial port.")
if self.ser.is_open:
self.ser.close()
pass
def send_ik(self, msg):
pass
def send_control(self, msg: BioControl): def send_control(self, msg: BioControl):
# CITADEL Control Commands print("todo")
################
# Chem Pumps, only send if not zero
if msg.pump_id != 0:
command = (
"can_relay_tovic,citadel,27,"
+ str(msg.pump_id)
+ ","
+ str(msg.pump_amount)
+ "\n"
)
self.send_cmd(command)
# Fans, only send if not zero
if msg.fan_id != 0:
command = (
"can_relay_tovic,citadel,40,"
+ str(msg.fan_id)
+ ","
+ str(msg.fan_duration)
+ "\n"
)
self.send_cmd(command)
# Servos, only send if not zero
if msg.servo_id != 0:
command = (
"can_relay_tovic,citadel,25,"
+ str(msg.servo_id)
+ ","
+ str(int(msg.servo_state))
+ "\n"
)
self.send_cmd(command)
# LSS (SCYTHE)
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
# self.send_cmd(command)
# Vibration Motor
command += "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
# self.send_cmd(command)
# FAERIE Control Commands
################
# To be reviewed before use#
# Laser
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
# self.send_cmd(command)
# Drill (SCABBARD)
command += f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
# self.send_cmd(command)
# Bio linear actuator
command += "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
self.send_cmd(command)
def send_cmd(self, msg: str): def send_cmd(self, msg: str):
if ( if (
@@ -196,46 +88,129 @@ class SerialRelay(Node):
output = String() output = String()
output.data = msg output.data = msg
self.anchor_pub.publish(output) self.anchor_pub.publish(output)
elif self.launch_mode == "bio": # if in standalone mode, send to MCU directly
self.get_logger().info(f"[Bio to MCU] {msg}") def relay_fromvic(self, msg: VicCAN):
self.ser.write(bytes(msg, "utf8")) # self.get_logger().info(msg)
if msg.mcu_name == "citadel":
self.process_fromvic_citadel(msg)
def process_fromvic_citadel(self, msg: VicCAN):
if msg.mcu_name != "citadel":
return
# Check message len to prevent crashing on bad data
if msg.command_id in viccan_citadel_msg_len_dict:
expected_len = viccan_citadel_msg_len_dict[msg.command_id]
if len(msg.data) != expected_len:
self.get_logger().warning(
f"Ignoring VicCAN message with id {msg.command_id} due to unexpected data length (expected {expected_len}, got {len(msg.data)})"
)
return
def anchor_feedback(self, msg: String): def anchor_feedback(self, msg: String):
output = msg.data output = msg.data
parts = str(output.strip()).split(",") parts = str(output.strip()).split(",")
# self.get_logger().info(f"[Bio Anchor] {msg.data}") self.get_logger().info(f"[Bio Anchor] {msg.data}")
# no data planned to be received from citadel, not sure about lance
if output.startswith( def distributor_callback(self, msg: BioDistributor):
"can_relay_fromvic,citadel,54" distributor_arr = msg.distributor_id
): # bat, 12, 5, Voltage readings * 100 vic_cmd = VicCAN(
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0 header=Header(stamp=self.get_clock().now().to_msg()),
self.bio_feedback.voltage_12 = float(parts[4]) / 100.0 mcu_name="citadel",
self.bio_feedback.voltage_5 = float(parts[5]) / 100.0 command_id=40,
elif output.startswith("can_relay_fromvic,digit,57"): data=[
self.bio_feedback.drill_temp = float(parts[3]) to_short(distributor_arr[0]),
self.bio_feedback.drill_humidity = float(parts[4]) to_short(distributor_arr[1]),
to_short(distributor_arr[2]),
0,
],
)
self.anchor_tovic_pub_.publish(vic_cmd)
def publish_feedback(self): def test_tube_callback(self, request, response):
self.feedback_pub.publish(self.bio_feedback) tubes_cmd = VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[
float(int.from_bytes(request.tube_id)),
float(request.milliliters),
],
)
self.anchor_tovic_pub_.publish(tubes_cmd)
return response
@staticmethod def execute_vacuum(self, goal_handle):
def list_serial_ports(): valve_id = int.from_bytes(goal_handle.request.valve_id)
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") duty = int.from_bytes(goal_handle.request.fan_duty_cycle_percent)
# return glob.glob("/dev/tty[A-Za-z]*") total = goal_handle.request.fan_time_ms
def cleanup(self): # open valve
print("Cleaning up...") self.anchor_tovic_pub_.publish(
try: VicCAN(
if self.ser.is_open: header=Header(stamp=self.get_clock().now().to_msg()),
self.ser.close() mcu_name="citadel",
except Exception as e: command_id=40,
exit(0) data=[float(valve_id)],
)
)
# set fan duty cycle
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[float(duty)],
)
)
feedback = BioVacuum.Feedback()
start = time.time()
while True:
elapsed = int((time.time() - start) * 1000)
remaining = max(0, total - elapsed)
feedback.fan_time_remaining_ms = remaining
goal_handle.publish_feedback(feedback)
if remaining == 0:
break
time.sleep(0.1)
# stop fan
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=19,
data=[0.0],
)
)
# close valve
self.anchor_tovic_pub_.publish(
VicCAN(
header=Header(stamp=self.get_clock().now().to_msg()),
mcu_name="citadel",
command_id=40,
data=[-1.0],
)
)
goal_handle.succeed()
return BioVacuum.Result()
def to_short(x: int) -> int:
return max(-32768, min(32767, x))
def myexcepthook(type, value, tb): def myexcepthook(type, value, tb):
print("Uncaught exception:", type, value) print("Uncaught exception:", type, value)
if serial_pub:
serial_pub.cleanup()
def main(args=None): def main(args=None):