Skip to content

Commit

Permalink
feat: add 'testing_time' limit option to dfly_bench (#4487)
Browse files Browse the repository at this point in the history
* feat: add 'testing_time' limit option to dfly_bench
---------

Signed-off-by: Roman Gershman <[email protected]>
Co-authored-by: Shahar Mike <[email protected]>
  • Loading branch information
romange and chakaz authored Jan 22, 2025
1 parent 20bc318 commit 4b8fa90
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ABSL_FLAG(uint16_t, p, 6379, "Server port");
ABSL_FLAG(uint32_t, c, 20, "Number of connections per thread");
ABSL_FLAG(uint32_t, qps, 20, "QPS schedule at which the generator sends requests to the server");
ABSL_FLAG(uint32_t, n, 1000, "Number of requests to send per connection");
ABSL_FLAG(uint32_t, test_time, 0, "Testing time in seconds");
ABSL_FLAG(uint32_t, d, 16, "Value size in bytes ");
ABSL_FLAG(string, h, "localhost", "server hostname/ip");
ABSL_FLAG(uint64_t, key_minimum, 0, "Min value for keys used");
Expand Down Expand Up @@ -242,9 +243,11 @@ struct ClientStats {
// Per connection driver.
class Driver {
public:
explicit Driver(uint32_t num_reqs, ClientStats* stats, ProactorBase* p)
: num_reqs_(num_reqs), stats_(*stats) {
explicit Driver(uint32_t num_reqs, uint32_t time_limit, ClientStats* stats, ProactorBase* p)
: num_reqs_(num_reqs), time_limit_(time_limit), stats_(*stats) {
socket_.reset(p->CreateSocket());
if (time_limit_ > 0)
num_reqs_ = UINT32_MAX;
}

Driver(const Driver&) = delete;
Expand All @@ -255,6 +258,8 @@ class Driver {
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);

float done() const {
if (time_limit_ > 0)
return double(absl::GetCurrentTimeNanos() - start_ns_) / (time_limit_ * 1e9);
return double(received_) / num_reqs_;
}

Expand All @@ -273,7 +278,8 @@ class Driver {
bool might_hit;
};

uint32_t num_reqs_, received_ = 0;
uint32_t num_reqs_, time_limit_, received_ = 0;
int64_t start_ns_ = 0;

ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_;
Expand All @@ -291,7 +297,7 @@ class TLocalClient {
explicit TLocalClient(ProactorBase* p) : p_(p) {
drivers_.resize(GetFlag(FLAGS_c));
for (auto& driver : drivers_) {
driver.reset(new Driver{GetFlag(FLAGS_n), &stats, p_});
driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_});
}
}

Expand Down Expand Up @@ -415,16 +421,20 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
}

void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
const int64_t start = absl::GetCurrentTimeNanos();
start_ns_ = absl::GetCurrentTimeNanos();
unsigned pipeline = GetFlag(FLAGS_pipeline);

stats_.num_clients++;

int64_t time_limit_ns =
time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX;
for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos();

if (now > time_limit_ns) {
break;
}
if (cycle_ns) {
int64_t target_ts = start + i * (*cycle_ns);
int64_t target_ts = start_ns_ + i * (*cycle_ns);
int64_t sleep_ns = target_ts - now;
if (reqs_.size() > 10 && sleep_ns <= 0) {
sleep_ns = 10'000;
Expand Down Expand Up @@ -468,7 +478,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {

int64_t finish = absl::GetCurrentTimeNanos();
VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took "
<< StrFormat("%.1fs", double(finish - start) / 1000000000)
<< StrFormat("%.1fs", double(finish - start_ns_) / 1000'000'000)
<< ". Waiting for server processing";

// TODO: to change to a condvar or something.
Expand Down Expand Up @@ -662,6 +672,7 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
uint64_t num_last_resp_cnt = 0;

uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n);
uint32_t time_limit = GetFlag(FLAGS_test_time);

while (*finish_signal == false) {
// we sleep with resolution of 1s but print with lower frequency to be more responsive
Expand Down Expand Up @@ -692,7 +703,8 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
uint64_t total_ms = (now - start_time) / 1'000'000;
uint64_t period_ms = (now - last_print) / 1'000'000;
uint64_t period_resp_cnt = stats.num_responses - num_last_resp_cnt;
double done_perc = double(stats.num_responses) * 100 / resp_goal;
double done_perc = time_limit > 0 ? double(total_ms) / (10 * time_limit)
: double(stats.num_responses) * 100 / resp_goal;
double hitrate = stats.hit_opportunities > 0
? 100 * double(stats.hit_count) / double(stats.hit_opportunities)
: 0;
Expand Down Expand Up @@ -767,10 +779,11 @@ int main(int argc, char* argv[]) {

uint32_t thread_key_step = 0;
const uint32_t qps = GetFlag(FLAGS_qps);
const int64_t interval = qps ? 1000000000LL / qps : 0;
const int64_t interval = qps ? 1'000'000'000LL / qps : 0;
uint64_t num_reqs = GetFlag(FLAGS_n);
uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size();
uint64_t total_requests = num_reqs * total_conn_num;
uint32_t time_limit = GetFlag(FLAGS_test_time);

if (dist_type == SEQUENTIAL) {
thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size());
Expand All @@ -781,9 +794,10 @@ int main(int argc, char* argv[]) {
}
}

CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection, or " << total_requests << " requests overall";

if (!time_limit) {
CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection, or " << total_requests << " requests overall";
}
if (interval) {
CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps)
<< " rps per connection, i.e. request every " << interval / 1000 << "us";
Expand Down Expand Up @@ -826,7 +840,8 @@ int main(int argc, char* argv[]) {

CONSOLE_INFO << "\nTotal time: " << duration
<< ". Overall number of requests: " << summary.num_responses
<< ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan");
<< ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan")
<< ", P99 lat: " << summary.hist.Percentile(99) << "us";

if (summary.num_errors) {
CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";
Expand Down

0 comments on commit 4b8fa90

Please sign in to comment.