diff --git a/GALERA_VERSION b/GALERA_VERSION index 11dd2cfa9..7ddbd84da 100644 --- a/GALERA_VERSION +++ b/GALERA_VERSION @@ -1,4 +1,4 @@ GALERA_VERSION_WSREP_API=26 GALERA_VERSION_MAJOR=4 -GALERA_VERSION_MINOR=14 +GALERA_VERSION_MINOR=16 GALERA_VERSION_EXTRA= diff --git a/SConstruct b/SConstruct index 317a7fbf3..966880cdf 100644 --- a/SConstruct +++ b/SConstruct @@ -170,7 +170,7 @@ static_ssl = ARGUMENTS.get('static_ssl', None) install = ARGUMENTS.get('install', None) version_script = int(ARGUMENTS.get('version_script', 1)) -GALERA_VER = ARGUMENTS.get('version', '4.14') +GALERA_VER = ARGUMENTS.get('version', '4.16') GALERA_REV = ARGUMENTS.get('revno', 'XXXX') # Attempt to read from file if not given diff --git a/debian/changelog b/debian/changelog index 4dfb146db..a4836fa87 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,5 +1,5 @@ -galera-4 (26.4.14) UNRELEASED; urgency=medium +galera-4 (26.4.16) UNRELEASED; urgency=medium * Galera 4 release - -- Codership Oy Thu, 26 Jan 2023 09:31:00 +0200 + -- Codership Oy Mon, 14 Aug 2023 15:02:38 +0300 diff --git a/galera/src/SConscript b/galera/src/SConscript index df8cb0a9f..56462af2a 100644 --- a/galera/src/SConscript +++ b/galera/src/SConscript @@ -33,7 +33,6 @@ libgaleraxx_srcs = [ 'ist.cpp', 'gcs_dummy.cpp', 'saved_state.cpp', - 'event_service.cpp', 'galera_view.cpp' ] diff --git a/galera/src/certification.cpp b/galera/src/certification.cpp index c7e17f640..da97527cc 100644 --- a/galera/src/certification.cpp +++ b/galera/src/certification.cpp @@ -1,5 +1,5 @@ // -// Copyright (C) 2010-2019 Codership Oy +// Copyright (C) 2010-2023 Codership Oy // #include "certification.hpp" @@ -70,6 +70,45 @@ length_check(const gu::Config& conf) return gu::Config::from_config(CERT_PARAM_LENGTH_CHECK_DEFAULT); } +static void +report_stale_entry(const galera::Certification::CertIndexNG::value_type& ke, + const galera::KeySetIn& key_set) +{ + std::cerr << "Found stale entry for key: " << ke->key() << "\n"; + key_set.rewind(); + std::cerr << "Key set\n"; + for (long i = 0; i < key_set.count(); ++i) + { + const auto& kp = key_set.next(); + std::cerr << kp << "\n"; + } +} + +// Verify that there are no stale entries of ts left after index purge. +// Is stale entry is found, the corresponding key and the key set is +// printed into stderr. Debug build will assert. +// +// This method requires iterating over whole index, so it is relatively +// expensive, and should be used only for debugging purposes. +static void +check_purge_complete(const galera::Certification::CertIndexNG& cert_index, + const galera::TrxHandleSlave* ts, + const galera::KeySetIn& key_set) +{ + std::for_each( + cert_index.begin(), cert_index.end(), + [&cert_index, &key_set, + ts](const galera::Certification::CertIndexNG::value_type& ke) { + ke->for_each_ref([&ke, &key_set, ts](const TrxHandleSlave* ref) { + if (ts == ref) + { + report_stale_entry(ke, key_set); + } + assert(ts != ref); + }); + }); +} + // Purge key set from given index static void purge_key_set(galera::Certification::CertIndexNG& cert_index, galera::TrxHandleSlave* ts, @@ -102,6 +141,10 @@ static void purge_key_set(galera::Certification::CertIndexNG& cert_index, } } } + if (cert_debug_on) + { + check_purge_complete(cert_index, ts, key_set); + } } void @@ -246,36 +289,26 @@ certify_and_depend_v3to5(const galera::KeyEntryNG* const found, /* returns true on collision, false otherwise */ static bool -certify_v3to5(galera::Certification::CertIndexNG& cert_index_ng, +certify_v3to5(const galera::Certification::CertIndexNG& cert_index_ng, const galera::KeySet::KeyPart& key, galera::TrxHandleSlave* const trx, - bool const store_keys, bool const log_conflicts) { galera::KeyEntryNG ke(key); - galera::Certification::CertIndexNG::iterator ci(cert_index_ng.find(&ke)); + galera::Certification::CertIndexNG::const_iterator ci(cert_index_ng.find(&ke)); if (cert_index_ng.end() == ci) { - if (store_keys) - { - galera::KeyEntryNG* const kep(new galera::KeyEntryNG(ke)); - ci = cert_index_ng.insert(kep).first; - - cert_debug << "created new entry"; - } - return false; + return false; // No match } - else - { - cert_debug << "found existing entry"; - galera::KeyEntryNG* const kep(*ci); - // Note: For we skip certification for isolated trxs, only - // cert index and key_list is populated. - return (!trx->is_toi() && - certify_and_depend_v3to5(kep, key, trx, log_conflicts)); - } + cert_debug << "found existing entry"; + + galera::KeyEntryNG* const kep(*ci); + // Note: For we skip certification for isolated trxs, only + // cert index and key_list is populated. + return (!trx->is_toi() && + certify_and_depend_v3to5(kep, key, trx, log_conflicts)); } // Add key to trx references for trx that passed certification. @@ -298,62 +331,16 @@ static void do_ref_keys(galera::Certification::CertIndexNG& cert_index, if (ci == cert_index.end()) { - gu_throw_fatal << "could not find key '" << k - << "' from cert index"; + galera::KeyEntryNG* const kep(new galera::KeyEntryNG(ke)); + ci = cert_index.insert(kep).first; + cert_debug << "created new entry"; } (*ci)->ref(k.wsrep_type(trx->version()), k, trx); } } -// Clean up keys from index that were added by trx that failed -// certification. -// -// @param cert_index certification inde -// @param key_set key_set used in certification -// @param processed number of keys that were processed in certification -static void do_clean_keys(galera::Certification::CertIndexNG& cert_index, - const galera::TrxHandleSlave* const trx, - const galera::KeySetIn& key_set, - const long processed) -{ - /* 'strictly less' comparison is essential in the following loop: - * processed key failed cert and was not added to index */ - for (long i(0); i < processed; ++i) - { - KeyEntryNG ke(key_set.next()); - - // Clean up cert index from entries which were added by this trx - galera::Certification::CertIndexNG::iterator ci(cert_index.find(&ke)); - - if (gu_likely(ci != cert_index.end())) - { - galera::KeyEntryNG* kep(*ci); - - if (kep->referenced() == false) - { - // kel was added to cert_index_ by this trx - - // remove from cert_index_ and fall through to delete - cert_index.erase(ci); - } - else continue; - - assert(kep->referenced() == false); - - delete kep; - } - else if(ke.key().wsrep_type(trx->version()) == WSREP_KEY_SHARED) - { - assert(0); // we actually should never be here, the key should - // be either added to cert_index_ or be there already - log_warn << "could not find shared key '" - << ke.key() << "' from cert index"; - } - else { /* non-shared keys can duplicate shared in the key set */ } - } -} - galera::Certification::TestResult -galera::Certification::do_test_v3to5(TrxHandleSlave* trx, bool store_keys) +galera::Certification::do_test_v3to5(TrxHandleSlave* trx) { cert_debug << "BEGIN CERTIFICATION v" << trx->version() << ": " << *trx; @@ -373,7 +360,7 @@ galera::Certification::do_test_v3to5(TrxHandleSlave* trx, bool store_keys) { const KeySet::KeyPart& key(key_set.next()); - if (certify_v3to5(cert_index_ng_, key, trx, store_keys, log_conflicts_)) + if (certify_v3to5(cert_index_ng_, key, trx, log_conflicts_)) { trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_)); goto cert_fail; @@ -382,16 +369,14 @@ galera::Certification::do_test_v3to5(TrxHandleSlave* trx, bool store_keys) trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_)); - if (store_keys == true) - { - assert (key_count == processed); - key_set.rewind(); - do_ref_keys(cert_index_ng_, trx, key_set, key_count); + assert (key_count == processed); + key_set.rewind(); + do_ref_keys(cert_index_ng_, trx, key_set, key_count); - if (trx->pa_unsafe()) last_pa_unsafe_ = trx->global_seqno(); + if (trx->pa_unsafe()) last_pa_unsafe_ = trx->global_seqno(); + + key_count_ += key_count; - key_count_ += key_count; - } cert_debug << "END CERTIFICATION (success): " << *trx; return TEST_OK; @@ -400,14 +385,7 @@ galera::Certification::do_test_v3to5(TrxHandleSlave* trx, bool store_keys) cert_debug << "END CERTIFICATION (failed): " << *trx; assert (processed < key_count); - - if (store_keys == true) - { - /* Clean up key entries allocated for this trx */ - key_set.rewind(); - do_clean_keys(cert_index_ng_, trx, key_set, processed); - assert(cert_index_ng_.size() == prev_cert_index_size); - } + assert(cert_index_ng_.size() == prev_cert_index_size); return TEST_FAILED; } @@ -431,8 +409,9 @@ trx_cert_version_match(int const trx_version, int const cert_version) } galera::Certification::TestResult -galera::Certification::do_test(const TrxHandleSlavePtr& trx, bool store_keys) +galera::Certification::do_test(const TrxHandleSlavePtr& trx) { + assert(mutex_.owned()); assert(trx->source_id() != WSREP_UUID_UNDEFINED); if (!trx_cert_version_match(trx->version(), version_)) @@ -461,8 +440,6 @@ galera::Certification::do_test(const TrxHandleSlavePtr& trx, bool store_keys) TestResult res(TEST_FAILED); - gu::Lock lock(mutex_); // why do we need that? - e.g. set_trx_committed() - /* initialize parent seqno */ if (gu_unlikely(trx_map_.empty())) { @@ -486,14 +463,14 @@ galera::Certification::do_test(const TrxHandleSlavePtr& trx, bool store_keys) case 3: case 4: case 5: - res = do_test_v3to5(trx.get(), store_keys); + res = do_test_v3to5(trx.get()); break; default: gu_throw_fatal << "certification test for version " << version_ << " not implemented"; } - if (store_keys == true && res == TEST_OK) + if (res == TEST_OK) { ++trx_count_; gu::Lock lock(stats_mutex_); @@ -1124,31 +1101,26 @@ galera::Certification::adjust_position(const View& view, } } -wsrep_seqno_t -galera::Certification::increment_position() -{ - gu::Lock lock(mutex_); - position_++; - return position_; -} - galera::Certification::TestResult -galera::Certification::test(const TrxHandleSlavePtr& trx, bool store_keys) +galera::Certification::test(const TrxHandleSlavePtr& trx) { + assert(mutex_.owned()); assert(trx->global_seqno() >= 0 /* && trx->local_seqno() >= 0 */); - const TestResult ret - (trx->preordered() ? - do_test_preordered(trx.get()) : do_test(trx, store_keys)); + const TestResult ret(trx->preordered() ? do_test_preordered(trx.get()) + : do_test(trx)); - if (gu_unlikely(ret != TEST_OK)) { trx->mark_dummy(); } + if (gu_unlikely(ret != TEST_OK)) + { + trx->mark_dummy(); + } return ret; } - wsrep_seqno_t galera::Certification::get_safe_to_discard_seqno_() const { + assert(mutex_.owned()); wsrep_seqno_t retval; if (deps_set_.empty() == true) { @@ -1167,10 +1139,14 @@ galera::Certification::purge_trxs_upto_(wsrep_seqno_t const seqno, bool const handle_gcache) { assert (seqno > 0); + assert(mutex_.owned()); TrxMap::iterator purge_bound(trx_map_.upper_bound(seqno)); - cert_debug << "purging index up to " << seqno; + cert_debug << "purging index up to " << seqno << ", safe to discard seqno " << get_safe_to_discard_seqno_(); + + assert(purge_bound == trx_map_.end() || + purge_bound->first <= get_safe_to_discard_seqno_() + 1); for_each(trx_map_.begin(), purge_bound, PurgeAndDiscard(*this)); trx_map_.erase(trx_map_.begin(), purge_bound); @@ -1192,13 +1168,13 @@ galera::Certification::TestResult galera::Certification::append_trx(const TrxHandleSlavePtr& trx) { // explicit ROLLBACK is dummy() assert(!trx->is_dummy()); - assert(trx->global_seqno() >= 0 /* && trx->local_seqno() >= 0 */); + assert(trx->global_seqno() > 0 /* && trx->local_seqno() >= 0 */); assert(trx->global_seqno() > position_); #ifndef NDEBUG bool const explicit_rollback(trx->explicit_rollback()); #endif /* NDEBUG */ - + TestResult retval = TEST_FAILED; { gu::Lock lock(mutex_); @@ -1237,19 +1213,14 @@ galera::Certification::append_trx(const TrxHandleSlavePtr& trx) } else { - cert_debug << "purging index up to " << trim_seqno; + cert_debug << "append_trx: purging index up to " << trim_seqno; } purge_trxs_upto_(trim_seqno, true); } - } - const TestResult retval(test(trx, true)); + retval = test(trx); - { - assert(trx->global_seqno() > 0); - - gu::Lock lock(mutex_); if (trx_map_.insert( std::make_pair(trx->global_seqno(), trx)).second == false) gu_throw_fatal << "duplicate trx entry " << *trx; @@ -1279,6 +1250,22 @@ galera::Certification::append_trx(const TrxHandleSlavePtr& trx) } +void galera::Certification::append_dummy_preload(const TrxHandleSlavePtr& trx) +{ + assert(trx->global_seqno() >= 0); + assert(trx->global_seqno() > position_); + gu::Lock lock(mutex_); + /* Dummy preloads have only meta data available, not the whole write set, + so modifying or accessing the trx object causes problems later on. + Insert nullptr as a placeholder for seqno. */ + if (not trx_map_.insert(std::make_pair(trx->global_seqno(), nullptr)) + .second) + { + gu_throw_fatal << "duplicate trx entry in dummy preload"; + } + position_ = trx->global_seqno(); +} + wsrep_seqno_t galera::Certification::set_trx_committed(TrxHandleSlave& trx) { assert(trx.global_seqno() >= 0); diff --git a/galera/src/certification.hpp b/galera/src/certification.hpp index 881dee477..ac6ce61e1 100644 --- a/galera/src/certification.hpp +++ b/galera/src/certification.hpp @@ -64,9 +64,9 @@ namespace galera void assign_initial_position(const gu::GTID& gtid, int version); TestResult append_trx(const TrxHandleSlavePtr&); - TestResult test(const TrxHandleSlavePtr&, bool store_keys); + /* Append dummy trx from cert index preload. */ + void append_dummy_preload(const TrxHandleSlavePtr&); wsrep_seqno_t position() const { return position_; } - wsrep_seqno_t increment_position(); /* for dummy IST events */ /* this is for configuration change use */ void adjust_position(const View&, const gu::GTID& gtid, int version); @@ -147,8 +147,9 @@ namespace galera Certification(const Certification&); Certification& operator=(const Certification&); - TestResult do_test(const TrxHandleSlavePtr&, bool store_keys); - TestResult do_test_v3to5(TrxHandleSlave*, bool); + TestResult test(const TrxHandleSlavePtr&); + TestResult do_test(const TrxHandleSlavePtr&); + TestResult do_test_v3to5(TrxHandleSlave*); TestResult do_test_preordered(TrxHandleSlave*); TestResult do_test_nbo(const TrxHandleSlavePtr&); void purge_for_trx(TrxHandleSlave*); @@ -182,6 +183,11 @@ namespace galera void operator()(TrxMap::value_type& vt) const { + if (not vt.second) + { + // Dummy preload events insert only seqno + return; + } { TrxHandleSlave* trx(vt.second.get()); // Trying to lock trx mutex here may cause deadlock diff --git a/galera/src/key_entry_ng.hpp b/galera/src/key_entry_ng.hpp index 218815c89..5190a0317 100644 --- a/galera/src/key_entry_ng.hpp +++ b/galera/src/key_entry_ng.hpp @@ -7,32 +7,31 @@ #include "trx_handle.hpp" +#include + namespace galera { class KeyEntryNG { public: KeyEntryNG(const KeySet::KeyPart& key) - : refs_(), + : refs_{nullptr, nullptr, nullptr, nullptr}, #ifndef NDEBUG - seqnos_(), + seqnos_{0, 0, 0, 0}, #endif // NDEBUG key_(key) { - std::fill(&refs_[0], - &refs_[KeySet::Key::TYPE_MAX], - static_cast(NULL)); -#ifndef NDEBUG - std::fill(&seqnos_[0], &seqnos_[KeySet::Key::TYPE_MAX], 0); -#endif // NDEBUG } KeyEntryNG(const KeyEntryNG& other) - : refs_(), key_(other.key_) + : refs_(other.refs_) + , +#ifndef NDEBUG + seqnos_(other.seqnos_) + , +#endif /* NDEBUG */ + key_(other.key_) { - std::copy(&other.refs_[0], - &other.refs_[KeySet::Key::TYPE_MAX], - &refs_[0]); } const KeySet::KeyPart& key() const { return key_; } @@ -40,6 +39,7 @@ namespace galera void ref(wsrep_key_type_t p, const KeySet::KeyPart& k, TrxHandleSlave* trx) { + assert(p <= KeySet::Key::TYPE_MAX); assert(0 == refs_[p] || refs_[p]->global_seqno() <= trx->global_seqno()); @@ -52,6 +52,7 @@ namespace galera void unref(wsrep_key_type_t p, const TrxHandleSlave* trx) { + assert(p <= KeySet::Key::TYPE_MAX); assert(refs_[p] != NULL); if (refs_[p] == trx) @@ -67,18 +68,25 @@ namespace galera bool referenced() const { - bool ret(refs_[0] != NULL); - - for (int i(1); false == ret && i <= KeySet::Key::TYPE_MAX; ++i) + for (auto i : refs_) { - ret = (refs_[i] != NULL); + if (i != nullptr) return true; } + return false; + } - return ret; + void + for_each_ref(const std::function& fn) const + { + for (auto i : refs_) + { + fn(i); + } } const TrxHandleSlave* ref_trx(wsrep_key_type_t const p) const { + assert(p <= KeySet::Key::TYPE_MAX); return refs_[p]; } @@ -89,9 +97,11 @@ namespace galera void swap(KeyEntryNG& other) throw() { - using std::swap; - gu::swap_array(refs_, other.refs_); - swap(key_, other.key_); + std::swap(refs_, other.refs_); +#ifndef NDEBUG + std::swap(seqnos_, other.seqnos_); +#endif /* NDEBUG */ + std::swap(key_, other.key_); } KeyEntryNG& operator=(KeyEntryNG ke) @@ -106,17 +116,12 @@ namespace galera } private: - - TrxHandleSlave* refs_[KeySet::Key::TYPE_MAX + 1]; + std::array refs_; #ifndef NDEBUG - wsrep_seqno_t seqnos_[KeySet::Key::TYPE_MAX + 1]; + std::array seqnos_; #endif // NDEBUG KeySet::KeyPart key_; -#ifndef NDEBUG - void assert_ref(KeySet::Key::Prefix, TrxHandleSlave*) const; - void assert_unref(KeySet::Key::Prefix, TrxHandleSlave*) const; -#endif /* NDEBUG */ }; inline void swap(KeyEntryNG& a, KeyEntryNG& b) { a.swap(b); } diff --git a/galera/src/replicator_smm.cpp b/galera/src/replicator_smm.cpp index f77cbddd8..996e877ee 100644 --- a/galera/src/replicator_smm.cpp +++ b/galera/src/replicator_smm.cpp @@ -774,35 +774,21 @@ wsrep_status_t galera::ReplicatorSMM::replicate(TrxHandleMaster& trx, if (gu_unlikely(trx.state() == TrxHandle::S_MUST_ABORT)) { - retval = cert_for_aborted(ts); + retval = WSREP_BF_ABORT; - if (retval != WSREP_BF_ABORT) + // If the transaction was committing, it must replay. Otherwise + // it was an intermediate fragment and we treat it as certification + // failure. + if (ts->flags() & TrxHandle::F_COMMIT) { - assert(trx.state() == TrxHandle::S_MUST_ABORT); - TX_SET_STATE(trx, TrxHandle::S_ABORTING); - - pending_cert_queue_.push(ts); - cancel_monitors_for_local(*ts); - - assert(ts->is_dummy()); - assert(WSREP_OK != retval); + TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY); } else { - // If the transaction was committing, it must replay. - if (ts->flags() & TrxHandle::F_COMMIT) - { - TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY); - } - else - { - TX_SET_STATE(trx, TrxHandle::S_ABORTING); - - pending_cert_queue_.push(ts); - cancel_monitors_for_local(*ts); - - retval = WSREP_TRX_FAIL; - } + TX_SET_STATE(trx, TrxHandle::S_ABORTING); + pending_cert_queue_.push(ts); + cancel_monitors_for_local(*ts); + retval = WSREP_TRX_FAIL; } } else @@ -996,7 +982,6 @@ wsrep_status_t galera::ReplicatorSMM::certify(TrxHandleMaster& trx, switch(retval) { case WSREP_BF_ABORT: - assert(ts->depends_seqno() >= 0); assert(trx.state() == TrxHandle::S_MUST_REPLAY || !(ts->flags() & TrxHandle::F_COMMIT)); assert(ts->state() == TrxHandle::S_REPLICATING || @@ -1529,7 +1514,8 @@ wsrep_status_t galera::ReplicatorSMM::release_rollback(TrxHandleMaster& trx) TX_SET_STATE(ts, TrxHandle::S_COMMITTING); } commit_monitor_.leave(co); - assert(commit_monitor_.last_left() >= ts.global_seqno()); + assert(co_mode_ != CommitOrder::NO_OOOC || + commit_monitor_.last_left() >= ts.global_seqno()); TX_SET_STATE(ts, TrxHandle::S_COMMITTED); } @@ -2165,10 +2151,13 @@ void galera::ReplicatorSMM::process_commit_cut(wsrep_seqno_t const seq, LocalOrder lo(seqno_l); gu_trace(local_monitor_.enter(lo)); - + process_pending_queue(seqno_l); if (seq >= cc_seqno_) /* Refs #782. workaround for * assert(seqno >= seqno_released_) in gcache. */ + { + assert(seq <= last_committed()); cert_.purge_trxs_upto(seq, true); + } local_monitor_.leave(lo); log_debug << "Got commit cut from GCS: " << seq; @@ -3179,8 +3168,8 @@ void galera::ReplicatorSMM::process_pending_queue(wsrep_seqno_t local_seqno) assert(local_seqno > 0); // pending_cert_queue_ contains all writesets that: // a) were BF aborted before being certified - // b) are not going to be replayed even though - // cert_for_aborted() returned TEST_OK for them + // b) are not going to be replayed because of not having + // commit flag set // // Before certifying the current seqno, check if // pending_cert_queue contains any smaller seqno. @@ -3227,10 +3216,7 @@ bool galera::ReplicatorSMM::enter_local_monitor_for_cert( trx->unlock(); } LocalOrder lo(*ts); - if (in_replay == false || local_monitor_.entered(lo) == false) - { - gu_trace(local_monitor_.enter(lo)); - } + gu_trace(local_monitor_.enter(lo)); if (trx != 0) trx->lock(); @@ -3257,35 +3243,26 @@ wsrep_status_t galera::ReplicatorSMM::handle_local_monitor_interrupted( assert(trx != 0); // Did not enter local monitor. assert(ts->state() == TrxHandle::S_REPLICATING); - wsrep_status_t retval(cert_for_aborted(ts)); + wsrep_status_t retval = WSREP_BF_ABORT; - if (WSREP_TRX_FAIL != retval) - { - assert(ts->state() == TrxHandle::S_REPLICATING || - ts->state() == TrxHandle::S_CERTIFYING); - assert(WSREP_BF_ABORT == retval); - assert(trx != 0); + assert(ts->state() == TrxHandle::S_REPLICATING || + ts->state() == TrxHandle::S_CERTIFYING); + assert(trx != 0); - // If the transaction was committing, it must replay. - if (ts->flags() & TrxHandle::F_COMMIT) - { - // Return immediately without canceling local monitor, - // it needs to be grabbed again in replay stage. - TX_SET_STATE(*trx, TrxHandle::S_MUST_REPLAY); - return retval; - } - // if not - we need to rollback, so pretend that certification - // failed, but still update cert index to match slaves - else - { - pending_cert_queue_.push(ts); - retval = WSREP_TRX_FAIL; - } + // If the transaction was committing, it must replay. + if (ts->flags() & TrxHandle::F_COMMIT) + { + // Return immediately without canceling local monitor, + // it needs to be grabbed again in replay stage. + TX_SET_STATE(*trx, TrxHandle::S_MUST_REPLAY); + return retval; } + // if not - we need to rollback, so pretend that certification + // failed, but still update cert index to match slaves else { - assert(ts->is_dummy()); pending_cert_queue_.push(ts); + retval = WSREP_TRX_FAIL; } assert(WSREP_TRX_FAIL == retval); @@ -3355,10 +3332,6 @@ wsrep_status_t galera::ReplicatorSMM::finish_cert( break; } - // at this point we are about to leave local_monitor_. Make sure - // trx checksum was alright before that. - ts->verify_checksum(); - // we must do seqno assignment 'in order' for std::map reasons, // so keeping it inside the monitor. NBO end should never be skipped. bool const skip(ts->is_dummy() && !ts->nbo_end()); @@ -3386,6 +3359,9 @@ wsrep_status_t galera::ReplicatorSMM::cert(TrxHandleMaster* trx, assert(ts->last_seen_seqno() >= 0); assert(ts->last_seen_seqno() < ts->global_seqno()); + // Verify checksum before certification to avoid corrupting index. + ts->verify_checksum(); + LocalOrder lo(*ts); // Local monitor is either released or canceled in // handle_local_monitor_interrupted(), finish_cert(). @@ -3422,39 +3398,6 @@ wsrep_status_t galera::ReplicatorSMM::cert_and_catch( abort(); } -/* This must be called BEFORE local_monitor_.self_cancel() due to - * gcache_.seqno_assign() */ -wsrep_status_t galera::ReplicatorSMM::cert_for_aborted( - const TrxHandleSlavePtr& ts) -{ - // trx was BF aborted either while it was replicating or - // while it was waiting for local monitor - assert(ts->state() == TrxHandle::S_REPLICATING || - ts->state() == TrxHandle::S_CERTIFYING); - - Certification::TestResult const res(cert_.test(ts, false)); - - switch (res) - { - case Certification::TEST_OK: - return WSREP_BF_ABORT; - - case Certification::TEST_FAILED: - // Next step will be monitors release. Make sure that ws was not - // corrupted and cert failure is real before proceeding with that. - //gcf788 - this must be moved to cert(), the caller method - assert(ts->is_dummy()); - ts->verify_checksum(); - assert(!ts->nbo_end()); // should never be skipped in seqno_assign() - return WSREP_TRX_FAIL; - - default: - log_fatal << "Unexpected return value from Certification::test(): " - << res; - abort(); - } -} - bool galera::ReplicatorSMM::enter_apply_monitor_for_local( TrxHandleMaster& trx, const TrxHandleSlavePtr& ts) diff --git a/galera/src/replicator_smm.hpp b/galera/src/replicator_smm.hpp index eb845682c..b9d5caede 100644 --- a/galera/src/replicator_smm.hpp +++ b/galera/src/replicator_smm.hpp @@ -434,6 +434,7 @@ namespace galera { if (gu_unlikely(purge_seqno != -1)) { + assert(purge_seqno <= last_committed()); service_thd_.report_last_committed(purge_seqno); } } @@ -488,8 +489,6 @@ namespace galera const TrxHandleSlavePtr&); wsrep_status_t cert_and_catch (TrxHandleMaster*, const TrxHandleSlavePtr&); - wsrep_status_t cert_for_aborted (const TrxHandleSlavePtr&); - // Enter apply monitor for local transaction. Return true // if apply monitor was grabbed. bool enter_apply_monitor_for_local(TrxHandleMaster&, diff --git a/galera/src/replicator_str.cpp b/galera/src/replicator_str.cpp index b1da88803..553d32f31 100644 --- a/galera/src/replicator_str.cpp +++ b/galera/src/replicator_str.cpp @@ -1286,9 +1286,8 @@ void ReplicatorSMM::handle_ist_nbo(const TrxHandleSlavePtr& ts, // donor refuses to donate SST from the position with active NBO. assert(preload); log_debug << "Skipping NBO event: " << ts; - wsrep_seqno_t const pos(cert_.increment_position()); - assert(ts->global_seqno() == pos); - (void)pos; + cert_.append_dummy_preload(ts); + assert(ts->global_seqno() == cert_.position()); } if (gu_likely(must_apply == true)) { @@ -1329,11 +1328,10 @@ void ReplicatorSMM::handle_ist_trx_preload(const TrxHandleSlavePtr& ts, } else if (cert_.position() != WSREP_SEQNO_UNDEFINED) { - // Increment position to keep track only if the initial - // seqno has already been assigned. - wsrep_seqno_t const pos __attribute__((unused))( - cert_.increment_position()); - assert(ts->global_seqno() == pos); + // Append dummy trx to keep certification trx map continuous which + // is a requirement for cert purge to work properly. + cert_.append_dummy_preload(ts); + assert(ts->global_seqno() == cert_.position()); } } diff --git a/galera/src/wsrep_provider.cpp b/galera/src/wsrep_provider.cpp index ba18a5592..7b81286a5 100644 --- a/galera/src/wsrep_provider.cpp +++ b/galera/src/wsrep_provider.cpp @@ -1815,12 +1815,16 @@ extern "C" int wsrep_init_config_service_v1(wsrep_config_service_v1_t *config_service) { config_service->get_parameters = get_parameters; + // Deprecation checks will be done by application which uses + // the service. + gu::Config::disable_deprecation_check(); return WSREP_OK; } extern "C" void wsrep_deinit_config_service_v1() { + gu::Config::enable_deprecation_check(); } extern "C" diff --git a/galera/tests/defaults_check.cpp b/galera/tests/defaults_check.cpp index 5859dbd11..e11c396ff 100644 --- a/galera/tests/defaults_check.cpp +++ b/galera/tests/defaults_check.cpp @@ -100,6 +100,7 @@ static const char* Defaults[] = "pc.wait_prim", "true", "pc.wait_prim_timeout", "PT30S", "pc.weight", "1", + "protonet.backend", "asio", "protonet.version", "0", "repl.causal_read_timeout", "PT30S", "repl.commit_order", "3", diff --git a/galerautils/src/SConscript b/galerautils/src/SConscript index f4ae40f7e..43b3ef7f0 100644 --- a/galerautils/src/SConscript +++ b/galerautils/src/SConscript @@ -67,6 +67,7 @@ libgalerautilsxx_env.Append(CXXFLAGS = ' -Wno-old-style-cast') libgalerautilsxx_sources = [ 'gu_vlq.cpp', 'gu_datetime.cpp', + 'gu_event_service.cpp', 'gu_exception.cpp', 'gu_serialize.cpp', 'gu_logger.cpp', diff --git a/galerautils/src/gu_asio_stream_react.cpp b/galerautils/src/gu_asio_stream_react.cpp index eb97bce5b..c6d66a8c4 100644 --- a/galerautils/src/gu_asio_stream_react.cpp +++ b/galerautils/src/gu_asio_stream_react.cpp @@ -42,46 +42,60 @@ gu::AsioStreamReact::AsioStreamReact( gu::AsioStreamReact::~AsioStreamReact() { shutdown(); + close(); } -void gu::AsioStreamReact::open(const gu::URI& uri) +void gu::AsioStreamReact::open(const gu::URI& uri) try { - try - { - auto resolve_result(resolve_tcp(io_service_.impl().native(), uri)); - socket_.open(resolve_result->endpoint().protocol()); - set_fd_options(socket_); - } - catch (const asio::system_error& e) - { - gu_throw_error(e.code().value()) - << "error opening stream socket " << uri; - } + auto resolve_result(resolve_tcp(io_service_.impl().native(), uri)); + socket_.open(resolve_result->endpoint().protocol()); + set_fd_options(socket_); +} +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) + << "error opening stream socket " << uri; } -bool gu::AsioStreamReact::is_open() const +bool gu::AsioStreamReact::is_open() const try { return socket_.is_open(); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) + << "error checking if socket is open "; + return false; +} + void gu::AsioStreamReact::close() try { GU_ASIO_DEBUG(debug_print() << " AsioStreamReact::close"); + if (not is_open()) + { + GU_ASIO_DEBUG(debug_print() << "Socket not open on close"); + } socket_.close(); } -catch (const asio::system_error& e) +// Catch all the possible exceptions here, not only asio ones. +catch (const std::exception& e) { - gu_throw_error(e.code().value()) << "Failed to close socket: " << e.what(); + log_warn << "Closing socket failed: " << e.what(); } -void gu::AsioStreamReact::bind(const gu::AsioIpAddress& addr) +void gu::AsioStreamReact::bind(const gu::AsioIpAddress& addr) try { ::bind(socket_, addr); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error in binding"; +} void gu::AsioStreamReact::async_connect( const gu::URI& uri, - const std::shared_ptr& handler) + const std::shared_ptr& handler) try { GU_ASIO_DEBUG(debug_print() << " AsioStreamReact::connect: " << uri); auto resolve_result(resolve_tcp(io_service_.impl().native(), uri)); @@ -96,6 +110,11 @@ void gu::AsioStreamReact::async_connect( handler, asio::placeholders::error)); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error connecting "; +} + void gu::AsioStreamReact::async_write( const std::array& bufs, @@ -250,36 +269,57 @@ std::string gu::AsioStreamReact::remote_addr() const return remote_addr_; } -void gu::AsioStreamReact::set_receive_buffer_size(size_t size) +void gu::AsioStreamReact::set_receive_buffer_size(size_t size) try { assert(not connected_); ::set_receive_buffer_size(socket_, size); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error setting receive buffer size"; +} -size_t gu::AsioStreamReact::get_receive_buffer_size() +size_t gu::AsioStreamReact::get_receive_buffer_size() try { return ::get_receive_buffer_size(socket_); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error getting receive buffer size "; +} -void gu::AsioStreamReact::set_send_buffer_size(size_t size) +void gu::AsioStreamReact::set_send_buffer_size(size_t size) try { assert(not connected_); ::set_send_buffer_size(socket_, size); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error setting send buffer size"; +} -size_t gu::AsioStreamReact::get_send_buffer_size() +size_t gu::AsioStreamReact::get_send_buffer_size() try { return ::get_send_buffer_size(socket_); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error getting send buffer size"; +} -struct tcp_info gu::AsioStreamReact::get_tcp_info() +struct tcp_info gu::AsioStreamReact::get_tcp_info() try { return ::get_tcp_info(socket_); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error getting TCP info"; +} + void gu::AsioStreamReact::complete_client_handshake( const std::shared_ptr& handler, - AsioStreamEngine::op_status result) + AsioStreamEngine::op_status result) try { switch (result) { @@ -306,30 +346,26 @@ void gu::AsioStreamReact::complete_client_handshake( assert(0); } } +catch (const asio::system_error& e) +{ + handler->connect_handler(*this, AsioErrorCode(e.code().value())); +} void gu::AsioStreamReact::connect_handler( const std::shared_ptr& handler, - const asio::error_code& ec) + const asio::error_code& ec) try { GU_ASIO_DEBUG(debug_print() << " AsioStreamReact::connect_handler: " << ec); if (ec) { handler->connect_handler(*this, AsioErrorCode(ec.value(), ec.category())); - socket_.close(); + close(); return; } set_socket_options(socket_); prepare_engine(true); - try - { - assign_addresses(); - } - catch(const asio::system_error& e) - { - handler->connect_handler(*this, AsioErrorCode(e.code().value())); - return; - } + assign_addresses(); GU_ASIO_DEBUG(debug_print() << " AsioStreamReact::connect_handler: init handshake"); auto result(engine_->client_handshake()); @@ -341,16 +377,20 @@ void gu::AsioStreamReact::connect_handler( if (ec) { handler->connect_handler(*this, AsioErrorCode(ec.value(), ec.category())); - socket_.close(); + close(); return; } complete_client_handshake(handler, result); }); } +catch (const asio::system_error& e) +{ + handler->connect_handler(*this, AsioErrorCode(e.code().value())); +} void gu::AsioStreamReact::client_handshake_handler( const std::shared_ptr& handler, - const asio::error_code& ec) + const asio::error_code& ec) try { // During handshake there is only read or write in progress // at the time. Therefore safe to clear both flags. @@ -360,7 +400,7 @@ void gu::AsioStreamReact::client_handshake_handler( { handler->connect_handler( *this, AsioErrorCode(ec.value(), ec.category())); - socket_.close(); + close(); return; } auto result(engine_->client_handshake()); @@ -393,11 +433,15 @@ void gu::AsioStreamReact::client_handshake_handler( break; } } +catch (const asio::system_error& e) +{ + handler->connect_handler(*this, AsioErrorCode(e.code().value())); +} void gu::AsioStreamReact::complete_server_handshake( const std::shared_ptr& acceptor, AsioStreamEngine::op_status result, - const std::shared_ptr& acceptor_handler) + const std::shared_ptr& acceptor_handler) try { GU_ASIO_DEBUG(debug_print() << "AsioStreamReact::server_handshake_handler: " << "result from engine: " << result); @@ -434,11 +478,16 @@ void gu::AsioStreamReact::complete_server_handshake( break; } } +catch (const asio::system_error& e) +{ + acceptor_handler->accept_handler(*acceptor, shared_from_this(), + AsioErrorCode(e.code().value())); +} void gu::AsioStreamReact::server_handshake_handler( const std::shared_ptr& acceptor, const std::shared_ptr& acceptor_handler, - const asio::error_code& ec) + const asio::error_code& ec) try { // During handshake there is only read or write in progress // at the time. Therefore safe to clear both flags. @@ -460,10 +509,15 @@ void gu::AsioStreamReact::server_handshake_handler( self->complete_server_handshake(acceptor, result, acceptor_handler); }); } +catch (const asio::system_error& e) +{ + acceptor_handler->accept_handler(*acceptor, shared_from_this(), + AsioErrorCode(e.code().value())); +} void gu::AsioStreamReact::read_handler( const std::shared_ptr& handler, - const asio::error_code& ec) + const asio::error_code& ec) try { GU_ASIO_DEBUG(debug_print() << " AsioStreamReact::read_handler: " << ec); @@ -523,10 +577,14 @@ void gu::AsioStreamReact::read_handler( break; } } +catch (const asio::system_error& e) +{ + handle_read_handler_error(handler, AsioErrorCode(e.code().value())); +} void gu::AsioStreamReact::write_handler( const std::shared_ptr& handler, - const asio::error_code& ec) + const asio::error_code& ec) try { GU_ASIO_DEBUG(debug_print() << " AsioStreamReact::write_handler: " << ec); in_progress_ &= ~write_in_progress; @@ -573,6 +631,10 @@ void gu::AsioStreamReact::write_handler( break; } } +catch (const asio::system_error& e) +{ + handle_write_handler_error(handler, AsioErrorCode(e.code().value())); +} // @@ -697,7 +759,7 @@ void gu::AsioStreamReact::handle_read_handler_error( *this, ec, read_context_.bytes_transferred()); - socket_.close(); + close(); } void gu::AsioStreamReact::handle_write_handler_error( @@ -709,7 +771,7 @@ void gu::AsioStreamReact::handle_write_handler_error( *this, ec, write_context_.bytes_transferred()); - socket_.close(); + close(); } void gu::AsioStreamReact::set_non_blocking(bool val) @@ -888,32 +950,51 @@ catch (const asio::system_error& e) << "', asio error '" << e.what() << "'"; } -void gu::AsioAcceptorReact::set_receive_buffer_size(size_t size) +void gu::AsioAcceptorReact::set_receive_buffer_size(size_t size) try { assert(not listening_); ::set_receive_buffer_size(acceptor_, size); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error setting receive buffer size"; +} -size_t gu::AsioAcceptorReact::get_receive_buffer_size() + +size_t gu::AsioAcceptorReact::get_receive_buffer_size() try { return ::get_receive_buffer_size(acceptor_); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error getting receive buffer size"; + return 0; +} -void gu::AsioAcceptorReact::set_send_buffer_size(size_t size) +void gu::AsioAcceptorReact::set_send_buffer_size(size_t size) try { assert(not listening_); ::set_send_buffer_size(acceptor_, size); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error setting send buffer size"; +} -size_t gu::AsioAcceptorReact::get_send_buffer_size() +size_t gu::AsioAcceptorReact::get_send_buffer_size() try { return ::get_send_buffer_size(acceptor_); } +catch (const asio::system_error& e) +{ + gu_throw_error(e.code().value()) << "error getting send buffer size"; + return 0; +} void gu::AsioAcceptorReact::accept_handler( const std::shared_ptr& socket, const std::shared_ptr& handler, - const asio::error_code& ec) + const asio::error_code& ec) try { GU_ASIO_DEBUG(this << " AsioAcceptorReact::accept_handler(): " << ec); if (ec) @@ -926,16 +1007,8 @@ void gu::AsioAcceptorReact::accept_handler( set_socket_options(socket->socket_); socket->set_non_blocking(true); socket->prepare_engine(true); - try - { - socket->assign_addresses(); - } - catch(const asio::system_error& e) - { - log_warn << "Failed to accept: " << e.what(); - async_accept(handler); - return; - } + socket->assign_addresses(); + std::string remote_ip = gu::unescape_addr(::escape_addr(socket->socket_.remote_endpoint().address())); auto connection_allowed(gu::allowlist_value_check(WSREP_ALLOWLIST_KEY_IP, remote_ip)); if (connection_allowed == false) @@ -950,3 +1023,9 @@ void gu::AsioAcceptorReact::accept_handler( // server_handshake_handler(). socket->server_handshake_handler(shared_from_this(), handler, ec); } +catch(const asio::system_error& e) +{ + log_warn << "Failed to accept new connection: '" << e.what() << "'"; + async_accept(handler); + return; +} diff --git a/galerautils/src/gu_config.cpp b/galerautils/src/gu_config.cpp index 02b1a1022..e89a0126d 100644 --- a/galerautils/src/gu_config.cpp +++ b/galerautils/src/gu_config.cpp @@ -102,6 +102,9 @@ gu::Config::parse (const std::string& param_list) gu::Config::Config() : params_() {} +std::function + gu::Config::deprecation_check_func_ = check_deprecated; + void gu::Config::set_longlong (const std::string& key, long long val) { @@ -156,6 +159,26 @@ gu::Config::check_conversion (const char* str, } } +void gu::Config::enable_deprecation_check() +{ + deprecation_check_func_ = check_deprecated; +} + +void gu::Config::disable_deprecation_check() +{ + deprecation_check_func_ = nullptr; +} + +void gu::Config::check_deprecated(const std::string& key, + const Parameter& param) +{ + if (param.is_deprecated()) + { + log_warn << "Parameter '" << key + << "' is deprecated and will be removed in future versions"; + } +} + char gu::Config::overflow_char(long long ret) { diff --git a/galerautils/src/gu_config.hpp b/galerautils/src/gu_config.hpp index 7dc5ffac2..f6e95b1c5 100644 --- a/galerautils/src/gu_config.hpp +++ b/galerautils/src/gu_config.hpp @@ -18,6 +18,7 @@ #include #include +#include namespace gu { @@ -95,6 +96,10 @@ class gu::Config if (i != params_.end()) { + if (deprecation_check_func_) + { + deprecation_check_func_(i->first, i->second); + } i->second.set(value); } else @@ -267,6 +272,11 @@ class gu::Config return flags_ & Flag::hidden; } + bool is_deprecated() const + { + return flags_ & Flag::deprecated; + } + void set(const std::string& value) { value_ = value; @@ -291,8 +301,10 @@ class gu::Config const_iterator begin() const { return params_.begin(); } const_iterator end() const { return params_.end(); } -private: + static void enable_deprecation_check(); + static void disable_deprecation_check(); +private: static void key_check (const std::string& key); @@ -300,6 +312,8 @@ class gu::Config check_conversion (const char* ptr, const char* endptr, const char* type, bool range_error = false); + static void check_deprecated(const std::string& str, const Parameter& param); + static char overflow_char(long long ret); @@ -312,8 +326,10 @@ class gu::Config void set_longlong (const std::string& key, long long value); param_map_t params_; -}; + static std::function + deprecation_check_func_; +}; extern "C" const char* gu_str2dbl (const char* str, double* dbl); extern "C" const char* gu_str2bool (const char* str, bool* bl); diff --git a/galerautils/src/gu_utils.hpp b/galerautils/src/gu_utils.hpp index 9a2cee993..92bba70e5 100644 --- a/galerautils/src/gu_utils.hpp +++ b/galerautils/src/gu_utils.hpp @@ -165,14 +165,6 @@ class DeleteObject template void operator()(T* t) { delete t; } }; -/*! swap method for arrays, which does't seem to be built in all compilers */ -template -inline void -swap_array(T (&a)[N], T (&b)[N]) -{ - for (size_t n(0); n < N; ++n) std::swap(a[n], b[n]); -} - typedef std::ios_base& (*base_t) (std::ios_base& str); template remote_addr(); } @@ -230,8 +230,8 @@ class gcomm::AsioTcpSocket::DeferredCloseTimer virtual void handle_wait(const gu::AsioErrorCode& ec) GALERA_OVERRIDE { - log_info << "Deferred close timer handle_wait " - << ec << " for " << socket_->socket_; + log_debug << "Deferred close timer handle_wait " + << ec << " for " << socket_->socket_; socket_->close(); socket_.reset(); } @@ -290,7 +290,7 @@ void gcomm::AsioTcpSocket::write_handler(gu::AsioSocket& socket, { log_debug << "write handler for " << id() << " state " << state(); - if (not gu::is_verbose_error(ec)) + if (ec && not gu::is_verbose_error(ec)) { log_warn << "write_handler(): " << ec.message() << " (" << gu::extra_error_info(ec) << ")"; @@ -598,7 +598,10 @@ size_t gcomm::AsioTcpSocket::read_completion_condition( } catch (const gu::Exception& e) { - log_warn << "unserialize error " << e.what(); + log_warn << "Failed to unserialize message. This may be a " + << "result of corrupt message, port scanner or " + << "another application connecting to " + << "group communication port."; FAILED_HANDLER(gu::AsioErrorCode(e.get_errno())); return 0; } diff --git a/gcomm/src/conf.cpp b/gcomm/src/conf.cpp index 4ecb7d8c7..4e2b00a30 100644 --- a/gcomm/src/conf.cpp +++ b/gcomm/src/conf.cpp @@ -14,6 +14,7 @@ std::string const BaseHost(COMMON_BASE_HOST_KEY); std::string const BasePort(COMMON_BASE_PORT_KEY); // Protonet +std::string const gcomm::Conf::ProtonetBackend("protonet.backend"); std::string const gcomm::Conf::ProtonetVersion("protonet.version"); // TCP @@ -127,6 +128,7 @@ gcomm::Conf::register_params(gu::Config& cnf) GCOMM_CONF_ADD (BaseHost); GCOMM_CONF_ADD (BasePort); + GCOMM_CONF_ADD_DEFAULT(ProtonetBackend); GCOMM_CONF_ADD_DEFAULT(ProtonetVersion); GCOMM_CONF_ADD (TcpNonBlocking); diff --git a/gcomm/src/defaults.cpp b/gcomm/src/defaults.cpp index 35dbd1b3b..0b4a08716 100644 --- a/gcomm/src/defaults.cpp +++ b/gcomm/src/defaults.cpp @@ -9,6 +9,7 @@ namespace gcomm { + std::string const Defaults::ProtonetBackend = "asio"; std::string const Defaults::ProtonetVersion = "0"; std::string const Defaults::SocketChecksum = "2"; std::string const Defaults::SocketRecvBufSize = diff --git a/gcomm/src/defaults.hpp b/gcomm/src/defaults.hpp index c136bbcc2..6492fb4cd 100644 --- a/gcomm/src/defaults.hpp +++ b/gcomm/src/defaults.hpp @@ -12,6 +12,7 @@ namespace gcomm { struct Defaults { + static std::string const ProtonetBackend ; static std::string const ProtonetVersion ; static std::string const SocketChecksum ; static std::string const SocketRecvBufSize ; @@ -60,8 +61,10 @@ namespace gcomm static const int BasePort = gu::Config::Flag::read_only | gu::Config::Flag::type_integer; - static const int ProtonetBackend = gu::Config::Flag::read_only; - static const int ProtonetVersion = gu::Config::Flag::read_only; + static const int ProtonetBackend + = gu::Config::Flag::read_only | gu::Config::Flag::deprecated; + static const int ProtonetVersion + = gu::Config::Flag::read_only | gu::Config::Flag::deprecated; // Hidden because not documented / does not seem to be used? static const int TcpNonBlocking = gu::Config::Flag::hidden; diff --git a/gcomm/src/gcomm/conf.hpp b/gcomm/src/gcomm/conf.hpp index d304bab4f..0c00073e3 100644 --- a/gcomm/src/gcomm/conf.hpp +++ b/gcomm/src/gcomm/conf.hpp @@ -44,6 +44,7 @@ namespace gcomm */ struct Conf { + static std::string const ProtonetBackend; static std::string const ProtonetVersion; /*! diff --git a/gcomm/src/gmcast_proto.hpp b/gcomm/src/gmcast_proto.hpp index 121ec0a76..aedf60deb 100644 --- a/gcomm/src/gmcast_proto.hpp +++ b/gcomm/src/gmcast_proto.hpp @@ -105,7 +105,11 @@ class gcomm::gmcast::Proto gmcast_ (gmcast) { } - ~Proto() { tp_->close(); } + ~Proto() + { + tp_->close(); + tp_ = nullptr; + } void send_msg(const Message& msg, bool ignore_no_buffer_space); void send_handshake(); diff --git a/gcomm/src/protonet.cpp b/gcomm/src/protonet.cpp index b9e5d6403..b560d080b 100644 --- a/gcomm/src/protonet.cpp +++ b/gcomm/src/protonet.cpp @@ -61,11 +61,21 @@ bool gcomm::Protonet::set_param(const std::string& key, const std::string& val, gcomm::Protonet* gcomm::Protonet::create(gu::Config& conf) { + const std::string backend(conf.get(Conf::ProtonetBackend)); const int version(conf.get(Conf::ProtonetVersion)); if (version > max_version_) { gu_throw_error(EINVAL) << "invalid protonet version: " << version; } - return new AsioProtonet(conf, version); + + log_info << "protonet " << backend << " version " << version; + + if (backend == "asio") + return new AsioProtonet(conf, version); + + gu_throw_fatal << Conf::ProtonetBackend << " '" << backend + << "' not supported"; throw; + + return 0; // keep compiler happy } diff --git a/gcs/src/gcs.cpp b/gcs/src/gcs.cpp index b5dfd8dd9..b117b1781 100644 --- a/gcs/src/gcs.cpp +++ b/gcs/src/gcs.cpp @@ -828,6 +828,11 @@ start_progress(gcs_conn_t* conn) { gu_fifo_lock(conn->recv_q); { + if (conn->progress_) + { + // Did not reach synced after previously becoming joined. + delete conn->progress_; + } conn->progress_ = new gu::Progress( conn->progress_cb_, "Processing event queue:", " events", @@ -1709,6 +1714,11 @@ long gcs_close (gcs_conn_t *conn) } /* recv_thread() is supposed to set state to CLOSED when exiting */ assert (GCS_CONN_CLOSED == conn->state); + if (conn->progress_) + { + delete conn->progress_; + conn->progress_ = nullptr; + } return ret; } diff --git a/gcs/src/unit_tests/gcs_core_test.cpp b/gcs/src/unit_tests/gcs_core_test.cpp index fbc6a8961..75ee54393 100644 --- a/gcs/src/unit_tests/gcs_core_test.cpp +++ b/gcs/src/unit_tests/gcs_core_test.cpp @@ -591,14 +591,18 @@ END_TEST // do a single send step, compare with the expected result static inline bool -CORE_SEND_STEP (gcs_core_t* core, long timeout, long ret) +CORE_SEND_STEP (gcs_core_t* core, long timeout, long ret, int line) { long err = gcs_core_send_step (core, timeout); ck_assert_msg(err >= 0, "gcs_core_send_step(): %ld (%s)", err, strerror (-err)); if (ret >= 0) { - ck_assert_msg(err == ret, "gcs_core_send_step(): expected %ld, got %ld", - ret, err); + if (err != ret) { + fprintf(stderr, "gcs_core_send_step(%ld, %ld) at line %d:" + " expected %ld, got %ld", timeout, ret, line, ret, err); + assert(0); // to catch a core if possible + ck_abort(); + } } return false; @@ -667,11 +671,11 @@ CORE_TEST_OWN(bool const enc, int gcs_proto_ver) ck_assert(!CORE_RECV_START (&act_r)); ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag usleep (10000); // resolve race between sending and setting transitional gcs_dummy_set_transitional (Backend); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag - ck_assert(!CORE_SEND_STEP (Core, tout, 0)); // no frags left + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 2nd frag + ck_assert(!CORE_SEND_STEP (Core, tout, 0, __LINE__)); // no frags left ck_assert(NULL == act_r.out); // should not have received anything ck_assert(!gcs_dummy_set_component (Backend, prim)); // return to PRIM state ck_assert(!CORE_SEND_END (&act_s, act_size)); @@ -685,8 +689,8 @@ CORE_TEST_OWN(bool const enc, int gcs_proto_ver) */ ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim)); ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 2nd frag ck_assert(!CORE_SEND_END (&act_s, act_size)); ck_assert(!gcs_dummy_set_component(Backend, non_prim)); ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE)); @@ -703,8 +707,8 @@ CORE_TEST_OWN(bool const enc, int gcs_proto_ver) * fragment send fails. */ ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag - ck_assert(!CORE_SEND_STEP (Core, tout, 0)); // bail out after 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 0, __LINE__)); // bail out after 1st frag ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN)); /* @@ -717,7 +721,7 @@ CORE_TEST_OWN(bool const enc, int gcs_proto_ver) ck_assert(!gcs_dummy_set_component(Backend, non_prim)); ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim)); ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN)); ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE)); ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1)); @@ -730,9 +734,9 @@ CORE_TEST_OWN(bool const enc, int gcs_proto_ver) */ ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim)); ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 2nd frag ck_assert(!CORE_SEND_END (&act_s, act_size)); ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE)); ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1)); @@ -749,11 +753,11 @@ CORE_TEST_OWN(bool const enc, int gcs_proto_ver) */ ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim)); ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag usleep (100000); // make sure 1st fragment gets in before new component ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, non_prim)); ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 2nd frag ck_assert(!CORE_SEND_END (&act_s, act_size)); ck_assert(!CORE_RECV_ACT (&act_r, act_buf, act_size, GCS_ACT_WRITESET)); ck_assert_msg(-ERESTART == act_r.seqno, @@ -778,25 +782,25 @@ CORE_TEST_OWN(bool const enc, int gcs_proto_ver) // subcase 1 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim)); ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 2nd frag usleep (500000); // fail_if_seq ck_assert(!gcs_dummy_set_component(Backend, non_prim)); ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE)); ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 3rd frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 3rd frag ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN)); // subcase 2 ck_assert(!DUMMY_INSTALL_COMPONENT (Backend, prim)); ck_assert(!CORE_SEND_START (&act_s)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 1st frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 1st frag ck_assert(!DUMMY_INJECT_COMPONENT (Backend, non_prim)); - ck_assert(!CORE_SEND_STEP (Core, tout, 1)); // 2nd frag + ck_assert(!CORE_SEND_STEP (Core, tout, 1, __LINE__)); // 2nd frag usleep (1000000); ck_assert(!gcs_dummy_set_component(Backend, non_prim)); - ck_assert(!CORE_SEND_STEP (Core, 4*tout, 1)); // 3rd frag + ck_assert(!CORE_SEND_STEP (Core, 4*tout, 1, __LINE__)); // 3rd frag ck_assert(!CORE_RECV_ACT (&act_r, NULL, UNKNOWN_SIZE, GCS_ACT_CCHANGE)); ck_assert(!core_test_check_conf(act_r.out, act_r.size, false, 0, 1)); ck_assert(!CORE_SEND_END (&act_s, -ENOTCONN)); diff --git a/gcs/src/unit_tests/gcs_memb_test.cpp b/gcs/src/unit_tests/gcs_memb_test.cpp index fb34e264b..218a11270 100644 --- a/gcs/src/unit_tests/gcs_memb_test.cpp +++ b/gcs/src/unit_tests/gcs_memb_test.cpp @@ -248,9 +248,10 @@ Suite *gcs_memb_suite(void) Suite *suite = suite_create("GCS membership changes"); TCase *tcase = tcase_create("gcs_memb"); - suite_add_tcase (suite, tcase); - tcase_add_test (tcase, gcs_memb_test_465); + suite_add_tcase (suite, tcase); + tcase_add_test (tcase, gcs_memb_test_465); tcase_add_test (tcase, gcs_memb_test_465E); + tcase_set_timeout(tcase, 30); tcase = tcase_create("membership_service"); suite_add_tcase (suite, tcase); diff --git a/scripts/build.sh b/scripts/build.sh index b2977fc2c..8b3882fec 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -5,7 +5,7 @@ set -eux # $Id$ # Galera library version -VERSION="26.4.14" +VERSION="26.4.16" get_cores() { diff --git a/scripts/mysql/build.sh b/scripts/mysql/build.sh index 2042a9b72..d8aca23dd 100755 --- a/scripts/mysql/build.sh +++ b/scripts/mysql/build.sh @@ -451,7 +451,6 @@ then BUILD_OPT+=" -DWITH_EXTRA_CHARSETS=all" BUILD_OPT+=" -DMYSQL_MAINTAINER_MODE=0" - BUILD_OPT+=" -DWITH_ZLIB=system" MYSQL_MM_VER="$MYSQL_MAJOR_VER$MYSQL_MINOR_VER" @@ -484,6 +483,7 @@ then BUILD_OPT+=" -DWITH_KEYRING_VAULT_TEST=ON" else # MariaDB-specific build options + BUILD_OPT+=" -DWITH_SSL=bundled" BUILD_OPT+=" -DWITH_READLINE=system" BUILD_OPT+=" -DWITH_DEBUG:BOOL=ON" BUILD_OPT+=" -DWITH_INNODB_DISALLOW_WRITES:BOOL=ON" diff --git a/tests/scripts/action.sh b/tests/scripts/action.sh index fa43e8a46..39ae0dd81 100644 --- a/tests/scripts/action.sh +++ b/tests/scripts/action.sh @@ -1,11 +1,23 @@ # Helper to get status variable value +mysql_command() +{ + local node=$1 + if [ "${NODE_LOCATION[$node]}" = "local" ] + then + echo "${NODE_TEST_DIR[$node]}/mysql/bin/mysql" + else + echo "mysql" + fi +} + cluster_status() { local node=$1 case "$DBMS" in "MYSQL") - local res=$(mysql -u$DBMS_ROOT_USER -p$DBMS_ROOT_PSWD \ + local command=$(mysql_command $node) + local res=$($command -u$DBMS_ROOT_USER -p$DBMS_ROOT_PSWD \ -h${NODE_INCOMING_HOST[$node]} -P${NODE_INCOMING_PORT[$node]} \ --skip-column-names -ss \ -e "SET wsrep_on=0; @@ -22,7 +34,9 @@ mysql_query() { local node=$1 local query=$2 - mysql -u$DBMS_ROOT_USER -p$DBMS_ROOT_PSWD \ + local command=$(mysql_command $node) + + $command -u$DBMS_ROOT_USER -p$DBMS_ROOT_PSWD \ -h${NODE_INCOMING_HOST[$node]} -P${NODE_INCOMING_PORT[$node]} \ --skip-column-names -ss -e "$query" }