diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 266e671a5..b3da1a657 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -25,7 +25,7 @@ jobs: linters=(cppcheck cpplint flake8 pep257 uncrustify xmllint) packages=( diagnostic_aggregator diagnostic_common_diagnostics - diagnostic_remote_logging diagnostic_updater self_test + diagnostic_remote_logging diagnostic_topic_monitor diagnostic_updater self_test ) sudo apt-get install -y "${linters[@]/#/ros-jazzy-ament-}" failed=() diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 26d762c2e..456883848 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -62,6 +62,7 @@ jobs: diagnostic_aggregator diagnostic_common_diagnostics diagnostic_remote_logging + diagnostic_topic_monitor diagnostic_updater self_test build_win: diff --git a/README.md b/README.md index e284e09ec..f9428e1e2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Test diagnostics](https://img.shields.io/github/actions/workflow/status/ros/diagnostics/test.yaml?label=test&style=flat-square)](https://github.com/ros/diagnostics/actions/workflows/test.yaml) [![Lint diagnostics](https://img.shields.io/github/actions/workflow/status/ros/diagnostics/lint.yaml?label=lint&style=flat-square)](https://github.com/ros/diagnostics/actions/workflows/lint.yaml) [![ROS2 Humble](https://img.shields.io/ros/v/humble/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#humble) [![ROS2 Jazzy](https://img.shields.io/ros/v/jazzy/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#jazzy) [![ROS2 Kilted](https://img.shields.io/ros/v/kilted/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#kilted) [![ROS2 Rolling](https://img.shields.io/ros/v/rolling/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#rolling) +[![Test diagnostics](https://img.shields.io/github/actions/workflow/status/ros/diagnostics/test.yaml?label=test&style=flat-square)](https://github.com/ros/diagnostics/actions/workflows/test.yaml) [![Lint diagnostics](https://img.shields.io/github/actions/workflow/status/ros/diagnostics/lint.yaml?label=lint&style=flat-square)](https://github.com/ros/diagnostics/actions/workflows/lint.yaml) [![ROS2 Humble](https://img.shields.io/ros/v/humble/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#humble) [![ROS2 Jazzy](https://img.shields.io/ros/v/jazzy/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#jazzy) [![ROS2 Kilted](https://img.shields.io/ros/v/kilted/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#kilted) [![ROS2 Rolling](https://img.shields.io/ros/v/rolling/diagnostics.svg?style=flat-square)](https://index.ros.org/r/diagnostics/#rolling) # Overview @@ -14,6 +14,8 @@ It contains the following packages: - [`diagnostic_analysis`](/diagnostics/): *Not ported to ROS2 yet* **#contributions-welcome** - [`diagnostic_common_diagnostics`](/diagnostic_common_diagnostics/): Predefined nodes for monitoring the Linux and ROS system. - [`diagnostic_updater`](/diagnostic_updater/): Base classes to publishing custom diagnostic messages for Python and C++. +- [`diagnostic_remote_logging`](/diagnostic_remote_logging/): Utilities for forwarding diagnostics to remote systems, e.g. influxdb. +- [`diagnostic_topic_monitor`](/diagnostic_topic_monitor/): Components for monitoring topic health and publishing diagnostics. - [`self_test`](/self_test/): Tools to perform self tests on nodes. ## Collecting diagnostic data @@ -45,16 +47,6 @@ New features are to be developed in custom branches and then merged into the `ro From there, the changes are backported to the other branches. -## Backport Tooling - -This tool has proven to be useful: [backport](https://www.npmjs.com/package/backport) - -Use this command to port a given PR of `PR_NUMBER` to the other branches: - -```bash -backport --pr PR_NUMBER -b ros2-humble ros2-jazzy ros2-kilted -``` - ## Versioning and Releases - (**X**.0.0) We use the major version number to indicate a breaking change. diff --git a/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py b/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py index 28430c482..fb99d4002 100644 --- a/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py +++ b/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py @@ -63,7 +63,7 @@ def setUp(self): def diagnostics_callback(self, msg): self.message_recieved = True - self.assertEqual(len(msg.status), 1) + self.assertGreaterEqual(len(msg.status), 1) def test_ok(self): warning_percentage = 100 diff --git a/diagnostic_common_diagnostics/test/systemtest/test_hd_monitor_launchtest.py b/diagnostic_common_diagnostics/test/systemtest/test_hd_monitor_launchtest.py index 0f34c4467..75baf7c82 100644 --- a/diagnostic_common_diagnostics/test/systemtest/test_hd_monitor_launchtest.py +++ b/diagnostic_common_diagnostics/test/systemtest/test_hd_monitor_launchtest.py @@ -67,7 +67,7 @@ def generate_test_description(): class TestHDMonitor(unittest.TestCase): - """Test if the hd_monitor node is publishing diagnostics.""" + """Test the hd_monitor node.""" def __init__(self, methodName: str = 'runTest') -> None: super().__init__(methodName) @@ -83,11 +83,11 @@ def _get_min_level(self): for status in diag.status ] if len(levels) == 0: - return -1 + return None return min(levels) def test_topic_published(self): - """Test if the hd_monitor node is publishing diagnostics.""" + """Test if we receive diagnostics messages.""" min_level = 100 with WaitForTopics([('/diagnostics', DiagnosticArray)], timeout=5): print('Topic found') @@ -98,9 +98,15 @@ def test_topic_published(self): DiagnosticArray, '/diagnostics', self._received_message, 1 ) + while len(self.received_messages) == 0: + rclpy.spin_once(test_node, timeout_sec=1) + print('Waiting for first message...') + while len(self.received_messages) < 10: rclpy.spin_once(test_node, timeout_sec=1) - min_level = min(min_level, self._get_min_level()) + received_min_level = self._get_min_level() + assert received_min_level is not None, 'The received message should have a level' + min_level = min(min_level, received_min_level) if min_level == 0: break diff --git a/diagnostic_topic_monitor/.gitignore b/diagnostic_topic_monitor/.gitignore new file mode 100644 index 000000000..cc4087c20 --- /dev/null +++ b/diagnostic_topic_monitor/.gitignore @@ -0,0 +1,2 @@ +.pytest_cache +**/__pycache__ \ No newline at end of file diff --git a/diagnostic_topic_monitor/CMakeLists.txt b/diagnostic_topic_monitor/CMakeLists.txt new file mode 100644 index 000000000..e20275cb7 --- /dev/null +++ b/diagnostic_topic_monitor/CMakeLists.txt @@ -0,0 +1,108 @@ +cmake_minimum_required(VERSION 3.8) +project(diagnostic_topic_monitor) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +# uncomment the following section in order to fill in +# further dependencies manually. +# find_package( REQUIRED) + +find_package(ament_cmake_python REQUIRED) + +find_package(rclpy REQUIRED) +find_package(diagnostic_msgs REQUIRED) +find_package(diagnostic_updater REQUIRED) +find_package(rclcpp_lifecycle REQUIRED) +find_package(rclcpp_components REQUIRED) +find_package(lifecycle_msgs REQUIRED) +find_package(rclcpp REQUIRED) +find_package(diagnostic_updater REQUIRED) +find_package(std_msgs REQUIRED) + +add_library(topic_frequency_monitor_component SHARED src/topic_frequency_monitor_component.cpp) +rclcpp_components_register_node(topic_frequency_monitor_component + PLUGIN "diagnostic_topic_monitor::TopicFrequencyMonitor" + EXECUTABLE topic_frequency_monitor +) +target_include_directories(topic_frequency_monitor_component PRIVATE include) +target_link_libraries(topic_frequency_monitor_component PUBLIC + ${diagnostic_msgs_TARGETS} + ${lifecycle_msgs_TARGETS} + diagnostic_updater::diagnostic_updater + rclcpp::rclcpp + rclcpp_components::component + rclcpp_components::component_manager + rclcpp_lifecycle::rclcpp_lifecycle +) +if(WIN32) + target_link_libraries(topic_frequency_monitor_component PRIVATE ws2_32) +endif() +ament_export_targets(export_topic_frequency_monitor_component) +install(TARGETS topic_frequency_monitor_component + EXPORT export_topic_frequency_monitor_component + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) + +add_library(topic_age_monitor_component SHARED src/topic_age_monitor_component.cpp) +rclcpp_components_register_node(topic_age_monitor_component + PLUGIN "diagnostic_topic_monitor::TopicAgeMonitor" + EXECUTABLE topic_age_monitor +) +target_include_directories(topic_age_monitor_component PRIVATE include) +target_link_libraries(topic_age_monitor_component PUBLIC + ${diagnostic_msgs_TARGETS} + ${lifecycle_msgs_TARGETS} + diagnostic_updater::diagnostic_updater + rclcpp::rclcpp + rclcpp_components::component + rclcpp_components::component_manager + rclcpp_lifecycle::rclcpp_lifecycle +) +if(WIN32) + target_link_libraries(topic_age_monitor_component PRIVATE ws2_32) +endif() +ament_export_targets(export_topic_age_monitor_component) +install(TARGETS topic_age_monitor_component + EXPORT export_topic_age_monitor_component + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) + +install( + DIRECTORY include/ + DESTINATION include +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + find_package(launch_testing_ament_cmake REQUIRED) + + # Install the test configs and dummy publisher used for testing + install( + DIRECTORY test + DESTINATION share/${PROJECT_NAME}/ + ) + install( + PROGRAMS test/dummy_publishers.py + DESTINATION lib/${PROJECT_NAME} + ) + + add_launch_test( + test/test_topic_frequency_monitor.py + TARGET test_topic_frequency_monitor + TIMEOUT 20) + + add_launch_test( + test/test_topic_age_monitor.py + TARGET test_topic_age_monitor + TIMEOUT 20) +endif() + +ament_package() diff --git a/diagnostic_topic_monitor/README.md b/diagnostic_topic_monitor/README.md new file mode 100644 index 000000000..4a4070d9e --- /dev/null +++ b/diagnostic_topic_monitor/README.md @@ -0,0 +1,107 @@ +# Diagnostic Topic Monitor + +Offers diagnostic tools which monitor topics for health, such as frequency and age of messages. The output can be further processed with the [`diagnostic_aggregator`](https://github.com/ros/diagnostics/tree/ros2/diagnostic_aggregator) for cleaner observation. + +## Monitors + +There are two monitors currently available, which can be used as components or standalone nodes: + * topic_frequency_monitor -- this supports _any_ kind of topic and checks for frequency based on receive time + * topic_age_monitor -- this supports only topics with a Header, and it checks that message age is within a given range + +It is totally possible to run both of these at the same time, for different or the same topics, to check for both frequency +and max message age at the same time. + +## Usage + +You typically add the monitor(s) you want to a launch file in your package and configure the parameters to your liking. However, the nodes and components are ROS 2 life-cycle based, which means you need to configure and activate them manually. + +See the launch tests in the `test` folder for examples of how start the standalone monitor nodes, and example configs in `test/config`. + +> TIP: If you like the brevity and don't mind the added dependency, you can use the [nav2_lifecycle_manager](https://docs.nav2.org/configuration/packages/configuring-lifecycle.html) with parameter `autostart` set to `true` and target your monitor nodes to autostart them. + +> Example launch snippet: +```python +# Include this node in your LaunchConfiguration +autostart_monitors_node = LifecycleNode( + package="nav2_lifecycle_manager", + executable="lifecycle_manager", + name="autostart_monitors", + parameters=[ + { + "autostart": True, + "node_names": [], + "bond_timeout": 0.0, + } + ], + ) +``` + +> For this, you need to have the `nav2_lifecycle_manager` package available (binary or source code installation), and ideally add it as an `exec_depend` in your package. + +### Limiting which topics to monitor + +By default, the following internal topic names will be ignored by the monitors, as they are usually irrelevant: + +* ^/rosout$ +* .*/parameter_events +* ^/diagnostics$ +* .*/transition_event + +ROS2 parameter `monitor_configured_only` for both monitors: + +If false (the default), the topic monitor will monitor everything but the internal topics mentioned above. +To instead only monitor those topics which have been explicitly configured, set this to "true" + +### Influencing the output + +ROS2 parameter `diag_prefix` for both the monitors: + +This string will be prefixed to the name of the diagnostic for use within the aggregator. + +## `topic_frequency_monitor` + +Without any parameters, this node (component also available) will subscribe to all normal topics in a system and minimally monitor that there is activity on them (defined as "at least one message per second"). For more specific configuration, the following parameters are available. + +### Frequency Monitoring + +Usually, we want to make sure that the frequency of a topic is within certain bounds. + +For example, to monitor that the frequency of the "talker" topic is about 25Hz and the frequency of the "cmd_vel" topic is about 10Hz, use the following three parameters and pass them lists in the same order. This means, the second element in "min_values" applies to the topic given by the second element of the "topics" parameter. + + topics: [ "talker", "cmd_vel" ] + min_values: [ 23.0, 8.0 ] + max_values: [ 26.5, 11.0 ] + +Please note that the frequencies will be computed purely based on _arrival_ timing, the header is currently ignored. + +## `topic_age_monitor` + +Without any parameters, this node (component also available) will subscribe to all normal topics in a system and minimally monitor that there is activity on them (defined as "at least one message per second"). For more specific configuration, the following parameters are available. + +### Message Age Monitoring + +Usually, we want to make sure that message is not much older than expected (allowing for some jitter due to network transmission), +but also not newer than expected (which would indicate issues with timestamp or system time between machines). + +Since this relies on a sender-side timestamp, this monitor only works with message that include a Header. + +NOTE: You can configure it for any topic and it _will_ interpret the data in there as a timestamp, but unless it +really is a header, the results will be rather random ;-) + +Configuration is similar to the frequency monitor, but giving a range for delays instead: + + topics: [ "talker", "cmd_vel" ] + min_values: [ 80, 8.0 ] + max_values: [ 120, 11.0 ] + monitor_configured_only: True + +> It is highly prudent to only monitor configured topics in the age monitor. If you set it to monitor all, +and you have topics without header fields, you will get a lot of warning messages!! + +## Testing + +To run all tests in this package: + +```bash +colcon test --event-handlers console_direct+ --packages-select diagnostic_topic_monitor && colcon test-result +``` diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/activity_diagnostic_task.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/activity_diagnostic_task.hpp new file mode 100644 index 000000000..d453eedd4 --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/activity_diagnostic_task.hpp @@ -0,0 +1,65 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef DIAGNOSTIC_TOPIC_MONITOR__ACTIVITY_DIAGNOSTIC_TASK_HPP_ +#define DIAGNOSTIC_TOPIC_MONITOR__ACTIVITY_DIAGNOSTIC_TASK_HPP_ + +#include +#include + +#include // NOLINT: upstream + +using namespace std::chrono_literals; // NOLINT: build/namespaces + +namespace diagnostic_topic_monitor +{ + +/** + * @brief Very simply DiagnosticTask that just checks whether there has been _any_ + * kind of activity during the last period + */ +class ActivityDiagnosticTask : public diagnostic_updater::DiagnosticTask +{ +public: + ActivityDiagnosticTask( + const std::string & name, rclcpp::Clock::SharedPtr clock, + std::chrono::duration warning_threshold = 1.1s) + : diagnostic_updater::DiagnosticTask(name), + clock_(clock), + last_tick_(0), + warning_threshold_(warning_threshold) + { + } + virtual void tick() {last_tick_ = clock_->now();} + void run(diagnostic_updater::DiagnosticStatusWrapper & stat) override + { + stat.name = getName(); + if (last_tick_.seconds() == 0) { + stat.level = diagnostic_msgs::msg::DiagnosticStatus::WARN; + stat.message = "No data received, yet"; + stat.addf("period", "%f", std::numeric_limits::max()); + } else { + const auto currentTime = clock_->now(); + // more than 1s without tick is considered warning + const auto elapsed = currentTime - last_tick_; + stat.addf("period", "%f", elapsed.seconds()); + if (elapsed <= rclcpp::Duration(warning_threshold_)) { + stat.level = diagnostic_msgs::msg::DiagnosticStatus::OK; + stat.message = "OK"; + } else { + stat.level = diagnostic_msgs::msg::DiagnosticStatus::WARN; + stat.message = "Last data received more than 1s ago"; + } + } + } + +private: + rclcpp::Clock::SharedPtr clock_; + rclcpp::Time last_tick_; + std::chrono::duration warning_threshold_; +}; + +} // namespace diagnostic_topic_monitor + +#endif // DIAGNOSTIC_TOPIC_MONITOR__ACTIVITY_DIAGNOSTIC_TASK_HPP_ diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/generic_topic_monitor.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/generic_topic_monitor.hpp new file mode 100644 index 000000000..01bf4b756 --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/generic_topic_monitor.hpp @@ -0,0 +1,273 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef DIAGNOSTIC_TOPIC_MONITOR__GENERIC_TOPIC_MONITOR_HPP_ +#define DIAGNOSTIC_TOPIC_MONITOR__GENERIC_TOPIC_MONITOR_HPP_ + +#ifdef _WIN32 +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include +// Windows SDK defines ERROR as 0, which breaks scoped enum usage (e.g. CallbackReturn::ERROR) +#ifdef ERROR +#undef ERROR +#endif +#else +#include +#endif + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include // NOLINT: upstream +#include // NOLINT: upstream + +#include "diagnostic_msgs/msg/diagnostic_array.hpp" // NOLINT: upstream +#include "diagnostic_topic_monitor/activity_diagnostic_task.hpp" +#include "diagnostic_updater/publisher.hpp" // NOLINT: upstream +#include "diagnostic_updater/update_functions.hpp" // NOLINT: upstream + +using namespace std::chrono_literals; // NOLINT: build/namespaces + +namespace diagnostic_topic_monitor +{ +constexpr const char * TOPICS_PARAM_NAME = "topics"; +constexpr const char * MIN_VALUES_PARAM_NAME = "min_values"; +constexpr const char * MAX_VALUES_PARAM_NAME = "max_values"; +constexpr const char * MONITOR_CONFIGURED_ONLY_PARAM_NAME = + "monitor_configured_only"; +constexpr const char * DIAG_PREFIX_PARAM_NAME = "diag_prefix"; +typedef rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface:: + CallbackReturn LCCBReturn; + +/** + * @brief Base abstract class for creating Topic Monitors. + */ +template +class GenericTopicMonitor : public rclcpp_lifecycle::LifecycleNode +{ +public: + explicit GenericTopicMonitor( + const std::string & node_name, + rclcpp::NodeOptions options); + ~GenericTopicMonitor() {} + LCCBReturn on_configure(const rclcpp_lifecycle::State &) override; + LCCBReturn on_activate(const rclcpp_lifecycle::State & state) override; + LCCBReturn on_deactivate(const rclcpp_lifecycle::State & state) override; + LCCBReturn on_cleanup(const rclcpp_lifecycle::State &) override + { + return LCCBReturn::SUCCESS; + } + LCCBReturn on_shutdown(const rclcpp_lifecycle::State &) override + { + return LCCBReturn::SUCCESS; + } + +protected: + std::string get_prefixed_name(const std::string & topic_name) const; + bool skip_topic(const std::string & topic_name); + virtual void topic_cb( + const std::string & topic_name, + const std::shared_ptr & msg) = 0; + virtual void update_topic_subscriptions(); + virtual StatusParamType parse_params(const int index) = 0; + + std::vector hidden_topics_{ + std::regex("^/rosout$"), std::regex(".*/parameter_events"), std::regex( + "^/diagnostics$"), + std::regex(".*/transition_event"), std::regex("^/clock")}; + std::shared_ptr timer_; + std::shared_ptr updater_; + std::unordered_map> subscribed_topics_; + std::unordered_map> topic_diag_map_; + std::vector topics_; + std::set known_topics_; + std::vector min_values_; + std::vector max_values_; + bool monitor_configured_only_{true}; + std::string diag_prefix_; +}; + +// Implementation of GenericTopicMonitor methods +template +inline GenericTopicMonitor::GenericTopicMonitor( + const std::string & node_name, rclcpp::NodeOptions options) +: rclcpp_lifecycle::LifecycleNode(node_name, options.allow_undeclared_parameters( + true)) +{ + auto desc = rcl_interfaces::msg::ParameterDescriptor{}; + desc.description = "Topics to specify expectations for"; + desc.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY; + declare_parameter(TOPICS_PARAM_NAME, std::vector(), desc); + auto desc_min = rcl_interfaces::msg::ParameterDescriptor{}; + desc_min.description = "Minimum values for topic from 'topics'"; + desc_min.type = rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE_ARRAY; + declare_parameter(MIN_VALUES_PARAM_NAME, std::vector(), desc_min); + auto desc_max = rcl_interfaces::msg::ParameterDescriptor{}; + desc_max.description = "Maximum values for topic from 'topics'"; + desc_max.type = rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE_ARRAY; + declare_parameter(MAX_VALUES_PARAM_NAME, std::vector(), desc_max); + // by default, we include all topics + declare_parameter(MONITOR_CONFIGURED_ONLY_PARAM_NAME, false); + // prefix for diagnostics (for use with modules) + declare_parameter(DIAG_PREFIX_PARAM_NAME, ""); +} + +template +inline std::string +diagnostic_topic_monitor::GenericTopicMonitor::get_prefixed_name( + const std::string & topic_name) const +{ + if (topic_name.find(diag_prefix_) == 0) { + return topic_name; + } + return diag_prefix_ + std::string("/") + topic_name; +} + +template +inline bool diagnostic_topic_monitor::GenericTopicMonitor::skip_topic( + const std::string & topic_name) +{ + // skip if we already subscribed + if (subscribed_topics_.find(topic_name) != subscribed_topics_.end()) { + RCLCPP_DEBUG( + get_logger(), "Already subscribed to %s, skipping", topic_name.c_str()); + return true; + } + // if it's configured, we monitor it + if (std::find(topics_.begin(), topics_.end(), topic_name) != topics_.end()) { + RCLCPP_DEBUG( + get_logger(), "Topic %s is configured, no skip", topic_name.c_str()); + return false; + } + // skip if we have seen this before and ignored it + if (known_topics_.find(topic_name) != known_topics_.end()) { + return true; + } + // check for matches against internal topic names + for (const auto & re : hidden_topics_) { + if (std::regex_match(topic_name, re)) { + known_topics_.insert(topic_name); + RCLCPP_DEBUG( + get_logger(), "Topic %s matches ignore list, skip it", + topic_name.c_str()); + return true; + } + } + return monitor_configured_only_; +} + +template +inline void diagnostic_topic_monitor::GenericTopicMonitor< + StatusType, StatusParamType>::update_topic_subscriptions() +{ + RCLCPP_DEBUG( + get_logger(), "Examining topic list for changes, currently monitoring %ld topics", + subscribed_topics_.size()); + const auto topics = this->get_topic_names_and_types(); + for (const auto & e : topics) { + const std::string topic_name = e.first; + if (skip_topic(topic_name)) { + continue; + } + // subscribe with statistics enabled + RCLCPP_DEBUG( + get_logger(), "Starting to monitor topic %s", + topic_name.c_str()); + std::shared_ptr sub = + this->create_generic_subscription( + e.first, *e.second.begin(), rclcpp::QoS(10), + [this, topic_name](std::shared_ptr msg) { + this->topic_cb(topic_name, msg); + }); + subscribed_topics_[topic_name] = sub; + } +} + +template +inline LCCBReturn +diagnostic_topic_monitor::GenericTopicMonitor::on_configure( + const rclcpp_lifecycle::State &) +{ + RCLCPP_DEBUG(get_logger(), "Configuring"); + // configure the diagnostics + topics_ = get_parameter(TOPICS_PARAM_NAME).as_string_array(); + min_values_ = get_parameter(MIN_VALUES_PARAM_NAME).as_double_array(); + max_values_ = get_parameter(MAX_VALUES_PARAM_NAME).as_double_array(); + if (topics_.size() != min_values_.size() || + topics_.size() != max_values_.size()) + { + throw std::invalid_argument( + "Topics and min/max_values must have same number of arguments"); + } + monitor_configured_only_ = + get_parameter(MONITOR_CONFIGURED_ONLY_PARAM_NAME).as_bool(); + diag_prefix_ = get_parameter(DIAG_PREFIX_PARAM_NAME).as_string(); + + RCLCPP_DEBUG( + get_logger(), "Done configuring for %ld topics, config only: %d", + topic_diag_map_.size(), + monitor_configured_only_); + return LCCBReturn::SUCCESS; +} + +template +inline LCCBReturn +diagnostic_topic_monitor::GenericTopicMonitor::on_activate( + const rclcpp_lifecycle::State &) +{ + RCLCPP_DEBUG(get_logger(), "Activating"); + updater_ = std::make_shared(this); + char HOSTNAME[1000]; + gethostname(HOSTNAME, 1000); + updater_->setHardwareID(std::string(HOSTNAME)); + for (size_t i = 0; i < topics_.size(); ++i) { + auto param = parse_params(static_cast(i)); + auto diag = std::make_shared( + param, get_prefixed_name( + topics_[i]), get_clock()); + topic_diag_map_[topics_[i]] = diag; + updater_->add(*diag); + } + // check existing topics + try { + update_topic_subscriptions(); + } catch (const std::exception & ex) { + RCLCPP_ERROR( + this->get_logger(), "Failure to update subscriptions: %s", ex.what()); + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface:: + CallbackReturn::ERROR; + } + timer_ = + create_wall_timer(1s, [this]() {this->update_topic_subscriptions();}); + return LCCBReturn::SUCCESS; +} + +template +inline LCCBReturn +diagnostic_topic_monitor::GenericTopicMonitor::on_deactivate( + const rclcpp_lifecycle::State & state) +{ + (void)state; + RCLCPP_INFO_STREAM(get_logger(), "Deactivating GenericTopicMonitor"); + return LCCBReturn::SUCCESS; +} + +} // namespace diagnostic_topic_monitor + +#endif // DIAGNOSTIC_TOPIC_MONITOR__GENERIC_TOPIC_MONITOR_HPP_ diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_age_monitor.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_age_monitor.hpp new file mode 100644 index 000000000..b90d81d3a --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_age_monitor.hpp @@ -0,0 +1,39 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef DIAGNOSTIC_TOPIC_MONITOR__TOPIC_AGE_MONITOR_HPP_ +#define DIAGNOSTIC_TOPIC_MONITOR__TOPIC_AGE_MONITOR_HPP_ + +#include +#include +#include + +#include "diagnostic_topic_monitor/generic_topic_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +class TopicAgeMonitor final + : public GenericTopicMonitor< + diagnostic_updater::TimeStampStatus, + diagnostic_updater::TimeStampStatusParam> +{ +public: + TopicAgeMonitor(const std::string & node_name, rclcpp::NodeOptions options); + TopicAgeMonitor(rclcpp::NodeOptions options = rclcpp::NodeOptions()); + ~TopicAgeMonitor() override = default; + + void topic_cb( + const std::string & topic_name, + const std::shared_ptr & msg) override; + diagnostic_updater::TimeStampStatusParam parse_params(const int index) + override; + +private: + rclcpp::Serialization _header_serializer; +}; + +} // namespace diagnostic_topic_monitor + +#endif // DIAGNOSTIC_TOPIC_MONITOR__TOPIC_AGE_MONITOR_HPP_ diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_frequency_monitor.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_frequency_monitor.hpp new file mode 100644 index 000000000..0284e7673 --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_frequency_monitor.hpp @@ -0,0 +1,43 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef DIAGNOSTIC_TOPIC_MONITOR__TOPIC_FREQUENCY_MONITOR_HPP_ +#define DIAGNOSTIC_TOPIC_MONITOR__TOPIC_FREQUENCY_MONITOR_HPP_ + +#include +#include +#include + +#include "diagnostic_topic_monitor/generic_topic_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +class TopicFrequencyMonitor final + : public GenericTopicMonitor< + diagnostic_updater::FrequencyStatus, + diagnostic_updater::FrequencyStatusParam> +{ +public: + TopicFrequencyMonitor( + const std::string & node_name, + rclcpp::NodeOptions options); + TopicFrequencyMonitor(rclcpp::NodeOptions options = rclcpp::NodeOptions()); + ~TopicFrequencyMonitor() override = default; + + void topic_cb( + const std::string & topic_name, + const std::shared_ptr & msg) override; + diagnostic_updater::FrequencyStatusParam parse_params(const int index) + override; + void update_topic_subscriptions() override; + +private: + std::unordered_map> fallback_topic_diag_map_; +}; + +} // namespace diagnostic_topic_monitor + +#endif // DIAGNOSTIC_TOPIC_MONITOR__TOPIC_FREQUENCY_MONITOR_HPP_ diff --git a/diagnostic_topic_monitor/package.xml b/diagnostic_topic_monitor/package.xml new file mode 100644 index 000000000..bb4bf4f73 --- /dev/null +++ b/diagnostic_topic_monitor/package.xml @@ -0,0 +1,37 @@ + + + + diagnostic_topic_monitor + 1.0.0 + Generic Topic Monitor + Ingo Lütkebohle + Christian Henkel + + BSD-3-Clause + + Ingo Lütkebohle + Tejas Kumar Shastha + + ament_cmake + + diagnostic_msgs + diagnostic_updater + rclcpp_components + rclcpp_lifecycle + rclcpp + rclpy + statistics_msgs + std_msgs + + python3-ntplib + + ament_cmake_pytest + ament_lint_auto + ament_lint_common + launch_testing + sensor_msgs + + + ament_cmake + + diff --git a/diagnostic_topic_monitor/src/topic_age_monitor_component.cpp b/diagnostic_topic_monitor/src/topic_age_monitor_component.cpp new file mode 100644 index 000000000..8bfdfbf10 --- /dev/null +++ b/diagnostic_topic_monitor/src/topic_age_monitor_component.cpp @@ -0,0 +1,49 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#include "diagnostic_topic_monitor/topic_age_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +TopicAgeMonitor::TopicAgeMonitor(const std::string & node_name, rclcpp::NodeOptions options) +: GenericTopicMonitor(node_name, options) +{ +} + +TopicAgeMonitor::TopicAgeMonitor(rclcpp::NodeOptions options) +: GenericTopicMonitor("topic_age_monitor", options) +{ +} + +diagnostic_updater::TimeStampStatusParam diagnostic_topic_monitor::TopicAgeMonitor::parse_params( + const int index) +{ + return diagnostic_updater::TimeStampStatusParam(min_values_[index], max_values_[index]); +} + +void TopicAgeMonitor::topic_cb( + const std::string & topic_name, const std::shared_ptr & msg) +{ + std_msgs::msg::Header header; + const auto diag = topic_diag_map_.find(topic_name); + try { + _header_serializer.deserialize_message(msg.get(), &header); + } catch (const std::exception & e) { + RCLCPP_WARN( + get_logger(), "Failed to deserialize message from topic %s, skipping it", topic_name.c_str()); + return; + } + + if (diag != topic_diag_map_.end()) { + diag->second->tick(header.stamp); + } else { + RCLCPP_WARN(get_logger(), "No diagnostic found for topic %s", topic_name.c_str()); + } +} + +} // namespace diagnostic_topic_monitor + +#include // NOLINT: upstream +RCLCPP_COMPONENTS_REGISTER_NODE(diagnostic_topic_monitor::TopicAgeMonitor) diff --git a/diagnostic_topic_monitor/src/topic_frequency_monitor_component.cpp b/diagnostic_topic_monitor/src/topic_frequency_monitor_component.cpp new file mode 100644 index 000000000..4295c7725 --- /dev/null +++ b/diagnostic_topic_monitor/src/topic_frequency_monitor_component.cpp @@ -0,0 +1,78 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#include "diagnostic_topic_monitor/topic_frequency_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +TopicFrequencyMonitor::TopicFrequencyMonitor( + const std::string & node_name, rclcpp::NodeOptions options) +: GenericTopicMonitor(node_name, options) +{ +} + +TopicFrequencyMonitor::TopicFrequencyMonitor(rclcpp::NodeOptions options) +: GenericTopicMonitor("topic_frequency_monitor", options) +{ +} + +diagnostic_updater::FrequencyStatusParam +diagnostic_topic_monitor::TopicFrequencyMonitor::parse_params(const int index) +{ + return diagnostic_updater::FrequencyStatusParam(&min_values_[index], &max_values_[index]); +} + +void TopicFrequencyMonitor::topic_cb( + const std::string & topic_name, const std::shared_ptr &) +{ + const auto diag = topic_diag_map_.find(topic_name); + if (diag != topic_diag_map_.end()) { + diag->second->tick(); + } else { + const auto fallback_diag = fallback_topic_diag_map_.find(topic_name); + if (fallback_diag != fallback_topic_diag_map_.end()) { + fallback_diag->second->tick(); + } else { + RCLCPP_WARN(get_logger(), "No diagnostic found for topic %s", topic_name.c_str()); + } + } +} + +void diagnostic_topic_monitor::TopicFrequencyMonitor::update_topic_subscriptions() +{ + RCLCPP_DEBUG( + get_logger(), "Examining topic list for changes, currently monitoring %ld topics", + subscribed_topics_.size()); + const auto topics = this->get_topic_names_and_types(); + for (const auto & e : topics) { + const std::string topic_name = e.first; + if (skip_topic(topic_name)) { + continue; + } + // subscribe with statistics enabled + RCLCPP_DEBUG(get_logger(), "Starting to monitor topic %s", topic_name.c_str()); + std::shared_ptr sub = this->create_generic_subscription( + e.first, *e.second.begin(), rclcpp::QoS(10), + [this, topic_name](std::shared_ptr msg) { + this->topic_cb(topic_name, msg); + }); + subscribed_topics_[topic_name] = sub; + // add a default diagnostic if none is configured + if (topic_diag_map_.find(topic_name) == topic_diag_map_.end()) { + auto fallback_diag = + std::make_shared(get_prefixed_name(topic_name), get_clock()); + fallback_topic_diag_map_[topic_name] = fallback_diag; + updater_->add(*fallback_diag); + } + } + RCLCPP_DEBUG( + get_logger(), "Done updating topic list. Found %ld, hidden %ld, now monitoring %ld", + topics.size(), known_topics_.size(), subscribed_topics_.size()); +} + +} // namespace diagnostic_topic_monitor + +#include // NOLINT: upstream +RCLCPP_COMPONENTS_REGISTER_NODE(diagnostic_topic_monitor::TopicFrequencyMonitor) diff --git a/diagnostic_topic_monitor/test/config/topic_age_monitor.yaml b/diagnostic_topic_monitor/test/config/topic_age_monitor.yaml new file mode 100644 index 000000000..850c1fbc0 --- /dev/null +++ b/diagnostic_topic_monitor/test/config/topic_age_monitor.yaml @@ -0,0 +1,9 @@ +/**: + ros__parameters: + monitor_configured_only: True + topics: + - /dummy_header_topic + min_values: + - -0.01 + max_values: + - 0.2 diff --git a/diagnostic_topic_monitor/test/config/topic_frequency_monitor.yaml b/diagnostic_topic_monitor/test/config/topic_frequency_monitor.yaml new file mode 100644 index 000000000..85a234095 --- /dev/null +++ b/diagnostic_topic_monitor/test/config/topic_frequency_monitor.yaml @@ -0,0 +1,18 @@ +/**: + ros__parameters: + monitor_configured_only: True + topics: + - /dummy_string_topic1 + - /dummy_string_topic2 + - /dummy_string_topic3 + - /dummy_header_topic + min_values: + - 0.5 + - 0.5 + - 0.5 + - 0.5 + max_values: + - 1.5 + - 1.5 + - 1.5 + - 1.5 diff --git a/diagnostic_topic_monitor/test/dummy_publishers.py b/diagnostic_topic_monitor/test/dummy_publishers.py new file mode 100755 index 000000000..72f399db5 --- /dev/null +++ b/diagnostic_topic_monitor/test/dummy_publishers.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2024, 2025 Robert Bosch GmbH +# +# See the top-level LICENSE file for licensing terms. + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import CameraInfo +from std_msgs.msg import String + + +class DummyPublisherNode(Node): + + def __init__(self): + super().__init__('dummy_publisher_node') + + # Publishers + self.string_pub1 = self.create_publisher(String, '/dummy_string_topic1', 10) + self.string_pub2 = self.create_publisher(String, '/dummy_string_topic2', 10) + self.string_pub3 = self.create_publisher(String, '/dummy_string_topic3', 10) + # For testing the age_monitor, we need any topic with a header + self.header_pub = self.create_publisher(CameraInfo, '/dummy_header_topic', 10) + + # Timer to publish at 1Hz + self.timer = self.create_timer(1.0, self.publish_messages) + + self.get_logger().info('DummyPublisherNode has been started.') + + def publish_messages(self): + # Publish to /dummy_string_topic1 + msg1 = String() + msg1.data = 'Hello from dummy_string_topic1' + self.string_pub1.publish(msg1) + + # Publish to /dummy_string_topic2 + msg2 = String() + msg2.data = 'Hello from dummy_string_topic2' + self.string_pub2.publish(msg2) + + # Publish to /dummy_string_topic3 + msg3 = String() + msg3.data = 'Hello from dummy_string_topic3' + self.string_pub3.publish(msg3) + + # Publish to /dummy_header_topic + header_msg = CameraInfo() + header_msg.header.stamp = self.get_clock().now().to_msg() + header_msg.header.frame_id = 'dummy_frame' + self.header_pub.publish(header_msg) + + self.get_logger().debug('Published dummy messages to all topics.') + + +def main(args=None): + rclpy.init(args=args) + node = DummyPublisherNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + node.get_logger().info('Shutting down DummyPublisherNode...') + + +if __name__ == '__main__': + main() diff --git a/diagnostic_topic_monitor/test/test_topic_age_monitor.py b/diagnostic_topic_monitor/test/test_topic_age_monitor.py new file mode 100755 index 000000000..4625e30fd --- /dev/null +++ b/diagnostic_topic_monitor/test/test_topic_age_monitor.py @@ -0,0 +1,207 @@ +#! /usr/bin/env python3 + +# Copyright (c) 2024, 2025 Robert Bosch GmbH +# +# See the top-level LICENSE file for licensing terms. + +import time +import unittest + +from diagnostic_msgs.msg import DiagnosticArray +from diagnostic_msgs.msg import DiagnosticStatus +import launch +from launch.actions import EmitEvent +from launch.actions import LogInfo +from launch.actions import RegisterEventHandler +from launch.actions import SetEnvironmentVariable +import launch.event_handlers.on_process_start +from launch.substitutions import PathJoinSubstitution +from launch_ros.actions import LifecycleNode +from launch_ros.actions import Node +import launch_ros.events.lifecycle +from launch_ros.events.lifecycle import ChangeState +from launch_ros.substitutions import FindPackageShare +import launch_testing +import launch_testing.actions +import launch_testing.asserts +from lifecycle_msgs.msg import Transition +import pytest +import rclpy +from rclpy.time import Duration +from rclpy.time import Time + + +def create_change_state(target, target_state): + """Activate the lifecycle monitor nodes.""" + return EmitEvent( + event=ChangeState( + lifecycle_node_matcher=launch.events.matches_action(target), + transition_id=target_state, + ) + ) + + +def create_register_configure(target_action): + return RegisterEventHandler( + launch.event_handlers.on_process_start.OnProcessStart( + target_action=target_action, + on_start=[ + LogInfo(msg=f'Emitting configure event for {target_action}'), + create_change_state( + target_action, Transition.TRANSITION_CONFIGURE + ) + ], + ) + ) + + +def create_register_activate(target_action): + return RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=target_action, + start_state='configuring', + goal_state='inactive', + entities=[ + LogInfo(msg=f'Emitting activate event for {target_action}'), + create_change_state( + target_action, Transition.TRANSITION_ACTIVATE + ) + ], + ) + ) + + +CONFIG_MONITOR_NAME = 'monitor_configured_topics_node' + + +@pytest.mark.launch_test +def generate_test_description(): + # Node that publishes the topics we want to monitor + talker_node = Node( + package='diagnostic_topic_monitor', + executable='dummy_publishers.py', + output='log', + name='talker', + arguments=['--ros-args', '--log-level', 'talker:=warn'], + ) + # Monitor with configuration file + monitor_config_node = LifecycleNode( + package='diagnostic_topic_monitor', + executable='topic_age_monitor', + name=CONFIG_MONITOR_NAME, + output='both', + namespace='', + parameters=[ + PathJoinSubstitution( + [ + FindPackageShare('diagnostic_topic_monitor'), + 'test', + 'config', + 'topic_age_monitor.yaml', + ] + ), + ], + arguments=['--ros-args', '--log-level', 'monitor_configured_topics_node:=INFO'], + ) + + return launch.LaunchDescription( + [ + SetEnvironmentVariable('RCUTILS_LOGGING_BUFFERED_STREAM', '1'), + talker_node, + monitor_config_node, + # Right after the monitor starts, make it take the 'configure' transition. + create_register_configure(monitor_config_node), + # When the monitor reaches the 'inactive' state, 'activate'. + create_register_activate(monitor_config_node), + # When the monitor node reaches the 'active' state, we're ready for testing + RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=monitor_config_node, + start_state='activating', + goal_state='active', + entities=[ + LogInfo(msg='Monitor reached active state'), + launch_testing.actions.ReadyToTest(), + ], + ) + ), + ] + ) + + +class TestMonitor(unittest.TestCase): + TIMEOUT = 30 + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node('test_topic_age_monitor_node') + self.log = self.node.get_logger() + self.sub = self.node.create_subscription( + DiagnosticArray, '/diagnostics', self.stat_cb, 10 + ) + self.pub_count = self.node.count_publishers('/diagnostics') + self.log.info( + f'Number of publishers for /diagnostics: {self.pub_count}. Listening for messages...' + ) + self.age_messages = [] # Store messages from configured age monitor + + start_time = time.time() + while len(self.age_messages) < 3: + rclpy.spin_once(self.node, timeout_sec=1.0) + self.log.debug(f'Got {len(self.age_messages)} age msgs msgs') + if (time.time() - start_time) > self.TIMEOUT: + self.fail('Timed out waiting for message in /diagnostics topic') + self.log.info(f'Stored {len(self.age_messages)} age msgs') + + def tearDown(self): + self.node.destroy_node() + del self.node + + def stat_cb(self, msg): + """Store message for future processing.""" + if len(msg.status) == 0: + return + if CONFIG_MONITOR_NAME in msg.status[0].name: + self.age_messages.append(msg) + + def test_diag_msg(self): + """Check that diagnostics messages contain the right content.""" + last_msg = self.age_messages.pop() + # header + current_time = self.node.get_clock().now() + header_time = Time.from_msg(last_msg.header.stamp) + self.assertLess(current_time - header_time, Duration(seconds=0.1)) + last_status = last_msg.status[0] + # status should be OK + self.assertEqual(last_status.level, DiagnosticStatus.OK) + + def test_age_diag_msg(self): + """Check that the age diagnostic works.""" + last_msg = self.age_messages.pop() + self.assertTrue(len(last_msg.status) > 0) + status = last_msg.status[0] + # check some fields for present/content + self.assertTrue(CONFIG_MONITOR_NAME in status.name) + keys = [kv.key for kv in status.values] + self.assertTrue('Earliest timestamp delay:' in keys) + + def test_ignore_unconfigured(self): + """Check that we ignore the topic we don't monitor.""" + last_msg = self.age_messages.pop() + self.assertEqual(len(last_msg.status), 1) # We monitor 1 topic only + names = [status.name for status in last_msg.status] + # This topic should be monitored + self.assertIn(f'{CONFIG_MONITOR_NAME}: /dummy_header_topic', names, f'{names}') + # This topic should not be monitored + self.assertNotIn( + f'{CONFIG_MONITOR_NAME}: /dummy_string_topic1', + names, + f'{names}', + ) diff --git a/diagnostic_topic_monitor/test/test_topic_frequency_monitor.py b/diagnostic_topic_monitor/test/test_topic_frequency_monitor.py new file mode 100755 index 000000000..d7cc7d056 --- /dev/null +++ b/diagnostic_topic_monitor/test/test_topic_frequency_monitor.py @@ -0,0 +1,221 @@ +#! /usr/bin/env python3 + +# Copyright (c) 2024, 2025 Robert Bosch GmbH +# +# See the top-level LICENSE file for licensing terms. + +import time +import unittest + +from diagnostic_msgs.msg import DiagnosticArray +from diagnostic_msgs.msg import DiagnosticStatus +import launch +from launch.actions import EmitEvent +from launch.actions import LogInfo +from launch.actions import RegisterEventHandler +from launch.actions import SetEnvironmentVariable +import launch.event_handlers.on_process_start +from launch.substitutions import PathJoinSubstitution +from launch_ros.actions import LifecycleNode +from launch_ros.actions import Node +import launch_ros.events.lifecycle +from launch_ros.events.lifecycle import ChangeState +from launch_ros.substitutions import FindPackageShare +import launch_testing +import launch_testing.actions +import launch_testing.asserts +from lifecycle_msgs.msg import Transition +import pytest +import rclpy +from rclpy.time import Duration +from rclpy.time import Time + + +def create_change_state(target, target_state): + """Activate the lifecycle monitor nodes.""" + return EmitEvent( + event=ChangeState( + lifecycle_node_matcher=launch.events.matches_action(target), + transition_id=target_state, + ) + ) + + +def create_register_configure(target_action): + return RegisterEventHandler( + launch.event_handlers.on_process_start.OnProcessStart( + target_action=target_action, + on_start=[ + LogInfo(msg=f'Emitting configure event for {target_action}'), + create_change_state( + target_action, Transition.TRANSITION_CONFIGURE + ) + ], + ) + ) + + +def create_register_activate(target_action): + return RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=target_action, + start_state='configuring', + goal_state='inactive', + entities=[ + LogInfo(msg=f'Emitting activate event for {target_action}'), + create_change_state( + target_action, Transition.TRANSITION_ACTIVATE + ) + ], + ) + ) + + +ALL_MONITOR_NAME = 'monitor_all_topics_node' +CONFIG_MONITOR_NAME = 'monitor_configured_topics_node' + + +@pytest.mark.launch_test +def generate_test_description(): + # Node that publishes the topics we want to monitor + talker_node = Node( + package='diagnostic_topic_monitor', + executable='dummy_publishers.py', + output='log', + name='talker', + arguments=['--ros-args', '--log-level', 'talker:=warn'], + ) + # Un-configured frequency monitor that checks all topics + monitor_all_node = LifecycleNode( + package='diagnostic_topic_monitor', + executable='topic_frequency_monitor', + name=ALL_MONITOR_NAME, + output='both', + namespace='', + arguments=['--ros-args', '--log-level', 'all_monitor:=INFO'], + ) + # Frequency monitor with configuration file + monitor_config_node = LifecycleNode( + package='diagnostic_topic_monitor', + executable='topic_frequency_monitor', + name=CONFIG_MONITOR_NAME, + output='both', + namespace='', + parameters=[ + PathJoinSubstitution( + [ + FindPackageShare('diagnostic_topic_monitor'), + 'test', + 'config', + 'topic_frequency_monitor.yaml', + ] + ), + ], + arguments=['--ros-args', '--log-level', 'monitor_configured_topics_node:=INFO'], + ) + + return launch.LaunchDescription( + [ + SetEnvironmentVariable('RCUTILS_LOGGING_BUFFERED_STREAM', '1'), + talker_node, + monitor_all_node, + monitor_config_node, + # Right after the monitor starts, make it take the 'configure' transition. + create_register_configure(monitor_all_node), + create_register_configure(monitor_config_node), + # When the monitor reaches the 'inactive' state, 'activate'. + create_register_activate(monitor_all_node), + create_register_activate(monitor_config_node), + # When the monitor node reaches the 'active' state, we're ready for testing + RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=monitor_all_node, + start_state='activating', + goal_state='active', + entities=[ + LogInfo(msg='Monitor reached active state'), + launch_testing.actions.ReadyToTest(), + ], + ) + ), + ] + ) + + +class TestMonitor(unittest.TestCase): + TIMEOUT = 30 + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node('test_topic_frequency_monitor_node') + self.log = self.node.get_logger() + self.sub = self.node.create_subscription( + DiagnosticArray, '/diagnostics', self.stat_cb, 10 + ) + self.pub_count = self.node.count_publishers('/diagnostics') + self.log.info( + f'Number of publishers for /diagnostics: {self.pub_count}. Listening for messages...' + ) + self.messages = [] + self.freq_messages = [] + + start_time = time.time() + while len(self.messages) < 5 or len(self.freq_messages) < 5: + rclpy.spin_once(self.node, timeout_sec=1.0) + self.log.debug(f'Got {len(self.messages)} and {len(self.freq_messages)}') + if (time.time() - start_time) > self.TIMEOUT: + self.fail('Timed out waiting for message in /diagnostics topic') + self.log.debug(f'Got {len(self.messages)} and {len(self.freq_messages)}') + + def tearDown(self): + self.node.destroy_node() + del self.node + + def stat_cb(self, msg): + """Store message for future processing.""" + if len(msg.status) == 0: + return + if CONFIG_MONITOR_NAME in msg.status[0].name: + self.freq_messages.append(msg) + else: + self.messages.append(msg) + + def test_diag_msg(self): + """Check that diagnostics messages contain the right content.""" + last_msg = self.messages.pop() + # header + current_time = self.node.get_clock().now() + header_time = Time.from_msg(last_msg.header.stamp) + self.assertLess(current_time - header_time, Duration(seconds=1.0)) + last_status = last_msg.status[0] + # status should be OK + self.assertEqual(last_status.level, DiagnosticStatus.OK) + keys = [value.key for value in last_status.values] + self.assertTrue('period' in keys) + names = [status.name for status in last_msg.status] + # The all_topics monitor should have all topics + self.assertIn(f'{ALL_MONITOR_NAME}: /dummy_header_topic', names, f'{names}') + self.assertIn(f'{ALL_MONITOR_NAME}: /dummy_string_topic1', names, f'{names}') + + def test_frequency_diag_msg(self): + """Check that the frequency diagnostic works.""" + last_msg = self.freq_messages.pop() + self.log.debug(f'{last_msg}') + self.assertTrue(len(last_msg.status) > 0) + status = last_msg.status[0] + # check some fields for present/content + self.assertTrue(CONFIG_MONITOR_NAME in status.name) + keys = [kv.key for kv in status.values] + self.assertIn('Actual frequency (Hz)', keys) + + def test_ignore_unconfigured(self): + """Check that we ignore the topic we don't monitor.""" + last_msg = self.freq_messages.pop() + self.assertEqual(len(last_msg.status), 4) # We monitor 4 topics