mirror of
https://github.com/SHC-ASTRA/rover-ros2.git
synced 2026-02-11 17:30:36 +00:00
Compare commits
1 Commits
can-transc
...
gazebo-doc
| Author | SHA1 | Date | |
|---|---|---|---|
| 4d84b5f8cc |
11
README.md
11
README.md
@@ -65,10 +65,10 @@ You can fake the presence of a Serial device (i.e., MCU) by using the following
|
|||||||
$ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT
|
$ socat -dd -v pty,rawer,crnl,link=/tmp/ttyACM9 pty,rawer,crnl,link=/tmp/ttyOUT
|
||||||
```
|
```
|
||||||
|
|
||||||
When you go to run anchor, use the `port_override` launch parameter to point it to the fake serial port, like so:
|
When you go to run anchor, use the `PORT_OVERRIDE` environment variable to point it to the fake serial port, like so:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
$ ros2 launch anchor_pkg rover.launch.py port_override:=/tmp/ttyACM9
|
$ PORT_OVERRIDE=/tmp/ttyACM9 ros2 launch anchor_pkg rover.launch.py
|
||||||
```
|
```
|
||||||
|
|
||||||
### Connecting the GuliKit Controller
|
### Connecting the GuliKit Controller
|
||||||
@@ -121,24 +121,23 @@ A: Don't worry about it. If you had the workspace sourced, ROS2 will complain ab
|
|||||||
...
|
...
|
||||||
```
|
```
|
||||||
|
|
||||||
A: To find a microcontroller to talk to, Anchor filters through your available serial ports to find microcontrollers. If the microcontroller fails
|
A: To find a microcontroller to talk to, Anchor sends a ping to every Serial port on your computer. If it does not receive a 'pong' in less than one second, then it aborts. There are a few possible fixes:
|
||||||
to respond properly, or one is not found, it will abort. There are a few possible fixes:
|
|
||||||
|
|
||||||
- Keep trying to run it until it works
|
- Keep trying to run it until it works
|
||||||
- Run `lsusb` to see if the microcontroller is detected by your computer.
|
- Run `lsusb` to see if the microcontroller is detected by your computer.
|
||||||
- Run `ls /dev/tty*0` to see if there is a valid Serial port enumerated for the microcontroller.
|
- Run `ls /dev/tty*0` to see if there is a valid Serial port enumerated for the microcontroller.
|
||||||
- Check if you are in the `dialout` group (or whatever group shows up by running `ls -l /dev/tty*`) by using the `groups` command.
|
- Check if you are in the `dialout` group (or whatever group shows up by running `ls -l /dev/tty*`).
|
||||||
|
|
||||||
## Packages
|
## Packages
|
||||||
|
|
||||||
- [anchor\_pkg](./src/anchor_pkg) - Handles Serial communication between the various other packages here and the microcontroller.
|
- [anchor\_pkg](./src/anchor_pkg) - Handles Serial communication between the various other packages here and the microcontroller.
|
||||||
- [arm\_pkg](./src/arm_pkg) - Relays controls and sensor data for the arm (socket and digit) between anchor and basestation/headless.
|
- [arm\_pkg](./src/arm_pkg) - Relays controls and sensor data for the arm (socket and digit) between anchor and basestation/headless.
|
||||||
- [astra\_descriptions](./src/astra_descriptions) - Submodule with URDF-related packages.
|
- [astra\_descriptions](./src/astra_descriptions) - Submodule with URDF-related packages.
|
||||||
- [astra\_msgs](./src/astra_msgs) - Contains custom message types for communication between basestation and the rover over ROS2.
|
|
||||||
- [bio\_pkg](./src/bio_pkg) - Like arm_pkg, but for CITADEL and FAERIE
|
- [bio\_pkg](./src/bio_pkg) - Like arm_pkg, but for CITADEL and FAERIE
|
||||||
- [core\_pkg](./src/core_pkg) - Like arm_pkg, but for Core
|
- [core\_pkg](./src/core_pkg) - Like arm_pkg, but for Core
|
||||||
- [headless\_pkg](./src/headless_pkg) - Simple, non-graphical controller node to work in place of basestation when controlling the rover by itself. This is autostarted with anchor to allow for setup-less control of the rover.
|
- [headless\_pkg](./src/headless_pkg) - Simple, non-graphical controller node to work in place of basestation when controlling the rover by itself. This is autostarted with anchor to allow for setup-less control of the rover.
|
||||||
- [latency\_tester](./src/latency_tester) - A temporary node to test comms latency over ROS2, Serial, and CAN.
|
- [latency\_tester](./src/latency_tester) - A temporary node to test comms latency over ROS2, Serial, and CAN.
|
||||||
|
- [ros2\_interfaces\_pkg](./src/ros2_interfaces_pkg) - Contains custom message types for communication between basestation and the rover over ROS2. (being renamed to `astra_msgs`).
|
||||||
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
|
- [servo\_arm\_twist\_pkg](./src/servo_arm_twist_pkg) - A temporary node to translate controller state from `ros2_joy` to `Twist` messages to control the Arm via IK.
|
||||||
|
|
||||||
## Maintainers
|
## Maintainers
|
||||||
|
|||||||
@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
|
|||||||
echo "[INFO] Starting ROS node..."
|
echo "[INFO] Starting ROS node..."
|
||||||
|
|
||||||
# Source ROS 2 Humble setup script
|
# Source ROS 2 Humble setup script
|
||||||
if command -v nixos-rebuild; then
|
|
||||||
echo "[INFO] running on NixOS"
|
|
||||||
else
|
|
||||||
source /opt/ros/humble/setup.bash
|
source /opt/ros/humble/setup.bash
|
||||||
fi
|
|
||||||
|
|
||||||
# Source your workspace setup script
|
# Source your workspace setup script
|
||||||
source $SCRIPT_DIR/../install/setup.bash
|
source $SCRIPT_DIR/../install/setup.bash
|
||||||
|
|||||||
@@ -15,11 +15,7 @@ echo "[INFO] Network interface is up!"
|
|||||||
echo "[INFO] Starting ROS node..."
|
echo "[INFO] Starting ROS node..."
|
||||||
|
|
||||||
# Source ROS 2 Humble setup script
|
# Source ROS 2 Humble setup script
|
||||||
if command -v nixos-rebuild; then
|
|
||||||
echo "[INFO] running on NixOS"
|
|
||||||
else
|
|
||||||
source /opt/ros/humble/setup.bash
|
source /opt/ros/humble/setup.bash
|
||||||
fi
|
|
||||||
|
|
||||||
# Source your workspace setup script
|
# Source your workspace setup script
|
||||||
source $SCRIPT_DIR/../install/setup.bash
|
source $SCRIPT_DIR/../install/setup.bash
|
||||||
|
|||||||
@@ -17,11 +17,7 @@ done
|
|||||||
echo "[INFO] Network interface is up!"
|
echo "[INFO] Network interface is up!"
|
||||||
|
|
||||||
|
|
||||||
if command -v nixos-rebuild; then
|
|
||||||
echo "[INFO] running on NixOS"
|
|
||||||
else
|
|
||||||
source /opt/ros/humble/setup.bash
|
source /opt/ros/humble/setup.bash
|
||||||
fi
|
|
||||||
source $ANCHOR_WS/install/setup.bash
|
source $ANCHOR_WS/install/setup.bash
|
||||||
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash
|
[[ -f $AUTONOMY_WS/install/setup.bash ]] && source $AUTONOMY_WS/install/setup.bash
|
||||||
|
|
||||||
|
|||||||
18
docker-compose.yml
Normal file
18
docker-compose.yml
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
|
||||||
|
services:
|
||||||
|
ros2:
|
||||||
|
image: osrf/ros:jazzy-desktop-full
|
||||||
|
container_name: ros2_jazzy_gui
|
||||||
|
network_mode: host
|
||||||
|
environment:
|
||||||
|
- DISPLAY=${DISPLAY}
|
||||||
|
- QT_X11_NO_MITSHM=1
|
||||||
|
- XAUTHORITY=/tmp/.docker.xauth
|
||||||
|
- NVIDIA_VISIBLE_DEVICES=all
|
||||||
|
- NVIDIA_DRIVER_CAPABILITIES=all
|
||||||
|
volumes:
|
||||||
|
- /tmp/.X11-unix:/tmp/.X11-unix:rw
|
||||||
|
- ./ros_ws:/ros_ws
|
||||||
|
stdin_open: true
|
||||||
|
tty: true
|
||||||
|
privileged: true
|
||||||
@@ -1,28 +1,24 @@
|
|||||||
import atexit
|
|
||||||
import glob
|
|
||||||
import signal
|
|
||||||
import threading
|
|
||||||
from typing import cast
|
|
||||||
|
|
||||||
import rclpy
|
import rclpy
|
||||||
import serial
|
|
||||||
import serial.tools.list_ports
|
|
||||||
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
|
|
||||||
from rclpy.executors import ExternalShutdownException
|
|
||||||
from rclpy.node import Node
|
from rclpy.node import Node
|
||||||
from std_msgs.msg import Header, String
|
from std_srvs.srv import Empty
|
||||||
|
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
import atexit
|
||||||
|
|
||||||
|
import serial
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import glob
|
||||||
|
|
||||||
|
from std_msgs.msg import String, Header
|
||||||
from astra_msgs.msg import VicCAN
|
from astra_msgs.msg import VicCAN
|
||||||
|
|
||||||
KNOWN_USBS = [
|
serial_pub = None
|
||||||
(0x2E8A, 0x00C0), # Raspberry Pi Pico
|
thread = None
|
||||||
(0x1A86, 0x55D4), # Adafruit Feather ESP32 V2
|
|
||||||
(0x10C4, 0xEA60), # DOIT ESP32 Devkit V1
|
|
||||||
(0x1A86, 0x55D3), # ESP32 S3 Development Board
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class Anchor(Node):
|
|
||||||
"""
|
"""
|
||||||
Publishers:
|
Publishers:
|
||||||
* /anchor/from_vic/debug
|
* /anchor/from_vic/debug
|
||||||
@@ -43,95 +39,46 @@ class Anchor(Node):
|
|||||||
- Publish raw strings to this topic to send directly to the MCU for debugging
|
- Publish raw strings to this topic to send directly to the MCU for debugging
|
||||||
"""
|
"""
|
||||||
|
|
||||||
serial_port: str | None = None # e.g., "/dev/ttyUSB0"
|
|
||||||
|
|
||||||
|
class SerialRelay(Node):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# Initalize node with name
|
# Initalize node with name
|
||||||
super().__init__("anchor_node")
|
super().__init__("anchor_node") # previously 'serial_publisher'
|
||||||
|
|
||||||
self.declare_parameter(
|
# Loop through all serial devices on the computer to check for the MCU
|
||||||
"port_override",
|
self.port = None
|
||||||
"",
|
if port_override := os.getenv("PORT_OVERRIDE"):
|
||||||
ParameterDescriptor(
|
self.port = port_override
|
||||||
name="port_override", type=ParameterType.PARAMETER_STRING
|
ports = SerialRelay.list_serial_ports()
|
||||||
),
|
for i in range(4):
|
||||||
)
|
if self.port is not None:
|
||||||
|
|
||||||
# Serial port override
|
|
||||||
if port_override := self.get_parameter("port_override").value:
|
|
||||||
self.serial_port = cast(str, port_override) # Cast to make the linter happy
|
|
||||||
|
|
||||||
##################################################
|
|
||||||
# Serial MCU Discovery
|
|
||||||
|
|
||||||
# If there was not a port override, look for a MCU over USB for Serial.
|
|
||||||
if self.serial_port is None:
|
|
||||||
comports = serial.tools.list_ports.comports()
|
|
||||||
recog_ports = list(
|
|
||||||
filter(
|
|
||||||
# Filter for ports we know that aren't invalid
|
|
||||||
lambda p: (p.vid, p.pid) in KNOWN_USBS and p.device is not None,
|
|
||||||
comports,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Guards
|
|
||||||
if len(recog_ports) > 1: # If we found too many
|
|
||||||
self.get_logger().fatal(
|
|
||||||
f"Found multiple recognized MCUs: {[p.device for p in recog_ports].__str__()}"
|
|
||||||
)
|
|
||||||
exit(1)
|
|
||||||
if len(recog_ports) == 0:
|
|
||||||
self.get_logger().fatal(
|
|
||||||
f"Found no recognized MCUs: {[p.device for p in recog_ports].__str__()}"
|
|
||||||
)
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
# Everything is ok
|
|
||||||
found_port = recog_ports[0]
|
|
||||||
self.get_logger().info(
|
|
||||||
f"Selecting MCU '{found_port.description}' at {found_port.device}."
|
|
||||||
)
|
|
||||||
# String, location of device file; e.g., '/dev/ttyACM0'
|
|
||||||
self.serial_port = found_port.device
|
|
||||||
|
|
||||||
# Found a Serial port, try to open it; above code has not officially opened a Serial port
|
|
||||||
self.get_logger().debug(
|
|
||||||
f"Attempting to open Serial port '{self.serial_port}'..."
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
self.serial_interface = serial.Serial(self.serial_port, 115200, timeout=1)
|
|
||||||
|
|
||||||
# Attempt to get name of connected MCU
|
|
||||||
self.serial_interface.write(b"can_relay_mode,on\n") # can_relay_ready,[mcu]
|
|
||||||
mcu_name: str = ""
|
|
||||||
for _ in range(4): # Sometimes it takes a sec
|
|
||||||
response = self.serial_interface.read_until(bytes("\n", "utf8"))
|
|
||||||
if b"can_relay_ready" in response:
|
|
||||||
try:
|
|
||||||
args: list[str] = response.decode("utf8").strip().split(",")
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
continue # ignore malformed responses
|
|
||||||
if len(args) == 2:
|
|
||||||
mcu_name = args[1]
|
|
||||||
break
|
break
|
||||||
self.get_logger().info(
|
for port in ports:
|
||||||
f"MCU '{mcu_name}' is ready at '{self.serial_port}'."
|
try:
|
||||||
)
|
# connect and send a ping command
|
||||||
|
ser = serial.Serial(port, 115200, timeout=1)
|
||||||
|
# (f"Checking port {port}...")
|
||||||
|
ser.write(b"ping\n")
|
||||||
|
response = ser.read_until(bytes("\n", "utf8"))
|
||||||
|
|
||||||
except serial.SerialException as e:
|
# if pong is in response, then we are talking with the MCU
|
||||||
self.get_logger().fatal(
|
if b"pong" in response:
|
||||||
f"Could not open Serial port '{self.serial_port}' for reason:"
|
self.port = port
|
||||||
)
|
self.get_logger().info(f"Found MCU at {self.port}!")
|
||||||
self.get_logger().fatal(e.strerror)
|
break
|
||||||
exit(1)
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
# Close serial port on exit
|
if self.port is None:
|
||||||
|
self.get_logger().info("Unable to find MCU...")
|
||||||
|
time.sleep(1)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
self.ser = serial.Serial(self.port, 115200)
|
||||||
|
self.get_logger().info(f"Enabling Relay Mode")
|
||||||
|
self.ser.write(b"can_relay_mode,on\n")
|
||||||
atexit.register(self.cleanup)
|
atexit.register(self.cleanup)
|
||||||
|
|
||||||
##################################################
|
|
||||||
# ROS2 Topic Setup
|
|
||||||
|
|
||||||
# New pub/sub with VicCAN
|
# New pub/sub with VicCAN
|
||||||
self.fromvic_debug_pub_ = self.create_publisher(
|
self.fromvic_debug_pub_ = self.create_publisher(
|
||||||
String, "/anchor/from_vic/debug", 20
|
String, "/anchor/from_vic/debug", 20
|
||||||
@@ -168,29 +115,30 @@ class Anchor(Node):
|
|||||||
String, "/anchor/relay", self.on_relay_tovic_string, 10
|
String, "/anchor/relay", self.on_relay_tovic_string, 10
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
# This thread makes all the update processes run in the background
|
||||||
|
global thread
|
||||||
|
thread = threading.Thread(target=rclpy.spin, args={self}, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
while rclpy.ok():
|
||||||
|
self.read_MCU() # Check the MCU for updates
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
def read_MCU(self):
|
def read_MCU(self):
|
||||||
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
|
"""Check the USB serial port for new data from the MCU, and publish string to appropriate topics"""
|
||||||
output: str | None = None
|
|
||||||
try:
|
try:
|
||||||
output = str(self.serial_interface.readline(), "utf8")
|
output = str(self.ser.readline(), "utf8")
|
||||||
except serial.SerialException as e:
|
|
||||||
self.get_logger().fatal(f"SerialException: {e}")
|
|
||||||
exit(1)
|
|
||||||
except TypeError as e:
|
|
||||||
self.get_logger().fatal(f"TypeError: {e}")
|
|
||||||
exit(1)
|
|
||||||
except Exception as e:
|
|
||||||
self.get_logger().error(f"Exception: {e}")
|
|
||||||
|
|
||||||
if output:
|
if output:
|
||||||
self.relay_fromvic(output)
|
self.relay_fromvic(output)
|
||||||
|
# All output over debug temporarily
|
||||||
|
# self.get_logger().info(f"[MCU] {output}")
|
||||||
msg = String()
|
msg = String()
|
||||||
msg.data = output
|
msg.data = output
|
||||||
|
|
||||||
# All output over debug
|
|
||||||
self.debug_pub.publish(msg)
|
self.debug_pub.publish(msg)
|
||||||
|
|
||||||
# Send the message to the right place
|
|
||||||
if output.startswith("can_relay_fromvic,core"):
|
if output.startswith("can_relay_fromvic,core"):
|
||||||
self.core_pub.publish(msg)
|
self.core_pub.publish(msg)
|
||||||
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
|
elif output.startswith("can_relay_fromvic,arm") or output.startswith(
|
||||||
@@ -201,7 +149,28 @@ class Anchor(Node):
|
|||||||
"can_relay_fromvic,digit"
|
"can_relay_fromvic,digit"
|
||||||
): # digit for SHT sensor
|
): # digit for SHT sensor
|
||||||
self.bio_pub.publish(msg)
|
self.bio_pub.publish(msg)
|
||||||
|
# msg = String()
|
||||||
|
# msg.data = output
|
||||||
|
# self.debug_pub.publish(msg)
|
||||||
return
|
return
|
||||||
|
except serial.SerialException as e:
|
||||||
|
print(f"SerialException: {e}")
|
||||||
|
print("Closing serial port.")
|
||||||
|
if self.ser.is_open:
|
||||||
|
self.ser.close()
|
||||||
|
exit(1)
|
||||||
|
except TypeError as e:
|
||||||
|
print(f"TypeError: {e}")
|
||||||
|
print("Closing serial port.")
|
||||||
|
if self.ser.is_open:
|
||||||
|
self.ser.close()
|
||||||
|
exit(1)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Exception: {e}")
|
||||||
|
# print("Closing serial port.")
|
||||||
|
# if self.ser.is_open:
|
||||||
|
# self.ser.close()
|
||||||
|
# exit(1)
|
||||||
|
|
||||||
def on_mock_fromvic(self, msg: String):
|
def on_mock_fromvic(self, msg: String):
|
||||||
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
|
"""For testing without an actual MCU, publish strings here as if they came from an MCU"""
|
||||||
@@ -215,7 +184,7 @@ class Anchor(Node):
|
|||||||
output += f",{round(num, 7)}" # limit to 7 decimal places
|
output += f",{round(num, 7)}" # limit to 7 decimal places
|
||||||
output += "\n"
|
output += "\n"
|
||||||
# self.get_logger().info(f"VicCAN relay to MCU: {output}")
|
# self.get_logger().info(f"VicCAN relay to MCU: {output}")
|
||||||
self.serial_interface.write(bytes(output, "utf8"))
|
self.ser.write(bytes(output, "utf8"))
|
||||||
|
|
||||||
def relay_fromvic(self, msg: str):
|
def relay_fromvic(self, msg: str):
|
||||||
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
|
"""Relay a string message from the MCU to the appropriate VicCAN topic"""
|
||||||
@@ -277,7 +246,7 @@ class Anchor(Node):
|
|||||||
"""Relay a raw string message to the MCU for debugging"""
|
"""Relay a raw string message to the MCU for debugging"""
|
||||||
message = msg.data
|
message = msg.data
|
||||||
# self.get_logger().info(f"Sending command to MCU: {msg}")
|
# self.get_logger().info(f"Sending command to MCU: {msg}")
|
||||||
self.serial_interface.write(bytes(message, "utf8"))
|
self.ser.write(bytes(message, "utf8"))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def list_serial_ports():
|
def list_serial_ports():
|
||||||
@@ -285,30 +254,29 @@ class Anchor(Node):
|
|||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
print("Cleaning up before terminating...")
|
print("Cleaning up before terminating...")
|
||||||
if self.serial_interface.is_open:
|
if self.ser.is_open:
|
||||||
self.serial_interface.close()
|
self.ser.close()
|
||||||
|
|
||||||
|
|
||||||
|
def myexcepthook(type, value, tb):
|
||||||
|
print("Uncaught exception:", type, value)
|
||||||
|
if serial_pub:
|
||||||
|
serial_pub.cleanup()
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
def main(args=None):
|
||||||
try:
|
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
anchor_node = Anchor()
|
sys.excepthook = myexcepthook
|
||||||
|
|
||||||
thread = threading.Thread(target=rclpy.spin, args=(anchor_node,), daemon=True)
|
global serial_pub
|
||||||
thread.start()
|
|
||||||
|
|
||||||
rate = anchor_node.create_rate(100) # 100 Hz -- arbitrary rate
|
serial_pub = SerialRelay()
|
||||||
while rclpy.ok():
|
serial_pub.run()
|
||||||
anchor_node.read_MCU() # Check the MCU for updates
|
|
||||||
rate.sleep()
|
|
||||||
except (KeyboardInterrupt, ExternalShutdownException):
|
|
||||||
print("Caught shutdown signal, shutting down...")
|
|
||||||
finally:
|
|
||||||
rclpy.try_shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
# signal.signal(signal.SIGTSTP, lambda signum, frame: sys.exit(0)) # Catch Ctrl+Z and exit cleanly
|
||||||
signal.signal(
|
signal.signal(
|
||||||
signal.SIGTERM, lambda signum, frame: exit(0)
|
signal.SIGTERM, lambda signum, frame: sys.exit(0)
|
||||||
) # Catch termination signals and exit cleanly
|
) # Catch termination signals and exit cleanly
|
||||||
main()
|
main()
|
||||||
|
|||||||
@@ -1,25 +1,24 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from std_msgs.msg import String
|
from std_msgs.msg import String
|
||||||
|
|
||||||
from astra_msgs.msg import PtzControl, PtzFeedback
|
from astra_msgs.msg import PtzControl, PtzFeedback
|
||||||
|
|
||||||
# Import the SIYI SDK
|
# Import the SIYI SDK
|
||||||
from core_pkg.siyi_sdk import (
|
from core_pkg.siyi_sdk import (
|
||||||
AttitudeData,
|
|
||||||
CommandID,
|
|
||||||
DataStreamFrequency,
|
|
||||||
DataStreamType,
|
|
||||||
SingleAxis,
|
|
||||||
SiyiGimbalCamera,
|
SiyiGimbalCamera,
|
||||||
|
CommandID,
|
||||||
|
DataStreamType,
|
||||||
|
DataStreamFrequency,
|
||||||
|
SingleAxis,
|
||||||
|
AttitudeData,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -263,7 +262,7 @@ class PtzNode(Node):
|
|||||||
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
|
f"[{self.get_clock().now().nanoseconds / 1e9:.2f}] PTZ Node: {message_text}"
|
||||||
)
|
)
|
||||||
self.debug_pub.publish(msg)
|
self.debug_pub.publish(msg)
|
||||||
self.get_logger().debug(message_text)
|
self.get_logger().info(message_text)
|
||||||
|
|
||||||
def run_async_func(self, coro):
|
def run_async_func(self, coro):
|
||||||
"""Run an async function in the event loop."""
|
"""Run an async function in the event loop."""
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import rclpy
|
import rclpy
|
||||||
from rclpy.node import Node
|
from rclpy.node import Node
|
||||||
from rclpy.executors import ExternalShutdownException
|
|
||||||
from rclpy import qos
|
from rclpy import qos
|
||||||
from rclpy.duration import Duration
|
from rclpy.duration import Duration
|
||||||
|
|
||||||
@@ -12,8 +11,6 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import glob
|
import glob
|
||||||
import pwd
|
|
||||||
import grp
|
|
||||||
from math import copysign
|
from math import copysign
|
||||||
|
|
||||||
from std_msgs.msg import String
|
from std_msgs.msg import String
|
||||||
@@ -74,37 +71,10 @@ class Headless(Node):
|
|||||||
print("No gamepad found. Waiting...")
|
print("No gamepad found. Waiting...")
|
||||||
|
|
||||||
# Initialize the gamepad
|
# Initialize the gamepad
|
||||||
id = 0
|
self.gamepad = pygame.joystick.Joystick(0)
|
||||||
while True:
|
|
||||||
self.num_gamepads = pygame.joystick.get_count()
|
|
||||||
if id >= self.num_gamepads:
|
|
||||||
self.get_logger().fatal("Ran out of controllers to try")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.gamepad = pygame.joystick.Joystick(id)
|
|
||||||
self.gamepad.init()
|
self.gamepad.init()
|
||||||
except Exception as e:
|
|
||||||
self.get_logger().error("Error when initializing gamepad")
|
|
||||||
self.get_logger().error(e)
|
|
||||||
id += 1
|
|
||||||
continue
|
|
||||||
print(f"Gamepad Found: {self.gamepad.get_name()}")
|
print(f"Gamepad Found: {self.gamepad.get_name()}")
|
||||||
|
|
||||||
if self.gamepad.get_numhats() == 0 or self.gamepad.get_numaxes() < 5:
|
|
||||||
self.get_logger().error("Controller not correctly initialized.")
|
|
||||||
if not is_user_in_group("input"):
|
|
||||||
self.get_logger().warning(
|
|
||||||
"If using NixOS, you may need to add yourself to the 'input' group."
|
|
||||||
)
|
|
||||||
if is_user_in_group("plugdev"):
|
|
||||||
self.get_logger().warning(
|
|
||||||
"If using NixOS, you may need to remove yourself from the 'plugdev' group."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
id += 1
|
|
||||||
|
|
||||||
self.create_timer(0.15, self.send_controls)
|
self.create_timer(0.15, self.send_controls)
|
||||||
|
|
||||||
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
|
self.core_publisher = self.create_publisher(CoreControl, "/core/control", 2)
|
||||||
@@ -145,7 +115,7 @@ class Headless(Node):
|
|||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
# Check if controller is still connected
|
# Check if controller is still connected
|
||||||
if pygame.joystick.get_count() != self.num_gamepads:
|
if pygame.joystick.get_count() == 0:
|
||||||
print("Gamepad disconnected. Exiting...")
|
print("Gamepad disconnected. Exiting...")
|
||||||
# Send one last zero control message
|
# Send one last zero control message
|
||||||
self.core_publisher.publish(CORE_STOP_MSG)
|
self.core_publisher.publish(CORE_STOP_MSG)
|
||||||
@@ -326,33 +296,10 @@ def deadzone(value: float, threshold=0.05) -> float:
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
def is_user_in_group(group_name: str) -> bool:
|
|
||||||
# Copied from https://zetcode.com/python/os-getgrouplist/
|
|
||||||
try:
|
|
||||||
username = os.getlogin()
|
|
||||||
|
|
||||||
# Get group ID from name
|
|
||||||
group_info = grp.getgrnam(group_name)
|
|
||||||
target_gid = group_info.gr_gid
|
|
||||||
|
|
||||||
# Get user's groups
|
|
||||||
user_info = pwd.getpwnam(username)
|
|
||||||
user_groups = os.getgrouplist(username, user_info.pw_gid)
|
|
||||||
|
|
||||||
return target_gid in user_groups
|
|
||||||
except KeyError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
def main(args=None):
|
||||||
try:
|
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
|
|
||||||
node = Headless()
|
node = Headless()
|
||||||
rclpy.spin(node)
|
rclpy.spin(node)
|
||||||
except (KeyboardInterrupt, ExternalShutdownException):
|
|
||||||
print("Caught shutdown signal. Exiting...")
|
|
||||||
finally:
|
|
||||||
rclpy.shutdown()
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user