mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
Merge branch 'bio-linac' into ptz-node
This commit is contained in:
@@ -90,9 +90,9 @@ class SerialRelay(Node):
|
||||
msg.data = output
|
||||
if output.startswith("can_relay_fromvic,core"):
|
||||
self.core_pub.publish(msg)
|
||||
elif output.startswith("can_relay_fromvic,arm"):
|
||||
elif output.startswith("can_relay_fromvic,arm") or output.startswith("can_relay_fromvic,digit"): # digit for voltage readings
|
||||
self.arm_pub.publish(msg)
|
||||
elif output.startswith("can_relay_fromvic,bio"):
|
||||
elif output.startswith("can_relay_fromvic,citadel") or output.startswith("can_relay_fromvic,digit"): # digit for SHT sensor
|
||||
self.bio_pub.publish(msg)
|
||||
# msg = String()
|
||||
# msg.data = output
|
||||
@@ -135,16 +135,16 @@ def myexcepthook(type, value, tb):
|
||||
print("Uncaught exception:", type, value)
|
||||
if serial_pub:
|
||||
serial_pub.cleanup()
|
||||
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
sys.excepthook = myexcepthook
|
||||
rclpy.init(args=args)
|
||||
sys.excepthook = myexcepthook
|
||||
|
||||
global serial_pub
|
||||
global serial_pub
|
||||
|
||||
serial_pub = SerialRelay()
|
||||
serial_pub.run()
|
||||
serial_pub = SerialRelay()
|
||||
serial_pub.run()
|
||||
|
||||
if __name__ == '__main__':
|
||||
signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
||||
|
||||
@@ -11,6 +11,7 @@ from std_msgs.msg import String
|
||||
from ros2_interfaces_pkg.msg import ArmManual
|
||||
from ros2_interfaces_pkg.msg import ArmIK
|
||||
from ros2_interfaces_pkg.msg import SocketFeedback
|
||||
from ros2_interfaces_pkg.msg import DigitFeedback
|
||||
|
||||
serial_pub = None
|
||||
thread = None
|
||||
@@ -28,6 +29,7 @@ class SerialRelay(Node):
|
||||
# Create publishers
|
||||
self.debug_pub = self.create_publisher(String, '/arm/feedback/debug', 10)
|
||||
self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
|
||||
self.digit_pub = self.create_publisher(DigitFeedback, '/arm/feedback/digit', 10)
|
||||
self.feedback_timer = self.create_timer(1.0, self.publish_feedback)
|
||||
|
||||
# Create subscribers
|
||||
@@ -40,6 +42,7 @@ class SerialRelay(Node):
|
||||
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
|
||||
|
||||
self.arm_feedback = SocketFeedback()
|
||||
self.digit_feedback = DigitFeedback()
|
||||
|
||||
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode
|
||||
if self.launch_mode == 'arm':
|
||||
@@ -182,16 +185,30 @@ class SerialRelay(Node):
|
||||
elif output.startswith("can_relay_fromvic,arm,53"):
|
||||
#pass
|
||||
self.updateMotorFeedback(output)
|
||||
elif output.startswith("can_relay_fromvic,digit,54"):
|
||||
parts = msg.data.split(",")
|
||||
if len(parts) >= 7:
|
||||
# Extract the voltage from the string
|
||||
voltages_in = parts[3:7]
|
||||
# Convert the voltages to floats
|
||||
self.digit_feedback.bat_voltage = float(voltages_in[0]) / 100.0
|
||||
self.digit_feedback.voltage_12 = float(voltages_in[1]) / 100.0
|
||||
self.digit_feedback.voltage_5 = float(voltages_in[2]) / 100.0
|
||||
elif output.startswith("can_relay_fromvic,digit,55"):
|
||||
parts = msg.data.split(",")
|
||||
if len(parts) >= 4:
|
||||
self.digit_feedback.wrist_angle = float(parts[3])
|
||||
else:
|
||||
return
|
||||
|
||||
def publish_feedback(self):
|
||||
self.socket_pub.publish(self.arm_feedback)
|
||||
self.digit_pub.publish(self.digit_feedback)
|
||||
|
||||
def updateAngleFeedback(self, msg):
|
||||
def updateAngleFeedback(self, msg: String):
|
||||
# Angle feedbacks,
|
||||
#split the msg.data by commas
|
||||
parts = msg.split(",")
|
||||
parts = msg.data.split(",")
|
||||
|
||||
if len(parts) >= 7:
|
||||
# Extract the angles from the string
|
||||
@@ -224,9 +241,9 @@ class SerialRelay(Node):
|
||||
self.get_logger().info("Invalid angle feedback input format")
|
||||
|
||||
|
||||
def updateBusVoltage(self, msg):
|
||||
def updateBusVoltage(self, msg: String):
|
||||
# Bus Voltage feedbacks
|
||||
parts = msg.split(",")
|
||||
parts = msg.data.split(",")
|
||||
if len(parts) >= 7:
|
||||
# Extract the voltage from the string
|
||||
voltages_in = parts[3:7]
|
||||
@@ -242,7 +259,7 @@ class SerialRelay(Node):
|
||||
def updateMotorFeedback(self, msg):
|
||||
# Motor voltage/current/temperature feedback
|
||||
return
|
||||
# parts = msg.split(",")
|
||||
# parts = msg.data.split(",")
|
||||
# if len(parts) >= 7:
|
||||
# # Extract the voltage/current/temperature from the string
|
||||
# values_in = parts[3:7]
|
||||
|
||||
@@ -9,6 +9,7 @@ import atexit
|
||||
import signal
|
||||
from std_msgs.msg import String
|
||||
from ros2_interfaces_pkg.msg import BioControl
|
||||
from ros2_interfaces_pkg.msg import BioFeedback
|
||||
|
||||
serial_pub = None
|
||||
thread = None
|
||||
@@ -25,17 +26,22 @@ class SerialRelay(Node):
|
||||
|
||||
# Create publishers
|
||||
self.debug_pub = self.create_publisher(String, '/bio/feedback/debug', 10)
|
||||
#self.socket_pub = self.create_publisher(SocketFeedback, '/arm/feedback/socket', 10)
|
||||
self.feedback_pub = self.create_publisher(BioFeedback, '/bio/feedback', 10)
|
||||
|
||||
|
||||
# Create subscribers\
|
||||
# Create subscribers
|
||||
self.control_sub = self.create_subscription(BioControl, '/bio/control', self.send_control, 10)
|
||||
|
||||
# Create a publisher for telemetry
|
||||
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
|
||||
|
||||
# Topics used in anchor mode
|
||||
if self.launch_mode == 'anchor':
|
||||
self.anchor_sub = self.create_subscription(String, '/anchor/bio/feedback', self.anchor_feedback, 10)
|
||||
self.anchor_pub = self.create_publisher(String, '/anchor/relay', 10)
|
||||
|
||||
self.bio_feedback = BioFeedback()
|
||||
|
||||
|
||||
# Search for ports IF in 'arm' (standalone) and not 'anchor' mode
|
||||
if self.launch_mode == 'bio':
|
||||
@@ -117,7 +123,7 @@ class SerialRelay(Node):
|
||||
pass
|
||||
|
||||
|
||||
def send_control(self, msg):
|
||||
def send_control(self, msg: BioControl):
|
||||
# CITADEL Control Commands
|
||||
################
|
||||
|
||||
@@ -132,12 +138,12 @@ class SerialRelay(Node):
|
||||
self.send_cmd(command)
|
||||
# Servos, only send if not zero
|
||||
if msg.servo_id != 0:
|
||||
command = "can_relay_tovic,citadel,25," + str(msg.servo_id) + "," + str(msg.servo_position) + "\n"
|
||||
self.send_cmd(command)
|
||||
command = "can_relay_tovic,citadel,25," + str(msg.servo_id) + "," + str(int(msg.servo_state)) + "\n"
|
||||
self.send_cmd(command)
|
||||
|
||||
|
||||
# LSS
|
||||
command = "can_relay_tovic,citadel,24," + str(msg.lss_direction) + "\n"
|
||||
# LSS (SCYTHE)
|
||||
command = "can_relay_tovic,citadel,24," + str(msg.bio_arm) + "\n"
|
||||
self.send_cmd(command)
|
||||
# Vibration Motor
|
||||
command = "can_relay_tovic,citadel,26," + str(msg.vibration_motor) + "\n"
|
||||
@@ -150,21 +156,21 @@ class SerialRelay(Node):
|
||||
# To be reviewed before use#
|
||||
|
||||
# Laser
|
||||
command = "can_relay_tovic,faerie,28," + str(msg.laser) + "\n"
|
||||
command = "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
|
||||
self.send_cmd(command)
|
||||
|
||||
# # UV Light
|
||||
# command = "can_relay_tovic,faerie,38," + str(msg.uvLight) + "\n"
|
||||
# self.send_cmd(command)
|
||||
|
||||
# Drill
|
||||
command = f"can_relay_tovic,faerie,19,{msg.drill_duty:.2f}\n"
|
||||
print(msg.drill_duty)
|
||||
# Drill (SCABBARD)
|
||||
command = f"can_relay_tovic,digit,19,{msg.drill:.2f}\n"
|
||||
print(msg.drill)
|
||||
self.send_cmd(command)
|
||||
|
||||
# Bio linear actuator
|
||||
command = "can_relay_tovic,digit,42," + str(msg.drill_arm) + "\n"
|
||||
self.send_cmd(command)
|
||||
|
||||
|
||||
|
||||
def send_cmd(self, msg):
|
||||
def send_cmd(self, msg: str):
|
||||
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
|
||||
output = String()
|
||||
output.data = msg
|
||||
@@ -173,10 +179,21 @@ class SerialRelay(Node):
|
||||
self.get_logger().info(f"[Bio to MCU] {msg}")
|
||||
self.ser.write(bytes(msg, "utf8"))
|
||||
|
||||
def anchor_feedback(self, msg):
|
||||
def anchor_feedback(self, msg: String):
|
||||
output = msg.data
|
||||
parts = str(output.strip()).split(",")
|
||||
self.get_logger().info(f"[Bio Anchor] {msg.data}")
|
||||
#self.send_cmd(msg.data)
|
||||
|
||||
if output.startswith("can_relay_fromvic,citadel,54"): # bat, 12, 5, Voltage readings * 100
|
||||
self.bio_feedback.bat_voltage = float(parts[3]) / 100.0
|
||||
self.bio_feedback.voltage_12 = float(parts[4]) / 100.0
|
||||
self.bio_feedback.voltage_5 = float(parts[5]) / 100.0
|
||||
elif output.startswith("can_relay_fromvic,digit,57"):
|
||||
self.bio_feedback.drill_temp = float(parts[3])
|
||||
self.bio_feedback.drill_humidity = float(parts[4])
|
||||
|
||||
def publish_feedback(self):
|
||||
self.feedback_pub.publish(self.bio_feedback)
|
||||
|
||||
@staticmethod
|
||||
def list_serial_ports():
|
||||
|
||||
@@ -152,7 +152,7 @@ class SerialRelay(Node):
|
||||
self.ser.close()
|
||||
self.exit(1)
|
||||
|
||||
def scale_duty(self, value, max_speed):
|
||||
def scale_duty(self, value: float, max_speed: float):
|
||||
leftMin = -1
|
||||
leftMax = 1
|
||||
rightMin = -max_speed/100.0
|
||||
@@ -172,12 +172,15 @@ class SerialRelay(Node):
|
||||
def send_controls(self, msg):
|
||||
#can_relay_tovic,core,19, left_stick, right_stick
|
||||
if(msg.turn_to_enable):
|
||||
command = "can_relay_tovic,core,41," + msg.turn_to + ',' + msg.turn_to_timeout + '\n'
|
||||
command = "can_relay_tovic,core,41," + str(msg.turn_to) + ',' + str(msg.turn_to_timeout) + '\n'
|
||||
else:
|
||||
command = "can_relay_tovic,core,19," + self.scale_duty(msg.left_stick, msg.max_speed) + ',' + self.scale_duty(msg.right_stick, msg.max_speed) + '\n'
|
||||
|
||||
self.send_cmd(command)
|
||||
|
||||
|
||||
# Brake mode
|
||||
command = "can_relay_tovic,core,18," + str(int(msg.brake)) + '\n'
|
||||
self.send_cmd(command)
|
||||
|
||||
#print(f"[Sys] Relaying: {command}")
|
||||
def send_cmd(self, msg):
|
||||
if self.launch_mode == 'anchor':
|
||||
@@ -189,7 +192,7 @@ class SerialRelay(Node):
|
||||
self.get_logger().info(f"[Core to MCU] {msg}")
|
||||
self.ser.write(bytes(msg, "utf8"))
|
||||
|
||||
def anchor_feedback(self, msg):
|
||||
def anchor_feedback(self, msg: String):
|
||||
output = msg.data
|
||||
parts = str(output.strip()).split(",")
|
||||
#self.get_logger().info(f"[ANCHOR FEEDBACK parts] {parts}")
|
||||
@@ -203,6 +206,7 @@ class SerialRelay(Node):
|
||||
self.core_feedback.bno_gyro.x = float(parts[3])
|
||||
self.core_feedback.bno_gyro.y = float(parts[4])
|
||||
self.core_feedback.bno_gyro.z = float(parts[5])
|
||||
self.core_feedback.imu_calib = round(float(parts[6]))
|
||||
elif output.startswith("can_relay_fromvic,core,52"):#Accel x,y,z, heading *10
|
||||
self.core_feedback.bno_accel.x = float(parts[3])
|
||||
self.core_feedback.bno_accel.y = float(parts[4])
|
||||
|
||||
Reference in New Issue
Block a user