Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latest_time policy may in some case refrain from publishing for an aribitrarily long period #131

Open
wuchenhaogit opened this issue Jun 12, 2024 · 4 comments · May be fixed by #147
Open
Labels
help wanted Extra attention is needed

Comments

@wuchenhaogit
Copy link

The latest_time filter only publishes when the arriving message comes from the pivot sensor.
if (valid_rate && (i == find_pivot(now)) && is_full()) {
publish();
}
However, if all sensors sample at a gradually decreasing frequencies, i.e., each new message always arrives later than before, they may all fails to match the pivot (because new message arrives and decrease its frequency, pivot is thus changed). When this happens, the filter may not publishing for an extended, or even infinite period, despite all sensors functioning properly.

Here is an example. There are 2 sensors, initally with the same frequency. Starting from a time, both sensors' frequencies begin to decrease. Then pivot will change whenever a new message arrives, so that no pivot match, and thus no publishing will occur. Image below is the log of this example.

This impairs the usability of this policy. Because each time sensors decrease frequencies together, even if only for a small decreasing period in the fluctuation, the policy may refrain from publishing until the decreasing stops. The pubishing frequency is thus below the highest frequency among sensors, which does not meet the expectation for this policy.

To address this issue, it might be better to compulsely publish whenever current period has exceeded the pivot's frequency, i.e., use t_last to store last publishing time:
int p = find_pivot(now);
if (valid_rate && ( (i == p) || (now - t_last).seconds() >= 1 / rates_[p].hz) ) && is_full()) {
publish();
t_last = now;'}`
With this modification, the publishing frequency is guaranteed to approach the pivot's frequency.

@wuchenhaogit wuchenhaogit changed the title Latest_time policy may in same case not publishing for an aribitrarily long period Latest_time policy may in same case refrain from publishing for an aribitrarily long period Jun 12, 2024
@wuchenhaogit wuchenhaogit changed the title Latest_time policy may in same case refrain from publishing for an aribitrarily long period Latest_time policy may in some case refrain from publishing for an aribitrarily long period Jun 12, 2024
@wjwwood
Copy link
Member

wjwwood commented Jun 20, 2024

This was discussed in the issue triage meeting, and the conclusion was that we don't have anyone to look into this at the moment, but it would be helpful (for others that might help with this issue) if there was a simple isolated example, e.g. a bag file and some sample code to reproduce the issue. For now we will leave it here, but if you update this with more information it will be triaged again in the future.

Also as a note, please copy-paste text, rather than posting pictures of text. Thanks!

@wjwwood wjwwood added help wanted Extra attention is needed and removed help wanted Extra attention is needed labels Jun 20, 2024
@fujitatomoya
Copy link
Contributor

@wjwwood can you put help-wanted label back on? i think you accidentally removed it?

@ahcorde ahcorde added the help wanted Extra attention is needed label Jun 28, 2024
@wuchenhaogit
Copy link
Author

wuchenhaogit commented Jul 1, 2024

Thank you for paying attention to my issue. Below I provide my code that generates the result I described above.

#include <iostream>
#include <string>
#include <cfloat>
#include <chrono>
#include <memory>
#include <string>

#include <unistd.h>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "signal.h"

#include <message_filters/subscriber.h>
#include <message_filters/synchronizer.h>
#include <message_filters/sync_policies/latest_time.h>

#include <sensor_msgs/msg/joint_state.hpp>

using namespace std;

using namespace std::chrono_literals;

int test_num = 50;

class Publisher : public rclcpp::Node
{
public:
    Publisher(string topic_name, double initial_period1_, double initial_period2_, double delta_):
            Node(topic_name.c_str()), topic_name_(topic_name), initial_period1(initial_period1_), 
            initial_period2(initial_period2_), delta(delta_)
    {
        string topic_name1 = topic_name_ + "1";
        string topic_name2 = topic_name_ + "2";
        publisher1_ = this->create_publisher<sensor_msgs::msg::JointState>(topic_name1.c_str(), 10);
        publisher2_ = this->create_publisher<sensor_msgs::msg::JointState>(topic_name2.c_str(), 10);

        Thread = new std::thread(&Publisher::timer_callback,this);
    }

    void timer_callback()
    {
        rclcpp::Duration period1 = rclcpp::Duration(0, initial_period1*1e6);
        rclcpp::Duration period2 = rclcpp::Duration(0, initial_period2*1e6);

        // interleave two topics
        rclcpp::Duration initial_gap = rclcpp::Duration(0, initial_period1/2*1e6);
        
        // time of incoming messages of each channel
        rclcpp::Time next1 = this->now();
        rclcpp::Time next2 = next1 + initial_gap;

        // warm up the statistics
        cout << endl << "----- stable -----" << endl << endl;

        for(int i = 0 ; i<test_num ; ++i)
        {
            publisher1(next1);
            next1 += period1;
            rclcpp::sleep_for(std::chrono::nanoseconds((next2 - this->now()).nanoseconds()));
            
            publisher2(next2);
            next2 += period2;
            rclcpp::sleep_for(std::chrono::nanoseconds((next1 - this->now()).nanoseconds()));
        }


        // start to decrease the frequency
        cout << endl << "----- decreasing -----" << endl << endl;
        
        rclcpp::Duration period_increment = rclcpp::Duration(0, delta*1e6);
        for (int i = 0 ; i < test_num ; ++i) {
            publisher1(next1);
            period1 = period2 + period_increment; // new hz1 will be smaller than now hz2
            next1 += period1;
            rclcpp::sleep_for(std::chrono::nanoseconds((next2 - this->now()).nanoseconds()));
            
            publisher2(next2);
            period2 = period1 + period_increment; // new hz2 will be smaller than now hz1
            next2 += period2;
            rclcpp::sleep_for(std::chrono::nanoseconds((next1 - this->now()).nanoseconds()));
        }

        
        // be stable again. publishing should be normal again.
        cout << endl << "----- stable -----" << endl << endl;
        for (int i = 0 ; i < test_num ; ++i) {
            publisher1(next1);
            next1 += period1;
            rclcpp::sleep_for(std::chrono::nanoseconds((next2 - this->now()).nanoseconds()));
            
            publisher2(next2);
            next2 += period2;
            rclcpp::sleep_for(std::chrono::nanoseconds((next1 - this->now()).nanoseconds()));
        }

        kill(getpid(),SIGINT);
    }

    void publisher1(rclcpp::Time now)
    {
        auto message = sensor_msgs::msg::JointState();
        message.header.stamp = now;
        // cout << "pub " << topic_name_ << " time " << fixed << setprecision(9) << now.seconds() << endl;
        
        publisher1_->publish(message);
    }

    void publisher2(rclcpp::Time now)
    {
        auto message = sensor_msgs::msg::JointState();
        message.header.stamp = now;
        // cout << "pub " << topic_name_ << " time " << fixed << setprecision(9) << now.seconds() << endl;
        
        publisher2_->publish(message);
    }


private:
    string topic_name_;
    std::thread* Thread;
    rclcpp::Publisher<sensor_msgs::msg::JointState>::SharedPtr publisher1_;
    rclcpp::Publisher<sensor_msgs::msg::JointState>::SharedPtr publisher2_;
    double initial_period1;
    double initial_period2;
    double delta;
};


using namespace message_filters;

using std::placeholders::_1;
using std::placeholders::_2;


class SubscriberTopic2 : public rclcpp::Node {
    public:
    SubscriberTopic2() :
        Node("subscriber_topic2")
    {
        topic1_sub_ = this->create_subscription<sensor_msgs::msg::JointState>("topic1", 300, 
                                                                        std::bind(&SubscriberTopic2::callback1, this, _1));
        topic2_sub_ = this->create_subscription<sensor_msgs::msg::JointState>("topic2", 300, 
                                                                        std::bind(&SubscriberTopic2::callback2, this, _1));

        sync_.registerCallback(std::bind(&SubscriberTopic2::callback, this, _1, _2));
    }

    ~SubscriberTopic2()
    {
    }

    private:

    void callback(const sensor_msgs::msg::JointState::ConstSharedPtr& msg1, const sensor_msgs::msg::JointState::ConstSharedPtr& msg2)
    {
        cout << "published at " << fixed << setprecision(9) << this->now().seconds() << endl;
        cout << "\tmsg1 " << fixed << setprecision(9) << (double)msg1->header.stamp.sec + 1e-9*(double)msg1->header.stamp.nanosec << endl;
        cout << "\tmsg2 " << fixed << setprecision(9) << (double)msg2->header.stamp.sec + 1e-9*(double)msg2->header.stamp.nanosec << endl;
    }

    void callback1(sensor_msgs::msg::JointState::SharedPtr msg)
    {
        cout << "sub1 " << fixed << setprecision(9) << (double)msg->header.stamp.sec + 1e-9*(double)msg->header.stamp.nanosec << endl;
        
        sync_.add<0>(msg);
    }

    void callback2(sensor_msgs::msg::JointState::SharedPtr msg)
    {
        cout << "sub2 " << fixed << setprecision(9) << (double)msg->header.stamp.sec + 1e-9*(double)msg->header.stamp.nanosec << endl;
        
        sync_.add<1>(msg);
    }


    rclcpp::CallbackGroup::SharedPtr callback_group_subscriber_;

    rclcpp::Subscription<sensor_msgs::msg::JointState>::SharedPtr topic1_sub_;
    rclcpp::Subscription<sensor_msgs::msg::JointState>::SharedPtr topic2_sub_;

    typedef Synchronizer<sync_policies::LatestTime<sensor_msgs::msg::JointState, sensor_msgs::msg::JointState> > Sync;

    Sync sync_;
};

using namespace std;


int main(int argc, char * argv[])
{
    double ini1;    // initial period of channel 1 (in ms)
    double ini2;    // initial period of channel 2 (in ms)
    double delta;   // increment of period
    if(argc == 1){ // default (my previous example)
        test_num = 50;       // number of messages in decreasing & stable pattern
        ini1 = 20;
        ini2 = 20;
        delta = 0.2667;
    }
    else {
        assert(argc == 5);
        test_num = atoi(argv[1]);
        ini1 = atof(argv[2]);
        ini2 = atof(argv[3]); 
        delta = atof(argv[4]);
    }

    rclcpp::init(argc, argv);
    rclcpp::executors::SingleThreadedExecutor executor;
    // rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(),50);

    auto sub_node = std::make_shared<SubscriberTopic2>();
    executor.add_node(sub_node);

    string topic_name = "topic";
    
    auto pub_node = std::make_shared<Publisher>(topic_name, ini1, ini2, delta);
    executor.add_node(pub_node);

    executor.spin();

    executor.remove_node(sub_node);
    executor.remove_node(pub_node);

    rclcpp::shutdown();
    return 0;
}

@nicolaloi
Copy link

nicolaloi commented Aug 17, 2024

@wjwwood @fujitatomoya I have reproduced the issue, and I would like to try to fix it if no one else is already looking at it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants