diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4fb4f495e994..572fea4c2e18 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,10 +44,19 @@ jobs: build-type: [Debug, Release] compiler: [{ cxx: g++, c: gcc }] cxx_flags: ["-Werror"] + sanitizers: ["NoSanitizers"] include: - container: "alpine-dev:latest" build-type: Debug compiler: { cxx: clang++, c: clang } + cxx_flags: "" + sanitizers: "NoSanitizers" + - container: "ubuntu-dev:24" + build-type: Debug + compiler: { cxx: clang++, c: clang } + # https://maskray.me/blog/2023-08-25-clang-wunused-command-line-argument (search for compiler-rt) + cxx_flags: "-Wno-error=unused-command-line-argument" + sanitizers: "Sanitizers" runs-on: ubuntu-latest env: @@ -71,6 +80,7 @@ jobs: - uses: actions/checkout@v4 with: submodules: true + - name: Prepare Environment run: | uname -a @@ -88,6 +98,7 @@ jobs: df -h touch /mnt/foo ls -la /mnt/foo + - name: Run sccache-cache uses: mozilla-actions/sccache-action@v0.0.7 @@ -98,10 +109,35 @@ jobs: core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || ''); core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || '') + - name: Install clang + if: matrix.sanitizers == 'Sanitizers' + run: | + # TODO remove this once the weekly is done + apt -y update + apt -y upgrade + apt install -y clang + which clang + - name: Configure CMake # Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make. # See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type run: | + echo "ulimit is" + ulimit -s + echo "-----------------------------" + echo "disk space is:" + df -h + echo "-----------------------------" + + export ASAN="OFF" + export USAN="OFF" + + if [ '${{matrix.sanitizers}}' = 'Sanitizers' ]; then + echo "ASAN/USAN" + export ASAN="ON" + export USAN="ON" + fi + # -no-pie to disable address randomization so we could symbolize stacktraces cmake -B ${GITHUB_WORKSPACE}/build \ -DCMAKE_BUILD_TYPE=${{matrix.build-type}} \ @@ -110,7 +146,10 @@ jobs: -DCMAKE_CXX_COMPILER="${{matrix.compiler.cxx}}" \ -DCMAKE_CXX_COMPILER_LAUNCHER=sccache -DCMAKE_C_COMPILER_LAUNCHER=sccache \ -DCMAKE_CXX_FLAGS="${{matrix.cxx_flags}} -no-pie" -DWITH_AWS:BOOL=OFF \ + -DWITH_ASAN="${ASAN}" \ + -DWITH_USAN="${USAN}" \ -L + cd ${GITHUB_WORKSPACE}/build && pwd du -hcs _deps/ diff --git a/.github/workflows/daily-sanitizers.yml b/.github/workflows/daily-sanitizers.yml deleted file mode 100644 index 2117731714cf..000000000000 --- a/.github/workflows/daily-sanitizers.yml +++ /dev/null @@ -1,107 +0,0 @@ -name: daily-sanitizers - -on: - schedule: - - cron: '0 4 * * *' # run at 4 AM UTC - workflow_dispatch: - -jobs: - build: - runs-on: [ubuntu-24.04] - strategy: - matrix: - container: ["ubuntu-dev:24"] - build-type: [Debug] - compiler: [{ cxx: clang++, c: clang }] - # TODO bring it back when warnings on clang are fixed - # cxx_flags: ["-Werror"] - timeout-minutes: 90 - env: - SCCACHE_GHA_ENABLED: "true" - SCCACHE_CACHE_SIZE: 6G - SCCACHE_ERROR_LOG: /tmp/sccache_log.txt - - container: - image: ghcr.io/romange/${{ matrix.container }} - options: --security-opt seccomp=unconfined - credentials: - username: ${{ github.repository_owner }} - password: ${{ secrets.GITHUB_TOKEN }} - - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - - name: Run sccache-cache - uses: mozilla-actions/sccache-action@v0.0.7 - - - name: Configure Cache Env - uses: 
actions/github-script@v7 - with: - script: | - core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || ''); - core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || '') - - - name: Prepare Environment - run: | - uname -a - cmake --version - mkdir -p ${GITHUB_WORKSPACE}/build - - echo "===================Before freeing up space ============================================" - df -h - rm -rf /hostroot/usr/share/dotnet - rm -rf /hostroot/usr/local/share/boost - rm -rf /hostroot/usr/local/lib/android - rm -rf /hostroot/opt/ghc - echo "===================After freeing up space ============================================" - df -h - - - name: Configure & Build - run: | - apt -y update - apt -y upgrade - apt install -y clang - which clang - echo "ulimit is" - ulimit -s - echo "-----------------------------" - echo "disk space is:" - df -h - echo "-----------------------------" - mkdir -p $GITHUB_WORKSPACE/build - cd $GITHUB_WORKSPACE/build - cmake .. \ - -DCMAKE_BUILD_TYPE=Debug \ - -GNinja \ - -DCMAKE_C_COMPILER="${{matrix.compiler.c}}" \ - -DCMAKE_CXX_COMPILER="${{matrix.compiler.cxx}}" \ - -DCMAKE_C_COMPILER_LAUNCHER=sccache \ - -DCMAKE_CXX_COMPILER_LAUNCHER=sccache \ - -DCMAKE_CXX_FLAGS="${{matrix.cxx_flags}}" \ - -DWITH_ASAN=ON \ - -DWITH_USAN=ON \ - -DCMAKE_C_FLAGS=-Wno-error=unused-command-line-argument \ - -DCMAKE_CXX_FLAGS=-Wno-error=unused-command-line-argument - # https://maskray.me/blog/2023-08-25-clang-wunused-command-line-argument (search for compiler-rt) - - ninja src/all - - - name: Test - run: | - cd $GITHUB_WORKSPACE/build - ctest -V -L DFLY - - - name: Send notifications on failure - if: failure() && github.ref == 'refs/heads/main' - shell: bash - run: | - job_link="${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}" - message="Daily sanitizers failed.\\n Job Link: ${job_link}\\n" - - curl -s \ - -X POST \ - -H 'Content-Type: application/json' \ - '${{ secrets.GSPACES_BOT_DF_BUILD }}' \ - -d '{"text": "'"${message}"'"}' diff --git a/src/core/dash.h b/src/core/dash.h index 05a99560f21a..de7a025d44d3 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -409,6 +409,13 @@ class DashTable<_Key, _Value, Policy>::Iterator { return *this; } + Iterator& AdvanceIfNotOccupied() { + if (!IsOccupied()) { + this->operator++(); + } + return *this; + } + IteratorPairType operator->() const { auto* seg = owner_->segment_[seg_id_]; return {seg->Key(bucket_id_, slot_id_), seg->Value(bucket_id_, slot_id_)}; diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index b87e901b473d..d4e777010ea1 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -37,7 +37,7 @@ class ConnectionContext { // connection state / properties. bool conn_closing : 1; bool req_auth : 1; - bool replica_conn : 1; + bool replica_conn : 1; // whether it's a replica connection on the master side. 
bool authenticated : 1; bool async_dispatch : 1; // whether this connection is amid an async dispatch bool sync_dispatch : 1; // whether this connection is amid a sync dispatch diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index a04645cdb256..bcab6c30f2f2 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -610,6 +610,10 @@ Connection::~Connection() { UpdateLibNameVerMap(lib_name_, lib_ver_, -1); } +bool Connection::IsSending() const { + return reply_builder_ && reply_builder_->IsSendActive(); +} + // Called from Connection::Shutdown() right after socket_->Shutdown call. void Connection::OnShutdown() { VLOG(1) << "Connection::OnShutdown"; @@ -1638,9 +1642,6 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) { Connection::WeakRef Connection::Borrow() { DCHECK(self_); - // If the connection is unaware of subscriptions, it could migrate threads, making this call - // unsafe. All external mechanisms that borrow references should register subscriptions. - DCHECK_GT(cc_->subscriptions, 0); return WeakRef(self_, socket_->proactor()->GetPoolIndex(), id_); } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index f5494396babc..75fecd67fc92 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -303,6 +303,16 @@ class Connection : public util::Connection { static void TrackRequestSize(bool enable); static void EnsureMemoryBudget(unsigned tid); + unsigned idle_time() const { + return time(nullptr) - last_interaction_; + } + + Phase phase() const { + return phase_; + } + + bool IsSending() const; + protected: void OnShutdown() override; void OnPreMigrateThread() override; diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index 9f133c891d0a..1e0b74202a83 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -29,8 +29,9 @@ class BlockingControllerTest : public Test { } void SetUp() override; void TearDown() override; + static void SetUpTestSuite() { - ServerState::Init(kNumThreads, kNumThreads, nullptr); + ServerState::Init(kNumThreads, kNumThreads, nullptr, nullptr); facade::tl_facade_stats = new facade::FacadeStats; } @@ -45,7 +46,7 @@ void BlockingControllerTest::SetUp() { pp_.reset(fb2::Pool::Epoll(kNumThreads)); pp_->Run(); pp_->AwaitBrief([](unsigned index, ProactorBase* p) { - ServerState::Init(index, kNumThreads, nullptr); + ServerState::Init(index, kNumThreads, nullptr, nullptr); if (facade::tl_facade_stats == nullptr) { facade::tl_facade_stats = new facade::FacadeStats; } diff --git a/src/server/dfly_bench.cc b/src/server/dfly_bench.cc index 2dd4ff1c7c1e..2c16b3338060 100644 --- a/src/server/dfly_bench.cc +++ b/src/server/dfly_bench.cc @@ -30,6 +30,7 @@ ABSL_FLAG(uint16_t, p, 6379, "Server port"); ABSL_FLAG(uint32_t, c, 20, "Number of connections per thread"); ABSL_FLAG(uint32_t, qps, 20, "QPS schedule at which the generator sends requests to the server"); ABSL_FLAG(uint32_t, n, 1000, "Number of requests to send per connection"); +ABSL_FLAG(uint32_t, test_time, 0, "Testing time in seconds"); ABSL_FLAG(uint32_t, d, 16, "Value size in bytes "); ABSL_FLAG(string, h, "localhost", "server hostname/ip"); ABSL_FLAG(uint64_t, key_minimum, 0, "Min value for keys used"); @@ -242,9 +243,11 @@ struct ClientStats { // Per connection driver. 
class Driver { public: - explicit Driver(uint32_t num_reqs, ClientStats* stats, ProactorBase* p) - : num_reqs_(num_reqs), stats_(*stats) { + explicit Driver(uint32_t num_reqs, uint32_t time_limit, ClientStats* stats, ProactorBase* p) + : num_reqs_(num_reqs), time_limit_(time_limit), stats_(*stats) { socket_.reset(p->CreateSocket()); + if (time_limit_ > 0) + num_reqs_ = UINT32_MAX; } Driver(const Driver&) = delete; @@ -255,6 +258,8 @@ class Driver { void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen); float done() const { + if (time_limit_ > 0) + return double(absl::GetCurrentTimeNanos() - start_ns_) / (time_limit_ * 1e9); return double(received_) / num_reqs_; } @@ -273,7 +278,8 @@ class Driver { bool might_hit; }; - uint32_t num_reqs_, received_ = 0; + uint32_t num_reqs_, time_limit_, received_ = 0; + int64_t start_ns_ = 0; ClientStats& stats_; unique_ptr socket_; @@ -291,7 +297,7 @@ class TLocalClient { explicit TLocalClient(ProactorBase* p) : p_(p) { drivers_.resize(GetFlag(FLAGS_c)); for (auto& driver : drivers_) { - driver.reset(new Driver{GetFlag(FLAGS_n), &stats, p_}); + driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_}); } } @@ -415,16 +421,20 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) { } void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { - const int64_t start = absl::GetCurrentTimeNanos(); + start_ns_ = absl::GetCurrentTimeNanos(); unsigned pipeline = GetFlag(FLAGS_pipeline); stats_.num_clients++; - + int64_t time_limit_ns = + time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX; for (unsigned i = 0; i < num_reqs_; ++i) { int64_t now = absl::GetCurrentTimeNanos(); + if (now > time_limit_ns) { + break; + } if (cycle_ns) { - int64_t target_ts = start + i * (*cycle_ns); + int64_t target_ts = start_ns_ + i * (*cycle_ns); int64_t sleep_ns = target_ts - now; if (reqs_.size() > 10 && sleep_ns <= 0) { sleep_ns = 10'000; @@ -468,7 +478,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { int64_t finish = absl::GetCurrentTimeNanos(); VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took " - << StrFormat("%.1fs", double(finish - start) / 1000000000) + << StrFormat("%.1fs", double(finish - start_ns_) / 1000'000'000) << ". Waiting for server processing"; // TODO: to change to a condvar or something. @@ -662,6 +672,7 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) { uint64_t num_last_resp_cnt = 0; uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n); + uint32_t time_limit = GetFlag(FLAGS_test_time); while (*finish_signal == false) { // we sleep with resolution of 1s but print with lower frequency to be more responsive @@ -692,7 +703,8 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) { uint64_t total_ms = (now - start_time) / 1'000'000; uint64_t period_ms = (now - last_print) / 1'000'000; uint64_t period_resp_cnt = stats.num_responses - num_last_resp_cnt; - double done_perc = double(stats.num_responses) * 100 / resp_goal; + double done_perc = time_limit > 0 ? double(total_ms) / (10 * time_limit) + : double(stats.num_responses) * 100 / resp_goal; double hitrate = stats.hit_opportunities > 0 ? 100 * double(stats.hit_count) / double(stats.hit_opportunities) : 0; @@ -767,10 +779,11 @@ int main(int argc, char* argv[]) { uint32_t thread_key_step = 0; const uint32_t qps = GetFlag(FLAGS_qps); - const int64_t interval = qps ? 1000000000LL / qps : 0; + const int64_t interval = qps ? 
1'000'000'000LL / qps : 0; uint64_t num_reqs = GetFlag(FLAGS_n); uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size(); uint64_t total_requests = num_reqs * total_conn_num; + uint32_t time_limit = GetFlag(FLAGS_test_time); if (dist_type == SEQUENTIAL) { thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size()); @@ -781,9 +794,10 @@ int main(int argc, char* argv[]) { } } - CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs - << " requests per each connection, or " << total_requests << " requests overall"; - + if (!time_limit) { + CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs + << " requests per each connection, or " << total_requests << " requests overall"; + } if (interval) { CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps) << " rps per connection, i.e. request every " << interval / 1000 << "us"; @@ -826,7 +840,8 @@ int main(int argc, char* argv[]) { CONSOLE_INFO << "\nTotal time: " << duration << ". Overall number of requests: " << summary.num_responses - << ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan"); + << ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan") + << ", P99 lat: " << summary.hist.Percentile(99) << "us"; if (summary.num_errors) { CONSOLE_INFO << "Got " << summary.num_errors << " error responses!"; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 7cc3e038e2e1..9dd2d9472d6c 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -231,7 +231,7 @@ void RestoreStreamer::Run() { ThisFiber::Yield(); last_yield = 0; } - } while (cursor); + } while (cursor && !fiber_cancelled_); VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString() << ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop; @@ -302,7 +302,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { it.SetVersion(snapshot_version_); string key_buffer; // we can reuse it - for (; !it.is_done(); ++it) { + for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) { const auto& pv = it->second; string_view key = it->first.GetSlice(&key_buffer); if (ShouldWrite(key)) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 1b2c391b527d..cc705a38c9d6 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -814,6 +814,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("migration_finalization_timeout_ms"); config_registry.RegisterMutable("table_growth_margin"); config_registry.RegisterMutable("tcp_keepalive"); + config_registry.RegisterMutable("timeout"); config_registry.RegisterMutable("managed_service_info"); config_registry.RegisterMutable( @@ -849,19 +850,23 @@ void Service::Init(util::AcceptServer* acceptor, std::vector shard_num = pp_.size(); } + // We assume that listeners.front() is the main_listener + // see dfly_main RunEngine. In unit tests, listeners are empty. + facade::Listener* main_listener = listeners.empty() ? nullptr : listeners.front(); + ChannelStore* cs = new ChannelStore{}; // Must initialize before the shard_set because EngineShard::Init references ServerState. 
pp_.AwaitBrief([&](uint32_t index, ProactorBase* pb) { tl_facade_stats = new FacadeStats; - ServerState::Init(index, shard_num, &user_registry_); + ServerState::Init(index, shard_num, main_listener, &user_registry_); ServerState::tlocal()->UpdateChannelStore(cs); }); const auto tcp_disabled = GetFlag(FLAGS_port) == 0u; // We assume that listeners.front() is the main_listener // see dfly_main RunEngine - if (!tcp_disabled && !listeners.empty()) { - acl_family_.Init(listeners.front(), &user_registry_); + if (!tcp_disabled && main_listener) { + acl_family_.Init(main_listener, &user_registry_); } // Initialize shard_set with a callback running once in a while in the shard threads. @@ -907,7 +912,7 @@ void Service::Shutdown() { shard_set->Shutdown(); Transaction::Shutdown(); - pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); }); + pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); }); // wait for all the pending callbacks to stop. ThisFiber::SleepFor(10ms); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 8d78de1b212b..00095e10db98 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2290,6 +2290,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) { append("pipeline_queue_length", m.facade_stats.conn_stats.dispatch_queue_entries); append("send_delay_ms", GetDelayMs(m.oldest_pending_send_ts)); + append("timeout_disconnects", m.coordinator_stats.conn_timeout_events); } if (should_enter("MEMORY")) { diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 9bdedc1a7d28..11554d66598a 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -15,19 +15,26 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "facade/conn_context.h" +#include "facade/dragonfly_connection.h" #include "server/journal/journal.h" +#include "util/listener_interface.h" ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread"); +ABSL_FLAG(uint32_t, timeout, 0, + "Close the connection after it is idle for N seconds (0 to disable)"); namespace dfly { +using namespace std; +using namespace std::chrono_literals; + __thread ServerState* ServerState::state_ = nullptr; ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) { } ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { - static_assert(sizeof(Stats) == 19 * 8, "Stats size mismatch"); + static_assert(sizeof(Stats) == 20 * 8, "Stats size mismatch"); #define ADD(x) this->x += (other.x) @@ -54,7 +61,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { ADD(compressed_blobs); ADD(oom_error_cmd_cnt); - + ADD(conn_timeout_events); if (this->tx_width_freq_arr.size() > 0) { DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size()); this->tx_width_freq_arr += other.tx_width_freq_arr; @@ -102,14 +109,21 @@ ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_pe } ServerState::~ServerState() { + watcher_fiber_.JoinIfNeeded(); } -void ServerState::Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry) { +void ServerState::Init(uint32_t thread_index, uint32_t num_shards, + util::ListenerInterface* main_listener, acl::UserRegistry* registry) { state_ = new ServerState(); state_->gstate_ = GlobalState::ACTIVE; state_->thread_index_ = thread_index; state_->user_registry = registry; state_->stats = Stats(num_shards); + if (main_listener) { + 
state_->watcher_fiber_ = util::fb2::Fiber( + util::fb2::Launch::post, "ConnectionsWatcher", + [state = state_, main_listener] { state->ConnectionsWatcherFb(main_listener); }); + } } void ServerState::Destroy() { @@ -117,6 +131,11 @@ state_ = nullptr; } +void ServerState::EnterLameDuck() { + gstate_ = GlobalState::SHUTTING_DOWN; + watcher_cv_.notify_all(); +} + ServerState::MemoryUsageStats ServerState::GetMemoryUsage(uint64_t now_ns) { static constexpr uint64_t kCacheEveryNs = 1000; if (now_ns > used_mem_last_update_ + kCacheEveryNs) { @@ -208,4 +227,62 @@ ServerState* ServerState::SafeTLocal() { bool ServerState::ShouldLogSlowCmd(unsigned latency_usec) const { return slow_log_shard_.IsEnabled() && latency_usec >= log_slower_than_usec; } + +void ServerState::ConnectionsWatcherFb(util::ListenerInterface* main) { + optional<facade::Connection::WeakRef> last_reference; + + while (true) { + util::fb2::NoOpLock noop; + if (watcher_cv_.wait_for(noop, 1s, [this] { return gstate_ == GlobalState::SHUTTING_DOWN; })) { + break; + } + + uint32_t timeout = absl::GetFlag(FLAGS_timeout); + if (timeout == 0) { + continue; + } + + facade::Connection* from = nullptr; + if (last_reference && !last_reference->IsExpired()) { + from = last_reference->Get(); + } + + // We use weak refs, because ShutdownSelf below can potentially block the fiber, + // and during this time some of the connections might be destroyed. Weak refs allow checking + // validity of each connection. + vector<facade::Connection::WeakRef> conn_refs; + + auto cb = [&](unsigned thread_index, util::Connection* conn) { + facade::Connection* dfly_conn = static_cast<facade::Connection*>(conn); + using Phase = facade::Connection::Phase; + auto phase = dfly_conn->phase(); + bool is_replica = true; + if (dfly_conn->cntx()) { + is_replica = dfly_conn->cntx()->replica_conn; + } + + if ((phase == Phase::READ_SOCKET || dfly_conn->IsSending()) && + !is_replica && dfly_conn->idle_time() > timeout) { + conn_refs.push_back(dfly_conn->Borrow()); + } + }; + + util::Connection* next = main->TraverseConnectionsOnThread(cb, 100, from); + if (next) { + last_reference = static_cast<facade::Connection*>(next)->Borrow(); + } else { + last_reference.reset(); + } + + for (auto& ref : conn_refs) { + facade::Connection* conn = ref.Get(); + if (conn) { + VLOG(1) << "Closing connection due to timeout: " << conn->GetClientInfo(); + conn->ShutdownSelf(); + stats.conn_timeout_events++; + } + } + } +} + } // end of namespace dfly diff --git a/src/server/server_state.h b/src/server/server_state.h index 6ea43787f48f..6d77e759d08b 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -23,6 +23,10 @@ namespace facade { class Connection; } +namespace util { +class ListenerInterface; +} + namespace dfly { namespace journal { @@ -127,6 +131,7 @@ class ServerState { // public struct - to allow initialization. // Number of times we rejected command dispatch due to OOM condition. uint64_t oom_error_cmd_cnt = 0; + uint32_t conn_timeout_events = 0; std::valarray<uint64_t> tx_width_freq_arr; }; @@ -150,12 +155,11 @@ class ServerState { // public struct - to allow initialization.
ServerState(); ~ServerState(); - static void Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry); + static void Init(uint32_t thread_index, uint32_t num_shards, + util::ListenerInterface* main_listener, acl::UserRegistry* registry); static void Destroy(); - void EnterLameDuck() { - state_->gstate_ = GlobalState::SHUTTING_DOWN; - } + void EnterLameDuck(); void TxCountInc() { ++live_transactions_; @@ -302,6 +306,9 @@ class ServerState { // public struct - to allow initialization. size_t serialization_max_chunk_size; private: + // A fiber constantly watching connections on the main listener. + void ConnectionsWatcherFb(util::ListenerInterface* main); + int64_t live_transactions_ = 0; SlowLogShard slow_log_shard_; mi_heap_t* data_heap_; @@ -321,6 +328,10 @@ class ServerState { // public struct - to allow initialization. int client_pauses_[2] = {}; util::fb2::EventCount client_pause_ec_; + // Monitors connections. Currently responsible for closing timed out connections. + util::fb2::Fiber watcher_fiber_; + util::fb2::CondVarAny watcher_cv_; + using Counter = util::SlidingCounter<7>; Counter qps_; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 4c2abbe51a23..c6c64261abb9 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -291,11 +291,10 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite it.SetVersion(snapshot_version_); unsigned result = 0; - while (!it.is_done()) { + for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) { ++result; // might preempt due to big value serialization. SerializeEntry(db_index, it->first, it->second); - ++it; } serialize_bucket_running_ = false; return result; diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index e284f3f484c7..802c4265f2b1 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -257,15 +257,16 @@ bool ParseID(string_view strid, bool strict, uint64_t missing_seq, ParsedStreamI return true; } -bool ParseRangeId(string_view id, RangeId* dest) { +enum class RangeBoundary { kStart, kEnd }; +bool ParseRangeId(string_view id, RangeBoundary type, RangeId* dest) { if (id.empty()) return false; if (id[0] == '(') { dest->exclude = true; id.remove_prefix(1); } - - return ParseID(id, dest->exclude, 0, &dest->parsed_id); + uint64 missing_seq = type == RangeBoundary::kStart ? 
0 : -1; + return ParseID(id, dest->exclude, missing_seq, &dest->parsed_id); } /* This is a wrapper function for lpGet() to directly get an integer value @@ -2206,7 +2207,8 @@ void XRangeGeneric(std::string_view key, std::string_view start, std::string_vie CmdArgList args, bool is_rev, Transaction* tx, SinkReplyBuilder* builder) { RangeOpts range_opts; RangeId rs, re; - if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) { + if (!ParseRangeId(start, RangeBoundary::kStart, &rs) || + !ParseRangeId(end, RangeBoundary::kEnd, &re)) { return builder->SendError(kInvalidStreamId, kSyntaxErrType); } @@ -2504,7 +2506,8 @@ bool ParseXpendingOptions(CmdArgList& args, PendingOpts& opts, SinkReplyBuilder* string_view start = ArgS(args, id_indx); id_indx++; string_view end = ArgS(args, id_indx); - if (!ParseRangeId(start, &rs) || !ParseRangeId(end, &re)) { + if (!ParseRangeId(start, RangeBoundary::kStart, &rs) || + !ParseRangeId(end, RangeBoundary::kEnd, &re)) { builder->SendError(kInvalidStreamId, kSyntaxErrType); return false; } @@ -3227,7 +3230,7 @@ void StreamFamily::XAutoClaim(CmdArgList args, const CommandContext& cmd_cntx) { string_view start = ArgS(args, 4); RangeId rs; - if (!ParseRangeId(start, &rs)) { + if (!ParseRangeId(start, RangeBoundary::kStart, &rs)) { return rb->SendError(kSyntaxErr); } diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index 0b5ae10cbb42..12edae158806 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -93,6 +93,21 @@ TEST_F(StreamFamilyTest, AddExtended) { EXPECT_THAT(Run({"xlen", "key4"}), IntArg(601)); } +TEST_F(StreamFamilyTest, XrangeRangeAutocomplete) { + Run({"xadd", "mystream", "1609459200000-0", "0", "0"}); + Run({"xadd", "mystream", "1609459200001-0", "1", "1"}); + Run({"xadd", "mystream", "1609459200001-1", "2", "2"}); + Run({"xadd", "mystream", "1609459200002-0", "3", "3"}); + auto resp = Run({"xrange", "mystream", "1609459200000", "1609459200001"}); + EXPECT_THAT(resp, RespElementsAre(RespElementsAre("1609459200000-0", RespElementsAre("0", "0")), + RespElementsAre("1609459200001-0", RespElementsAre("1", "1")), + RespElementsAre("1609459200001-1", RespElementsAre("2", "2")))); + resp = Run({"xrange", "mystream", "1609459200000", "(1609459200001"}); + EXPECT_THAT(resp, RespElementsAre(RespElementsAre("1609459200000-0", RespElementsAre("0", "0")), + RespElementsAre("1609459200001-0", RespElementsAre("1", "1")), + RespElementsAre("1609459200001-1", RespElementsAre("2", "2")))); +} + TEST_F(StreamFamilyTest, Range) { Run({"xadd", "key", "1-*", "f1", "v1"}); Run({"xadd", "key", "1-*", "f2", "v2"}); diff --git a/tests/dragonfly/acl_family_test.py b/tests/dragonfly/acl_family_test.py index f9d4b3003a0e..393174fbe333 100644 --- a/tests/dragonfly/acl_family_test.py +++ b/tests/dragonfly/acl_family_test.py @@ -190,12 +190,18 @@ async def test_acl_cat_commands_multi_exec_squash(df_factory): res = await admin_client.execute_command("ACL SETUSER kk -@string") assert res == "OK" + # We need to sleep because within dragonfly, we first reply to the client with + # "OK" and then we stream the update to proactor threads. The reason for this is + # that some connections might need to be evicted, so we first need to reply before + # we actually do that. Between those steps, there is a small window in which the + # EXEC below might succeed.
+ await asyncio.sleep(1) + res = await client.execute_command("EXEC") # TODO(we need to fix this, basiscally SQUASHED/MULTI transaction commands # return multiple errors for each command failed. Since the nature of the error # is the same, that a rule has changed we should squash those error messages into # one. - logging.debug(f"Result is: {res}") assert res[0].args[0] == "kk ACL rules changed between the MULTI and EXEC", res await admin_client.aclose() diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 9f0cbbd46ec2..d2be391c6c0a 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -2685,3 +2685,81 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) assert (await StaticSeeder.capture(nodes[1].client)) == start_capture + + +""" +Test cluster node distributing its slots into 2 other nodes. +In this test we start migrating to the second node only after the first one has finished, to +reproduce the bug found in issue #4455 +""" + + +@pytest.mark.exclude_epoll +@pytest.mark.asyncio +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_seeder_factory): + # 1. Create cluster of 3 nodes with all slots allocated to first node. + instances = [ + df_factory.create( + port=next(next_port), + admin_port=next(next_port), + vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2", + ) + for i in range(3) + ] + df_factory.start_all(instances) + + nodes = [(await create_node_info(instance)) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + nodes[2].slots = [] + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("DEBUG POPULATE first node") + key_num = 100000 + await StaticSeeder(key_target=key_num, data_size=100).run(nodes[0].client) + dbsize_node0 = await nodes[0].client.dbsize() + assert dbsize_node0 > (key_num * 0.95) + + # 2. Start migrating part of the slots from first node to second + logging.debug("Start first migration") + nodes[0].migrations.append( + MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16300)], nodes[1].id) + ) + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + # 3. Wait for migration to finish + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=50) + await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED", timeout=50) + + nodes[0].migrations = [] + nodes[0].slots = [(16301, 16383)] + nodes[1].slots = [(0, 16300)] + nodes[2].slots = [] + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + # 4. Start migrating remaining slots from first node to third node + logging.debug("Start second migration") + nodes[0].migrations.append( + MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [(16301, 16383)], nodes[2].id) + ) + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + # 5.
Wait for migration to finish + await wait_for_status(nodes[0].admin_client, nodes[2].id, "FINISHED", timeout=10) + await wait_for_status(nodes[2].admin_client, nodes[0].id, "FINISHED", timeout=10) + + nodes[0].migrations = [] + nodes[0].slots = [] + nodes[1].slots = [(0, 16300)] + nodes[2].slots = [(16301, 16383)] + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + # 6. Check all data was migrated + # Using dbsize to check all the data was migrated to the other nodes. + # Note: we cannot use the seeder capture as we migrate the data to 2 different nodes. + # TODO: improve the migration correctness by running the seeder capture on slot range (requires changes in capture script). + dbsize_node1 = await nodes[1].client.dbsize() + dbsize_node2 = await nodes[2].client.dbsize() + assert dbsize_node1 + dbsize_node2 == dbsize_node0 + assert dbsize_node2 > 0 and dbsize_node1 > 0 diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index e7d74ad3d5e1..c6dd949c6d69 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -1051,3 +1051,23 @@ async def test_hiredis(df_factory): server.start() client = base_redis.Redis(port=server.port, protocol=3, cache_config=CacheConfig()) client.ping() + + +@dfly_args({"timeout": 1}) +async def test_timeout(df_server: DflyInstance, async_client: aioredis.Redis): + another_client = df_server.client() + await another_client.ping() + clients = await async_client.client_list() + assert len(clients) == 2 + + await asyncio.sleep(2) + + @assert_eventually + async def wait_for_conn_drop(): + clients = await async_client.client_list() + logging.info("clients: %s", clients) + assert len(clients) <= 1 + + await wait_for_conn_drop() + info = await async_client.info("clients") + assert int(info["timeout_disconnects"]) >= 1 \ No newline at end of file