From ffb1be0e13047ea7413c54c7f044c4bc71f8f10d Mon Sep 17 00:00:00 2001 From: Alois Klink Date: Fri, 25 Aug 2023 13:21:56 +0100 Subject: [PATCH] wip Potential issues, maybe calling `->stop()` on the loop after a file has been opened, but before the `->read()` has completed, isn't defined???? Current error: 0x00007ffff71403cf in ?? () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so (gdb) bt #0 0x00007ffff71403cf in ?? () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so #1 0x00007ffff712e7f7 in uv_run () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so #2 0x00007ffff7116d59 in ?? () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so #3 0x00007ffff7117ecb in std::_Function_handler (), std::__future_base::_Task_setter, std::__future_base::_Result_base::_Deleter>, std::thread::_Invoker >, void> >::_M_invoke(std::_Any_data const&) () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so #4 0x00007ffff7111b3d in std::__future_base::_State_baseV2::_M_do_set(std::function ()>*, bool*) () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so #5 0x00007ffff7ce2f68 in __pthread_once_slow (once_control=0x555555cf5ff8, init_routine=0x7ffff6f6fd50 <__once_proxy>) at ./nptl/pthread_once.c:116 #6 0x00007ffff7115dd2 in std::__future_base::_Async_state_impl >, void>::_M_run() () from /home/alois/Documents/nqminds/nqm-irimager/.venv/lib/python3.10/site-packages/nqm/irimager.cpython-310-x86_64-linux-gnu.so #7 0x00007ffff6f71253 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6 #8 0x00007ffff7cddb43 in start_thread (arg=) at ./nptl/pthread_create.c:442 #9 0x00007ffff7d6fa00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81 --- CMakeLists.txt | 11 ++ src/nqm/irimager/irlogger_to_spd.cpp | 265 +++++++++++++++------------ 2 files changed, 161 insertions(+), 115 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 320b4db..b0a527a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,6 +92,17 @@ target_link_libraries(irimager_class spdlog::spdlog_header_only # less efficient, but avoids CXX11 ABI issues ) +set(MSAN TRUE) +if(ASAN) + target_compile_options(irimager INTERFACE $<$:-fsanitize=address,undefined -fno-omit-frame-pointer -fno-common>) + target_link_libraries(irimager INTERFACE $<$:-fsanitize=address,undefined>) +endif() + +if(MSAN) + target_compile_options(irimager INTERFACE $<$:-fno-omit-frame-pointer -fsanitize=memory>) + target_link_libraries(irimager INTERFACE $<$:-fno-omit-frame-pointer -fsanitize=memory>) +endif() + if(IRImager_mock) target_compile_definitions(irimager_class PRIVATE IR_IMAGER_MOCK) else() diff --git a/src/nqm/irimager/irlogger_to_spd.cpp b/src/nqm/irimager/irlogger_to_spd.cpp index f39dbf6..d103fe8 100644 --- a/src/nqm/irimager/irlogger_to_spd.cpp +++ b/src/nqm/irimager/irlogger_to_spd.cpp @@ -8,6 +8,11 @@ #include #include +#define uvw_error_event_handler \ + ([](const uvw::error_event &error, auto &) { \ + spdlog::error("uvw error {}", error.what()); \ + }) + class IRLoggerToSpd::impl { public: impl(const std::filesystem::path &socket_path) { start_thread(socket_path); } @@ -29,116 +34,146 @@ class IRLoggerToSpd::impl { [this, socket_path // capture by value, since this will run in another thread ] { - spdlog::debug("started in new thread"); + spdlog::debug("started in new thread"); - auto loop = uvw::loop::get_default(); + auto loop = uvw::loop::get_default(); - // TODO: do we need to check for nullptr? - this->message = loop->resource(); + // TODO: do we need to check for nullptr? + this->message = loop->resource(); - spdlog::debug("checking for early stop"); + spdlog::debug("checking for early stop"); - if (this->stop) { // in case calling thread stopped this before - // `message` was created - spdlog::info("early exit!"); - loop->stop(); - } + if (this->stop) { // in case calling thread stopped this before + // `message` was created + spdlog::info("early exit!"); + loop->stop(); + } - this->message->on( - [this, &loop](const uvw::async_event &, uvw::async_handle &) { - if (this->stop) { - loop->stop(); - } - }); + this->message->on(uvw_error_event_handler); + this->message->on( + [this, &loop](const uvw::async_event &, uvw::async_handle &) { + if (this->stop) { + loop->stop(); + } + }); - spdlog::debug("message handler added"); + spdlog::debug("message handler added"); #ifdef FIFO_STREAMING_WORKKING - auto log_file_reader = loop->resource(); - - log_file_reader->on( - [](const uvw::connect_event &, uvw::pipe_handle &pipe_handle) { - spdlog::debug("Connection"); - pipe_handle.on( - [](const uvw::end_event &, uvw::pipe_handle &srv) { - srv.close(); - }); - pipe_handle.on( - [](const uvw::data_event &data, uvw::pipe_handle &) { - auto string_data = - std::string_view(data.data.get(), data.length); - - // TODO PARSE LOGS - spdlog::info(string_data); - }); - pipe_handle.read(); - }); - - log_file_reader->on( - [&loop](const uvw::error_event &error, uvw::pipe_handle &) { - spdlog::error("log_file_reader error {}", error.what()); - }); - - spdlog::debug("Connecting to socket {}", socket_path.string()); - log_file_reader->connect(socket_path.string()); - log_file_reader->read(); + auto log_file_reader = loop->resource(); + + log_file_reader->on( + [](const uvw::connect_event &, uvw::pipe_handle &pipe_handle) { + spdlog::debug("Connection"); + pipe_handle.on( + [](const uvw::end_event &, uvw::pipe_handle &srv) { + srv.close(); + }); + pipe_handle.on([](const uvw::data_event &data, + uvw::pipe_handle &) { + auto string_data = std::string_view(data.data.get(), data.length); + + // TODO PARSE LOGS + spdlog::info(string_data); + }); + pipe_handle.read(); + }); + + log_file_reader->on(uvw_error_event_handler); + + spdlog::debug("Connecting to socket {}", socket_path.string()); + log_file_reader->connect(socket_path.string()); + log_file_reader->read(); #endif #define UVW_FILE_STREAMING 1 #ifdef UVW_FILE_STREAMING - /** - * This currently works, but it's very inefficient, since it maxes - * out the CPU to 100%! - */ - auto log_file_reader = loop->resource(); - - size_t file_offset = 0; - log_file_reader->on([&](const auto &event, auto &req) { - switch (event.type) { - case uvw::fs_req::fs_type::READ: { - if (event.result != 0) { - auto string_data = - std::string_view(event.read.data.get(), event.result); - file_offset += event.result; - - // TODO: PARSE LOGS somehow? - spdlog::info(string_data); - } - - [[fallthrough]]; - } - case uvw::fs_req::fs_type::OPEN: - req.read(static_cast(file_offset), 16); - break; - default: - spdlog::error( - "Unexpected fs_req type {}", - static_cast>( - event.type)); - [[fallthrough]]; - case uvw::fs_req::fs_type::CLOSE: - spdlog::debug("Closing file {}", socket_path.string()); - log_file_reader->close(); + /** + * This currently works, but it's very inefficient, since it maxes + * out the CPU to 100%! + */ + auto log_file_reader = loop->resource(); + + size_t file_offset = 0; + constexpr unsigned int MAX_READ_SIZE = 512; + bool reading = false; + + log_file_reader->on(uvw_error_event_handler); + log_file_reader->on([&](const auto &event, auto &req) { + spdlog::debug("Got fs_event"); + switch (event.type) { + case uvw::fs_req::fs_type::READ: { + if (event.result == 0) { + spdlog::trace("Empty read, stop reading"); + // empty read, stop reading until `log_file_watcher` restarts + // reading + reading = false; + break; } - }); + spdlog::trace("Got {} bytes of data", event.result); + auto string_data = + std::string_view(event.read.data.get(), event.result); + file_offset += event.result; + + // TODO: PARSE LOGS somehow? + spdlog::info(string_data); + [[fallthrough]]; // read the next bit of data + } + case uvw::fs_req::fs_type::OPEN: + reading = true; + req.read(static_cast(file_offset), MAX_READ_SIZE); + break; + default: + spdlog::error( + "Unexpected fs_req type {}", + static_cast>( + event.type)); + [[fallthrough]]; + case uvw::fs_req::fs_type::CLOSE: + spdlog::debug("Closing file {}", socket_path.string()); + } + + spdlog::debug("Reading from file at {}", socket_path.string()); + log_file_reader->open(socket_path.string(), + uvw::file_req::file_open_flags::RDONLY, 0644); + +#if 1 + auto log_file_watcher = loop->resource(); + log_file_watcher->on(uvw_error_event_handler); + log_file_watcher->on([&](const auto &, auto &) { + // may error if log_file_reader isn't yet open, but not a big deal + spdlog::debug("Got fs_event_event on file {}", socket_path.string()); + if (!reading) { + reading = true; + log_file_reader->read(static_cast(file_offset), + MAX_READ_SIZE); + } + }); + spdlog::debug("Watching file at {}", socket_path.string()); + log_file_watcher->start(socket_path.string()); - spdlog::debug("Reading from file at {}", socket_path.string()); - log_file_reader->open(socket_path.string(), - uvw::file_req::file_open_flags::RDONLY, 0644); +#endif #endif // POSIX hack - spdlog::debug("running loop in another thread"); - loop->run(); + spdlog::debug("running loop in another thread"); + loop->run(); + + // TODO: is this line needed?? + // If it is, should we use RAII to handle exceptions? + loop->walk([](auto &&h) { + spdlog::debug("Closing handle"); + h.close(); }); + }); } void stop_thread() { - stop = true; - if (message != nullptr) { - message->send(); - } + stop = true; + if (message != nullptr) { + message->send(); + } - logger_thread.get(); + logger_thread.get(); } std::shared_future logger_thread; @@ -147,35 +182,35 @@ class IRLoggerToSpd::impl { std::shared_ptr message; /** If `true`, stop the logger thread when a @p message is received */ std::atomic stop = false; -}; - -/** - * @see IRLoggerToSpd::IRLoggerToSpd() documentation. - */ -static std::filesystem::path default_socket_path() { - const char *xdg_runtime_dir_ptr = std::getenv("XDG_RUNTIME_DIR"); - if (xdg_runtime_dir_ptr) { - auto xdg_runtime_dir = - std::filesystem::path(xdg_runtime_dir_ptr) / "nqm-irimager"; - try { - std::filesystem::create_directory(xdg_runtime_dir); - return xdg_runtime_dir / "irlogger.fifo"; - } catch (const std::filesystem::filesystem_error &e) { - // on no_such_file_or_directory error, just use temp_directory_path() - if (e.code() != std::errc::no_such_file_or_directory) { - throw e; + }; + + /** + * @see IRLoggerToSpd::IRLoggerToSpd() documentation. + */ + static std::filesystem::path default_socket_path() { + const char *xdg_runtime_dir_ptr = std::getenv("XDG_RUNTIME_DIR"); + if (xdg_runtime_dir_ptr) { + auto xdg_runtime_dir = + std::filesystem::path(xdg_runtime_dir_ptr) / "nqm-irimager"; + try { + std::filesystem::create_directory(xdg_runtime_dir); + return xdg_runtime_dir / "irlogger.fifo"; + } catch (const std::filesystem::filesystem_error &e) { + // on no_such_file_or_directory error, just use temp_directory_path() + if (e.code() != std::errc::no_such_file_or_directory) { + throw e; + } } } - } - auto temp_dir = std::filesystem::temp_directory_path() / "nqm-irimager"; - std::filesystem::create_directory(temp_dir); - return temp_dir / "irlogger.fifo"; -} + auto temp_dir = std::filesystem::temp_directory_path() / "nqm-irimager"; + std::filesystem::create_directory(temp_dir); + return temp_dir / "irlogger.fifo"; + } -IRLoggerToSpd::IRLoggerToSpd() : IRLoggerToSpd(default_socket_path()) {} + IRLoggerToSpd::IRLoggerToSpd() : IRLoggerToSpd(default_socket_path()) {} -IRLoggerToSpd::IRLoggerToSpd(const std::filesystem::path &socket_path) - : pImpl{std::make_unique(socket_path)} {} + IRLoggerToSpd::IRLoggerToSpd(const std::filesystem::path &socket_path) + : pImpl{std::make_unique(socket_path)} {} -IRLoggerToSpd::~IRLoggerToSpd() {} + IRLoggerToSpd::~IRLoggerToSpd() {}