diff --git a/doc/release/master/fix_logFowarder_lost_msgs_v2.md b/doc/release/master/fix_logFowarder_lost_msgs_v2.md new file mode 100644 index 00000000000..4c81ef8e0a8 --- /dev/null +++ b/doc/release/master/fix_logFowarder_lost_msgs_v2.md @@ -0,0 +1,10 @@ +fix_logForwarder_lost_msgs_v2 {#master} +----------------------- + +## Libraries + +### `os` + +#### `Log` + +* LogForwarder: now using a separated thread (`class ThreadedPort`) to prevent the loss of log messages during stress condition \ No newline at end of file diff --git a/src/libYARP_os/src/yarp/os/impl/LogForwarder.cpp b/src/libYARP_os/src/yarp/os/impl/LogForwarder.cpp index 8d5e0c533d8..9a3d5552c5b 100644 --- a/src/libYARP_os/src/yarp/os/impl/LogForwarder.cpp +++ b/src/libYARP_os/src/yarp/os/impl/LogForwarder.cpp @@ -19,6 +19,64 @@ bool yarp::os::impl::LogForwarder::started{false}; +yarp::os::impl::ThreadedPort::ThreadedPort() : yarp::os::PeriodicThread(0.005) +{ + this->start(); +} + +void yarp::os::impl::ThreadedPort::ThreadedPort::run() +{ + size_t size=0; + do + { + mut.lock(); + size = messages.size(); + mut.unlock(); + if (size > 0 && m_port) { process(); } + else {break;} + } while (1); +} + +void yarp::os::impl::ThreadedPort::process() +{ + yarp::os::Bottle& b = m_port->prepare(); + mut.lock(); + b = messages.back(); + messages.pop_back(); + mut.unlock(); + m_port->write(true); +} + +void yarp::os::impl::ThreadedPort::attach(yarp::os::BufferedPort* port) +{ + m_port = port; +} + +void yarp::os::impl::ThreadedPort::terminate() +{ + this->stop(); + m_port = nullptr; +} + +void yarp::os::impl::ThreadedPort::insert(const yarp::os::Bottle& bot) +{ +#if 0 + //if this is enabled, the bottle is sent immediately + yarp::os::Bottle& b = m_port->prepare(); + mut.lock(); + b = bot; + mut.unlock(); + m_port->write(true); +#else + //if this is enabled, the bottle is sent in a queue and the thread will send it later + mut.lock(); + messages.push_front(bot); + mut.unlock(); +#endif +} + +///--------------------------------------------------------------------- + yarp::os::impl::LogForwarder& yarp::os::impl::LogForwarder::getInstance() { static LogForwarder instance; @@ -46,22 +104,21 @@ yarp::os::impl::LogForwarder::LogForwarder() if (!outputPort.open(logPortName)) { printf("LogForwarder error while opening port %s\n", logPortName.c_str()); } - outputPort.enableBackgroundWrite(true); + outputPort.addOutput("/yarplogger", "fast_tcp"); + tport.attach(&outputPort); started = true; } void yarp::os::impl::LogForwarder::forward(const std::string& message) { - mutex.lock(); - static Bottle b; + yarp::os::Bottle b; b.clear(); std::string port = "[" + outputPort.getName() + "]"; b.addString(port); b.addString(message); - outputPort.write(b); - mutex.unlock(); + tport.insert(b); } void yarp::os::impl::LogForwarder::shutdown() @@ -80,6 +137,8 @@ void yarp::os::impl::LogForwarder::shutdown() while (fw.outputPort.isWriting()) { yarp::os::SystemClock::delaySystem(0.2); } + + fw.tport.terminate(); fw.outputPort.interrupt(); fw.outputPort.close(); } diff --git a/src/libYARP_os/src/yarp/os/impl/LogForwarder.h b/src/libYARP_os/src/yarp/os/impl/LogForwarder.h index dd9f8ef3895..a4fbbd46a2e 100644 --- a/src/libYARP_os/src/yarp/os/impl/LogForwarder.h +++ b/src/libYARP_os/src/yarp/os/impl/LogForwarder.h @@ -9,12 +9,36 @@ #include #include +#include #include #include +#include + +#include +#include namespace yarp::os::impl { +class ThreadedPort : public yarp::os::PeriodicThread +{ + std::mutex mut; + yarp::os::BufferedPort* m_port = nullptr; + std::list messages; + + void process(); + +public: + void run() override; + void attach(yarp::os::BufferedPort* port); + void insert(const yarp::os::Bottle& bot); + void terminate(); +public: + ThreadedPort(); +}; + +//---------------------------------------------------------------- + class YARP_os_impl_API LogForwarder { public: @@ -29,8 +53,8 @@ class YARP_os_impl_API LogForwarder LogForwarder(LogForwarder const&) = delete; LogForwarder& operator=(LogForwarder const&) = delete; - std::mutex mutex; - yarp::os::Port outputPort; + ThreadedPort tport; + yarp::os::BufferedPort outputPort; static bool started; };