mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 09:20:40 +00:00
style: get ready for main
This commit is contained in:
@@ -27,16 +27,16 @@ from ament_index_python.packages import get_package_share_directory
|
|||||||
|
|
||||||
from . import astra_arm
|
from . import astra_arm
|
||||||
|
|
||||||
control_qos = qos.QoSProfile(
|
# control_qos = qos.QoSProfile(
|
||||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
# history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||||
depth=1,
|
# depth=1,
|
||||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
||||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
# durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||||
deadline=1000,
|
# deadline=1000,
|
||||||
lifespan=500,
|
# lifespan=500,
|
||||||
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||||
liveliness_lease_duration=5000
|
# liveliness_lease_duration=5000
|
||||||
)
|
# )
|
||||||
|
|
||||||
serial_pub = None
|
serial_pub = None
|
||||||
thread = None
|
thread = None
|
||||||
@@ -59,7 +59,7 @@ class SerialRelay(Node):
|
|||||||
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
self.feedback_timer = self.create_timer(0.25, self.publish_feedback)
|
||||||
|
|
||||||
# Create subscribers
|
# Create subscribers
|
||||||
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, qos_profile=control_qos)
|
self.ik_sub = self.create_subscription(ArmIK, '/arm/control/ik', self.send_ik, 10)
|
||||||
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
|
self.man_sub = self.create_subscription(ArmManual, '/arm/control/manual', self.send_manual, 10)
|
||||||
|
|
||||||
self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
|
self.ik_debug = self.create_publisher(String, '/arm/debug/ik', 10)
|
||||||
@@ -152,161 +152,6 @@ class SerialRelay(Node):
|
|||||||
self.ser.close()
|
self.ser.close()
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def updateAngleFeedback(self, msg: str):
|
|
||||||
# Angle feedbacks,
|
|
||||||
# split the msg.data by commas
|
|
||||||
parts = msg.split(",")
|
|
||||||
|
|
||||||
if len(parts) >= 7:
|
|
||||||
# Extract the angles from the string
|
|
||||||
angles_in = parts[3:7]
|
|
||||||
# Convert the angles to floats divide by 10.0
|
|
||||||
angles = [float(angle) / 10.0 for angle in angles_in]
|
|
||||||
# angles[0] = 0.0 #override axis0 to zero
|
|
||||||
#
|
|
||||||
#
|
|
||||||
#THIS NEEDS TO BE REMOVED LATER
|
|
||||||
#PLACEHOLDER FOR WRIST VALUE
|
|
||||||
#
|
|
||||||
#
|
|
||||||
angles.append(0.0)#placeholder for wrist_continuous
|
|
||||||
angles.append(0.0)#placeholder for wrist
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# # Update the arm's current angles
|
|
||||||
self.arm.update_angles(angles)
|
|
||||||
self.arm_feedback.axis0_angle = angles[0]
|
|
||||||
self.arm_feedback.axis1_angle = angles[1]
|
|
||||||
self.arm_feedback.axis2_angle = angles[2]
|
|
||||||
self.arm_feedback.axis3_angle = angles[3]
|
|
||||||
# self.get_logger().info(f"Angles: {angles}")
|
|
||||||
# #debug publish angles
|
|
||||||
# tempMsg = String()
|
|
||||||
# tempMsg.data = "Angles: " + str(angles)
|
|
||||||
# #self.debug_pub.publish(tempMsg)
|
|
||||||
else:
|
|
||||||
self.get_logger().info("Invalid angle feedback input format")
|
|
||||||
|
|
||||||
|
|
||||||
def updateBusVoltage(self, msg: str):
|
|
||||||
# Bus Voltage feedbacks
|
|
||||||
parts = msg.split(",")
|
|
||||||
if len(parts) >= 7:
|
|
||||||
# Extract the voltage from the string
|
|
||||||
voltages_in = parts[3:7]
|
|
||||||
# Convert the voltages to floats
|
|
||||||
self.arm_feedback.bat_voltage = float(voltages_in[0]) / 100.0
|
|
||||||
self.arm_feedback.voltage_12 = float(voltages_in[1]) / 100.0
|
|
||||||
self.arm_feedback.voltage_5 = float(voltages_in[2]) / 100.0
|
|
||||||
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
|
|
||||||
else:
|
|
||||||
self.get_logger().info("Invalid voltage feedback input format")
|
|
||||||
|
|
||||||
def updateMotorFeedback(self, msg: str):
|
|
||||||
parts = str(msg.strip()).split(",")
|
|
||||||
motorId = round(float(parts[3]))
|
|
||||||
temp = float(parts[4]) / 10.0
|
|
||||||
voltage = float(parts[5]) / 10.0
|
|
||||||
current = float(parts[6]) / 10.0
|
|
||||||
if motorId == 1:
|
|
||||||
self.arm_feedback.axis1_temp = temp
|
|
||||||
self.arm_feedback.axis1_voltage = voltage
|
|
||||||
self.arm_feedback.axis1_current = current
|
|
||||||
elif motorId == 2:
|
|
||||||
self.arm_feedback.axis2_temp = temp
|
|
||||||
self.arm_feedback.axis2_voltage = voltage
|
|
||||||
self.arm_feedback.axis2_current = current
|
|
||||||
elif motorId == 3:
|
|
||||||
self.arm_feedback.axis3_temp = temp
|
|
||||||
self.arm_feedback.axis3_voltage = voltage
|
|
||||||
self.arm_feedback.axis3_current = current
|
|
||||||
elif motorId == 4:
|
|
||||||
self.arm_feedback.axis0_temp = temp
|
|
||||||
self.arm_feedback.axis0_voltage = voltage
|
|
||||||
self.arm_feedback.axis0_current = current
|
|
||||||
|
|
||||||
def send_manual(self, msg: ArmManual):
|
|
||||||
axis0 = msg.axis0
|
|
||||||
axis1 = -1 * msg.axis1
|
|
||||||
axis2 = msg.axis2
|
|
||||||
axis3 = msg.axis3
|
|
||||||
|
|
||||||
#Send controls for arm
|
|
||||||
command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n"
|
|
||||||
command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
|
|
||||||
|
|
||||||
#Send controls for end effector
|
|
||||||
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
|
|
||||||
|
|
||||||
|
|
||||||
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
|
|
||||||
|
|
||||||
|
|
||||||
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
|
|
||||||
|
|
||||||
|
|
||||||
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
|
|
||||||
|
|
||||||
command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n"
|
|
||||||
|
|
||||||
self.send_cmd(command)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#print(f"[Wrote] {command}", end="")
|
|
||||||
|
|
||||||
#Not yet finished, needs embedded implementation for new commands
|
|
||||||
# ef_roll = msg.effector_roll
|
|
||||||
# ef_yaw = msg.effector_yaw
|
|
||||||
# gripper = msg.gripper
|
|
||||||
# actuator = msg.linear_actuator
|
|
||||||
# laser = msg.laser
|
|
||||||
# #Send controls for digit
|
|
||||||
|
|
||||||
# command = "can_relay_tovic,digit," + str(ef_roll) + "," + str(ef_yaw) + "," + str(gripper) + "," + str(actuator) + "," + str(laser) + "\n"
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
def send_cmd(self, msg: str):
|
|
||||||
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
|
|
||||||
output = String()
|
|
||||||
output.data = msg
|
|
||||||
self.anchor_pub.publish(output)
|
|
||||||
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
|
|
||||||
self.get_logger().info(f"[Arm to MCU] {msg.data}")
|
|
||||||
self.ser.write(bytes(msg, "utf8"))
|
|
||||||
|
|
||||||
def anchor_feedback(self, msg: String):
|
|
||||||
output = msg.data
|
|
||||||
if output.startswith("can_relay_fromvic,arm,55"):
|
|
||||||
#pass
|
|
||||||
self.updateAngleFeedback(output)
|
|
||||||
elif output.startswith("can_relay_fromvic,arm,54"):
|
|
||||||
#pass
|
|
||||||
self.updateBusVoltage(output)
|
|
||||||
elif output.startswith("can_relay_fromvic,arm,53"):
|
|
||||||
self.updateMotorFeedback(output)
|
|
||||||
elif output.startswith("can_relay_fromvic,digit,54"):
|
|
||||||
parts = msg.data.split(",")
|
|
||||||
if len(parts) >= 7:
|
|
||||||
# Extract the voltage from the string
|
|
||||||
voltages_in = parts[3:7]
|
|
||||||
# Convert the voltages to floats
|
|
||||||
self.digit_feedback.bat_voltage = float(voltages_in[0]) / 100.0
|
|
||||||
self.digit_feedback.voltage_12 = float(voltages_in[1]) / 100.0
|
|
||||||
self.digit_feedback.voltage_5 = float(voltages_in[2]) / 100.0
|
|
||||||
elif output.startswith("can_relay_fromvic,digit,55"):
|
|
||||||
parts = msg.data.split(",")
|
|
||||||
if len(parts) >= 4:
|
|
||||||
self.digit_feedback.wrist_angle = float(parts[3])
|
|
||||||
else:
|
|
||||||
return
|
|
||||||
|
|
||||||
def publish_feedback(self):
|
|
||||||
self.socket_pub.publish(self.arm_feedback)
|
|
||||||
self.digit_pub.publish(self.digit_feedback)
|
|
||||||
|
|
||||||
def send_ik(self, msg):
|
def send_ik(self, msg):
|
||||||
# Convert Vector3 to a NumPy array
|
# Convert Vector3 to a NumPy array
|
||||||
input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
|
input_raw = np.array([-msg.movement_vector.x, msg.movement_vector.y, msg.movement_vector.z]) # Convert input to a NumPy array
|
||||||
@@ -381,6 +226,146 @@ class SerialRelay(Node):
|
|||||||
#self.debug_pub.publish(tempMsg)
|
#self.debug_pub.publish(tempMsg)
|
||||||
|
|
||||||
|
|
||||||
|
def send_manual(self, msg: ArmManual):
|
||||||
|
axis0 = msg.axis0
|
||||||
|
axis1 = -1 * msg.axis1
|
||||||
|
axis2 = msg.axis2
|
||||||
|
axis3 = msg.axis3
|
||||||
|
|
||||||
|
#Send controls for arm
|
||||||
|
command = "can_relay_tovic,arm,18," + str(int(msg.brake)) + "\n"
|
||||||
|
command += "can_relay_tovic,arm,39," + str(axis0) + "," + str(axis1) + "," + str(axis2) + "," + str(axis3) + "\n"
|
||||||
|
|
||||||
|
#Send controls for end effector
|
||||||
|
command += "can_relay_tovic,digit,35," + str(msg.effector_roll) + "\n"
|
||||||
|
|
||||||
|
|
||||||
|
command += "can_relay_tovic,digit,36,0," + str(msg.effector_yaw) + "\n"
|
||||||
|
|
||||||
|
|
||||||
|
command += "can_relay_tovic,digit,26," + str(msg.gripper) + "\n"
|
||||||
|
|
||||||
|
|
||||||
|
command += "can_relay_tovic,digit,28," + str(msg.laser) + "\n"
|
||||||
|
|
||||||
|
command += "can_relay_tovic,digit,34," + str(msg.linear_actuator) + "\n"
|
||||||
|
|
||||||
|
self.send_cmd(command)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
def send_cmd(self, msg: str):
|
||||||
|
if self.launch_mode == 'anchor': #if in anchor mode, send to anchor node to relay
|
||||||
|
output = String()
|
||||||
|
output.data = msg
|
||||||
|
self.anchor_pub.publish(output)
|
||||||
|
elif self.launch_mode == 'arm': #if in standalone mode, send to MCU directly
|
||||||
|
self.get_logger().info(f"[Arm to MCU] {msg.data}")
|
||||||
|
self.ser.write(bytes(msg, "utf8"))
|
||||||
|
|
||||||
|
def anchor_feedback(self, msg: String):
|
||||||
|
output = msg.data
|
||||||
|
if output.startswith("can_relay_fromvic,arm,55"):
|
||||||
|
#pass
|
||||||
|
self.updateAngleFeedback(output)
|
||||||
|
elif output.startswith("can_relay_fromvic,arm,54"):
|
||||||
|
#pass
|
||||||
|
self.updateBusVoltage(output)
|
||||||
|
elif output.startswith("can_relay_fromvic,arm,53"):
|
||||||
|
self.updateMotorFeedback(output)
|
||||||
|
elif output.startswith("can_relay_fromvic,digit,54"):
|
||||||
|
parts = msg.data.split(",")
|
||||||
|
if len(parts) >= 7:
|
||||||
|
# Extract the voltage from the string
|
||||||
|
voltages_in = parts[3:7]
|
||||||
|
# Convert the voltages to floats
|
||||||
|
self.digit_feedback.bat_voltage = float(voltages_in[0]) / 100.0
|
||||||
|
self.digit_feedback.voltage_12 = float(voltages_in[1]) / 100.0
|
||||||
|
self.digit_feedback.voltage_5 = float(voltages_in[2]) / 100.0
|
||||||
|
elif output.startswith("can_relay_fromvic,digit,55"):
|
||||||
|
parts = msg.data.split(",")
|
||||||
|
if len(parts) >= 4:
|
||||||
|
self.digit_feedback.wrist_angle = float(parts[3])
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
def publish_feedback(self):
|
||||||
|
self.socket_pub.publish(self.arm_feedback)
|
||||||
|
self.digit_pub.publish(self.digit_feedback)
|
||||||
|
|
||||||
|
def updateAngleFeedback(self, msg: str):
|
||||||
|
# Angle feedbacks,
|
||||||
|
# split the msg.data by commas
|
||||||
|
parts = msg.split(",")
|
||||||
|
|
||||||
|
if len(parts) >= 7:
|
||||||
|
# Extract the angles from the string
|
||||||
|
angles_in = parts[3:7]
|
||||||
|
# Convert the angles to floats divide by 10.0
|
||||||
|
angles = [float(angle) / 10.0 for angle in angles_in]
|
||||||
|
# angles[0] = 0.0 #override axis0 to zero
|
||||||
|
#
|
||||||
|
#
|
||||||
|
#THIS NEEDS TO BE REMOVED LATER
|
||||||
|
#PLACEHOLDER FOR WRIST VALUE
|
||||||
|
#
|
||||||
|
#
|
||||||
|
angles.append(0.0)#placeholder for wrist_continuous
|
||||||
|
angles.append(0.0)#placeholder for wrist
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# # Update the arm's current angles
|
||||||
|
self.arm.update_angles(angles)
|
||||||
|
self.arm_feedback.axis0_angle = angles[0]
|
||||||
|
self.arm_feedback.axis1_angle = angles[1]
|
||||||
|
self.arm_feedback.axis2_angle = angles[2]
|
||||||
|
self.arm_feedback.axis3_angle = angles[3]
|
||||||
|
# self.get_logger().info(f"Angles: {angles}")
|
||||||
|
# #debug publish angles
|
||||||
|
# tempMsg = String()
|
||||||
|
# tempMsg.data = "Angles: " + str(angles)
|
||||||
|
# #self.debug_pub.publish(tempMsg)
|
||||||
|
else:
|
||||||
|
self.get_logger().info("Invalid angle feedback input format")
|
||||||
|
|
||||||
|
def updateBusVoltage(self, msg: str):
|
||||||
|
# Bus Voltage feedbacks
|
||||||
|
parts = msg.split(",")
|
||||||
|
if len(parts) >= 7:
|
||||||
|
# Extract the voltage from the string
|
||||||
|
voltages_in = parts[3:7]
|
||||||
|
# Convert the voltages to floats
|
||||||
|
self.arm_feedback.bat_voltage = float(voltages_in[0]) / 100.0
|
||||||
|
self.arm_feedback.voltage_12 = float(voltages_in[1]) / 100.0
|
||||||
|
self.arm_feedback.voltage_5 = float(voltages_in[2]) / 100.0
|
||||||
|
self.arm_feedback.voltage_3 = float(voltages_in[3]) / 100.0
|
||||||
|
else:
|
||||||
|
self.get_logger().info("Invalid voltage feedback input format")
|
||||||
|
|
||||||
|
def updateMotorFeedback(self, msg: str):
|
||||||
|
parts = str(msg.strip()).split(",")
|
||||||
|
motorId = round(float(parts[3]))
|
||||||
|
temp = float(parts[4]) / 10.0
|
||||||
|
voltage = float(parts[5]) / 10.0
|
||||||
|
current = float(parts[6]) / 10.0
|
||||||
|
if motorId == 1:
|
||||||
|
self.arm_feedback.axis1_temp = temp
|
||||||
|
self.arm_feedback.axis1_voltage = voltage
|
||||||
|
self.arm_feedback.axis1_current = current
|
||||||
|
elif motorId == 2:
|
||||||
|
self.arm_feedback.axis2_temp = temp
|
||||||
|
self.arm_feedback.axis2_voltage = voltage
|
||||||
|
self.arm_feedback.axis2_current = current
|
||||||
|
elif motorId == 3:
|
||||||
|
self.arm_feedback.axis3_temp = temp
|
||||||
|
self.arm_feedback.axis3_voltage = voltage
|
||||||
|
self.arm_feedback.axis3_current = current
|
||||||
|
elif motorId == 4:
|
||||||
|
self.arm_feedback.axis0_temp = temp
|
||||||
|
self.arm_feedback.axis0_voltage = voltage
|
||||||
|
self.arm_feedback.axis0_current = current
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def list_serial_ports():
|
def list_serial_ports():
|
||||||
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
|
return glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
|
||||||
|
|||||||
@@ -20,16 +20,16 @@ from ros2_interfaces_pkg.msg import CoreControl
|
|||||||
serial_pub = None
|
serial_pub = None
|
||||||
thread = None
|
thread = None
|
||||||
|
|
||||||
control_qos = qos.QoSProfile(
|
# control_qos = qos.QoSProfile(
|
||||||
history=qos.QoSHistoryPolicy.KEEP_LAST,
|
# history=qos.QoSHistoryPolicy.KEEP_LAST,
|
||||||
depth=1,
|
# depth=1,
|
||||||
reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
# reliability=qos.QoSReliabilityPolicy.BEST_EFFORT,
|
||||||
durability=qos.QoSDurabilityPolicy.VOLATILE,
|
# durability=qos.QoSDurabilityPolicy.VOLATILE,
|
||||||
deadline=1000,
|
# deadline=1000,
|
||||||
lifespan=500,
|
# lifespan=500,
|
||||||
liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
# liveliness=qos.QoSLivelinessPolicy.SYSTEM_DEFAULT,
|
||||||
liveliness_lease_duration=5000
|
# liveliness_lease_duration=5000
|
||||||
)
|
# )
|
||||||
|
|
||||||
class SerialRelay(Node):
|
class SerialRelay(Node):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -45,7 +45,7 @@ class SerialRelay(Node):
|
|||||||
self.debug_pub = self.create_publisher(String, '/core/debug', 10)
|
self.debug_pub = self.create_publisher(String, '/core/debug', 10)
|
||||||
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
|
self.feedback_pub = self.create_publisher(CoreFeedback, '/core/feedback', 10)
|
||||||
# Create a subscriber
|
# Create a subscriber
|
||||||
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, qos_profile=control_qos)
|
self.control_sub = self.create_subscription(CoreControl, '/core/control', self.send_controls, 10)
|
||||||
|
|
||||||
# Create a publisher for telemetry
|
# Create a publisher for telemetry
|
||||||
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
|
self.telemetry_pub_timer = self.create_timer(1.0, self.publish_feedback)
|
||||||
|
|||||||
Reference in New Issue
Block a user