Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logger lost messages V2 #2789

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/release/master/fix_logFowarder_lost_msgs_v2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
fix_logForwarder_lost_msgs_v2 {#master}
-----------------------

## Libraries

### `os`

#### `Log`

* LogForwarder: now using a separated thread (`class ThreadedPort`) to prevent the loss of log messages during stress condition
69 changes: 64 additions & 5 deletions src/libYARP_os/src/yarp/os/impl/LogForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,64 @@

bool yarp::os::impl::LogForwarder::started{false};

yarp::os::impl::ThreadedPort::ThreadedPort() : yarp::os::PeriodicThread(0.005)
{
this->start();
}

void yarp::os::impl::ThreadedPort::ThreadedPort::run()
{
size_t size=0;
do
{
mut.lock();
size = messages.size();
mut.unlock();
if (size > 0 && m_port) { process(); }
else {break;}
} while (1);
}

void yarp::os::impl::ThreadedPort::process()
{
yarp::os::Bottle& b = m_port->prepare();
mut.lock();
b = messages.back();
messages.pop_back();
mut.unlock();
m_port->write(true);
}

void yarp::os::impl::ThreadedPort::attach(yarp::os::BufferedPort<yarp::os::Bottle>* port)
{
m_port = port;
}

void yarp::os::impl::ThreadedPort::terminate()
{
this->stop();
m_port = nullptr;
}

void yarp::os::impl::ThreadedPort::insert(const yarp::os::Bottle& bot)
{
#if 0
//if this is enabled, the bottle is sent immediately
yarp::os::Bottle& b = m_port->prepare();
mut.lock();
b = bot;
mut.unlock();
m_port->write(true);
#else
//if this is enabled, the bottle is sent in a queue and the thread will send it later
mut.lock();
messages.push_front(bot);
mut.unlock();
#endif
}

///---------------------------------------------------------------------

yarp::os::impl::LogForwarder& yarp::os::impl::LogForwarder::getInstance()
{
static LogForwarder instance;
Expand Down Expand Up @@ -46,22 +104,21 @@ yarp::os::impl::LogForwarder::LogForwarder()
if (!outputPort.open(logPortName)) {
printf("LogForwarder error while opening port %s\n", logPortName.c_str());
}
outputPort.enableBackgroundWrite(true);

outputPort.addOutput("/yarplogger", "fast_tcp");
tport.attach(&outputPort);

started = true;
}

void yarp::os::impl::LogForwarder::forward(const std::string& message)
{
mutex.lock();
static Bottle b;
yarp::os::Bottle b;
b.clear();
std::string port = "[" + outputPort.getName() + "]";
b.addString(port);
b.addString(message);
outputPort.write(b);
mutex.unlock();
tport.insert(b);
}

void yarp::os::impl::LogForwarder::shutdown()
Expand All @@ -80,6 +137,8 @@ void yarp::os::impl::LogForwarder::shutdown()
while (fw.outputPort.isWriting()) {
yarp::os::SystemClock::delaySystem(0.2);
}

fw.tport.terminate();
fw.outputPort.interrupt();
fw.outputPort.close();
}
Expand Down
28 changes: 26 additions & 2 deletions src/libYARP_os/src/yarp/os/impl/LogForwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,36 @@
#include <yarp/os/api.h>

#include <yarp/os/Port.h>
#include <yarp/os/PortWriterBuffer.h>

#include <mutex>
#include <string>
#include <yarp/os/PeriodicThread.h>

#include <yarp/os/BufferedPort.h>
#include <list>

namespace yarp::os::impl {

class ThreadedPort : public yarp::os::PeriodicThread
{
std::mutex mut;
yarp::os::BufferedPort<yarp::os::Bottle>* m_port = nullptr;
std::list<yarp::os::Bottle> messages;

void process();

public:
void run() override;
void attach(yarp::os::BufferedPort<yarp::os::Bottle>* port);
void insert(const yarp::os::Bottle& bot);
void terminate();
public:
ThreadedPort();
};

//----------------------------------------------------------------

class YARP_os_impl_API LogForwarder
{
public:
Expand All @@ -29,8 +53,8 @@ class YARP_os_impl_API LogForwarder
LogForwarder(LogForwarder const&) = delete;
LogForwarder& operator=(LogForwarder const&) = delete;

std::mutex mutex;
yarp::os::Port outputPort;
ThreadedPort tport;
yarp::os::BufferedPort<yarp::os::Bottle> outputPort;
static bool started;
};

Expand Down