From 6abb18ac4dcab026445b62196282596cc009dd8b Mon Sep 17 00:00:00 2001 From: David Naylor Date: Tue, 14 May 2019 13:50:15 -0700 Subject: [PATCH] queue: Add occupancy histogram --- bessctl/module_tests/queue_occupancy.py | 103 ++++++++++++++++++++++++ core/modules/queue.cc | 61 +++++++++++++- core/modules/queue.h | 18 ++++- protobuf/module_msg.proto | 8 +- 4 files changed, 184 insertions(+), 6 deletions(-) create mode 100644 bessctl/module_tests/queue_occupancy.py diff --git a/bessctl/module_tests/queue_occupancy.py b/bessctl/module_tests/queue_occupancy.py new file mode 100644 index 000000000..d97d4ca0a --- /dev/null +++ b/bessctl/module_tests/queue_occupancy.py @@ -0,0 +1,103 @@ +# Copyright (c) 2016-2019, Nefeli Networks, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the names of the copyright holders nor the names of their +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from test_utils import * + + +class BessQueueOccupancyTest(BessModuleTestCase): + def _send_packets(self, q): + eth = scapy.Ether(src='02:1e:67:9f:4d:ae', dst='06:16:3e:1b:72:32') + ip = scapy.IP(src='172.16.0.2', dst='8.8.8.8') + tcp = scapy.TCP(sport=52428, dport=80) + l7 = 'helloworld' + pkt = eth / ip / tcp / l7 + + pkts = [pkt] * 100 + _ = self.run_module(q, 0, pkts, [0]) + return len(pkts) + + def test_hist_enable(self): + q = Queue(size=1024, track_occupancy=True) + sent = self._send_packets(q) + resp = q.get_status() + self.assertEqual(resp.enqueued, sent) + self.assertEqual(resp.dequeued, sent) + self.assertEqual(resp.occupancy_summary.count, sent) + + def test_hist_disable(self): + q = Queue(size=1024, track_occupancy=False) + sent = self._send_packets(q) + resp = q.get_status() + self.assertEqual(resp.enqueued, sent) + self.assertEqual(resp.dequeued, sent) + self.assertEqual(resp.occupancy_summary.count, 0) + + def test_hist_size(self): + q = Queue(size=1024, track_occupancy=True) + resp = q.get_status() + self.assertEqual(resp.size, 1024) + self.assertEqual(resp.occupancy_summary.num_buckets, 32) + self.assertEqual(resp.occupancy_summary.bucket_width, 32) + + q.set_size(size=2048) + resp = q.get_status() + self.assertEqual(resp.size, 2048) + self.assertEqual(resp.occupancy_summary.num_buckets, 32) + self.assertEqual(resp.occupancy_summary.bucket_width, 64) + + q = Queue(size=1024, track_occupancy=True, occupancy_hist_buckets=64) + resp = q.get_status() + self.assertEqual(resp.size, 1024) + self.assertEqual(resp.occupancy_summary.num_buckets, 64) + self.assertEqual(resp.occupancy_summary.bucket_width, 16) + + def test_hist_summary(self): + q = Queue(size=1024, track_occupancy=True) + sent = self._send_packets(q) + + resp = q.get_status(occupancy_percentiles=[0.5, 0.9, 0.99]) + self.assertEqual(resp.occupancy_summary.count, 100) + self.assertEqual(len(resp.occupancy_summary.percentile_values), 3) + + resp = q.get_status(occupancy_percentiles=[0, 0.5, 0.9, 0.99]) + self.assertEqual(resp.occupancy_summary.count, 100) + self.assertEqual(len(resp.occupancy_summary.percentile_values), 4) + + resp = q.get_status(clear_hist=True) + self.assertEqual(resp.occupancy_summary.count, 100) + + resp = q.get_status() + self.assertEqual(resp.occupancy_summary.count, 0) + + +suite = unittest.TestLoader().loadTestsFromTestCase(BessQueueOccupancyTest) +results = unittest.TextTestRunner(verbosity=2, stream=sys.stdout).run(suite) + +if results.failures or results.errors: + sys.exit(1) diff --git a/core/modules/queue.cc b/core/modules/queue.cc index f068117b7..0e064933b 100644 --- a/core/modules/queue.cc +++ b/core/modules/queue.cc @@ -34,8 +34,6 @@ #include "../utils/format.h" -#define DEFAULT_QUEUE_SIZE 1024 - const Commands Queue::cmds = { {"set_burst", "QueueCommandSetBurstArg", MODULE_CMD_FUNC(&Queue::CommandSetBurst), Command::THREAD_SAFE}, @@ -79,6 +77,10 @@ int Queue::Resize(int slots) { queue_ = new_queue; size_ = slots; + if (track_occupancy_) { + occupancy_hist_.Resize(occupancy_buckets_, slots / occupancy_buckets_); + } + if (backpressure_) { AdjustWaterLevels(); } @@ -97,6 +99,15 @@ CommandResponse Queue::Init(const bess::pb::QueueArg &arg) { burst_ = bess::PacketBatch::kMaxBurst; + if (arg.track_occupancy()) { + track_occupancy_ = true; + occupancy_buckets_ = kDefaultBuckets; + if (arg.occupancy_hist_buckets() != 0) { + occupancy_buckets_ = arg.occupancy_hist_buckets(); + } + VLOG(1) << "Occupancy tracking enabled for " << name() << "::Queue (" << occupancy_buckets_ << " buckets)"; + } + if (arg.backpressure()) { VLOG(1) << "Backpressure enabled for " << name() << "::Queue"; backpressure_ = true; @@ -191,7 +202,19 @@ struct task_result Queue::RunTask(Context *ctx, bess::PacketBatch *batch, RunNextModule(ctx, batch); - if (backpressure_ && llring_count(queue_) < low_water_) { + uint32_t occupancy; + if (track_occupancy_ || backpressure_) { + occupancy = llring_count(queue_); + } + + if (track_occupancy_) { + mcslock_node_t mynode; + mcs_lock(&lock_, &mynode); + occupancy_hist_.Insert(occupancy); + mcs_unlock(&lock_, &mynode); + } + + if (backpressure_ && occupancy < low_water_) { SignalUnderload(); } @@ -236,16 +259,46 @@ CommandResponse Queue::CommandSetSize( } CommandResponse Queue::CommandGetStatus( - const bess::pb::QueueCommandGetStatusArg &) { + const bess::pb::QueueCommandGetStatusArg &arg) { bess::pb::QueueCommandGetStatusResponse resp; + + std::vector occupancy_percentiles; + std::copy(arg.occupancy_percentiles().begin(), arg.occupancy_percentiles().end(), + back_inserter(occupancy_percentiles)); + if (!IsValidPercentiles(occupancy_percentiles)) { + return CommandFailure(EINVAL, "invalid 'occupancy_percentiles'"); + } + const auto &occupancy_summary = occupancy_hist_.Summarize(occupancy_percentiles); + resp.set_count(llring_count(queue_)); resp.set_size(size_); resp.set_enqueued(stats_.enqueued); resp.set_dequeued(stats_.dequeued); resp.set_dropped(stats_.dropped); + SetSummary(resp.mutable_occupancy_summary(), occupancy_summary); + + if (arg.clear_hist()) { + // Note that some samples might be lost due to the small gap between + // Summarize() and the next mcs_lock... but we posit that smaller + // critical section is more important. + ClearOccupancyHist(); + } + return CommandSuccess(resp); } +void Queue::ClearOccupancyHist() { + // vector initialization is expensive thus should be out of critical section + decltype(occupancy_hist_) new_occupancy_hist(occupancy_hist_.num_buckets(), + occupancy_hist_.bucket_width()); + + // Use move semantics to minimize critical section + mcslock_node_t mynode; + mcs_lock(&lock_, &mynode); + occupancy_hist_ = std::move(new_occupancy_hist); + mcs_unlock(&lock_, &mynode); +} + void Queue::AdjustWaterLevels() { high_water_ = static_cast(size_ * kHighWaterRatio); low_water_ = static_cast(size_ * kLowWaterRatio); diff --git a/core/modules/queue.h b/core/modules/queue.h index 59f29da16..8cbb261d8 100644 --- a/core/modules/queue.h +++ b/core/modules/queue.h @@ -34,6 +34,10 @@ #include "../kmod/llring.h" #include "../module.h" #include "../pb/module_msg.pb.h" +#include "../utils/histogram.h" +#include "../utils/mcslock.h" + +#define DEFAULT_QUEUE_SIZE 1024 class Queue : public Module { public: @@ -48,7 +52,9 @@ class Queue : public Module { size_(), high_water_(), low_water_(), - stats_() { + stats_(), + track_occupancy_(), + occupancy_hist_(kDefaultBuckets, kDefaultBucketWidth) { is_task_ = true; propagate_workers_ = false; max_allowed_workers_ = Worker::kMaxWorkers; @@ -77,6 +83,8 @@ class Queue : public Module { int Resize(int slots); + void ClearOccupancyHist(); + // Readjusts the water level according to `size_`. void AdjustWaterLevels(); @@ -105,6 +113,14 @@ class Queue : public Module { uint64_t dequeued; uint64_t dropped; } stats_; + + // Queue occupancy statistics + const uint64_t kDefaultBuckets = 32; + const uint64_t kDefaultBucketWidth = DEFAULT_QUEUE_SIZE / kDefaultBuckets; + bool track_occupancy_; + uint64_t occupancy_buckets_; + Histogram occupancy_hist_; + mcslock lock_; }; #endif // BESS_MODULES_QUEUE_H_ diff --git a/protobuf/module_msg.proto b/protobuf/module_msg.proto index 5dfb7a8a6..6c39d0c04 100644 --- a/protobuf/module_msg.proto +++ b/protobuf/module_msg.proto @@ -333,7 +333,10 @@ message QueueCommandSetSizeArg { * Modules that are queues or contain queues may contain functions * `get_status()` that return QueueCommandGetStatusResponse. */ -message QueueCommandGetStatusArg {} +message QueueCommandGetStatusArg { + bool clear_hist = 1; /// if true, occupancy histogram will be all cleared after read + repeated double occupancy_percentiles = 2; /// ascending list of real numbers in [0.0, 100.0] +} /** * Modules that are queues or contain queues may contain functions @@ -346,6 +349,7 @@ message QueueCommandGetStatusResponse { uint64 enqueued = 3; /// total enqueued uint64 dequeued = 4; /// total dequeued uint64 dropped = 5; /// total dropped + HistogramSummary occupancy_summary = 6; /// Valid only if queue created with track_occupancy } /** @@ -810,6 +814,8 @@ message QueueArg { uint64 size = 1; /// The maximum number of packets to store in the queue. bool prefetch = 2; /// When prefetch is enabled, the module will perform CPU prefetch on the first 64B of each packet onto CPU L1 cache. Default value is false. bool backpressure = 3; // When backpressure is enabled, the module will notify upstream if it is overloaded. + bool track_occupancy = 4; // When occupancy tracking is enabled, the module will keep a histogram of queue occupancies (observations recorded after each dequeue). + uint64 occupancy_hist_buckets = 5; // The number of buckets to use in the histogram when occupancy tracking is enabled. } /**