Skip to content

Commit

Permalink
remove expire mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed Nov 13, 2019
1 parent 160478c commit f1104ac
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 46 deletions.
1 change: 0 additions & 1 deletion config.default.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"server_name" : "brubeck_debug",
"dumpfile" : "./brubeck.dump",
"capacity" : 15,
"expire" : 20,
"http" : ":8080",

"backends" : [
Expand Down
2 changes: 1 addition & 1 deletion src/backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static void *backend__thread(void *_ptr)
self->tick_time = now.tv_sec;

for (mt = self->queue; mt; mt = mt->next) {
if (mt->expire > BRUBECK_EXPIRE_DISABLED)
if (mt->state != BRUBECK_STATE_INACTIVE)
brubeck_metric_sample(mt, self->sample, self);
}

Expand Down
8 changes: 4 additions & 4 deletions src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ expire_metric(struct brubeck_server *server, const char *url)
server, url + strlen("/expire/"));

if (metric) {
metric->expire = BRUBECK_EXPIRE_DISABLED;
metric->state = BRUBECK_STATE_INACTIVE;
return MHD_create_response_from_data(
0, "", 0, 0);
}
Expand All @@ -76,8 +76,8 @@ send_metric(struct brubeck_server *server, const char *url)
static const char *metric_types[] = {
"gauge", "meter", "counter", "histogram", "timer", "internal"
};
static const char *expire_status[] = {
"disabled", "inactive", "active"
static const char *metric_status[] = {
"inactive", "active"
};

struct brubeck_metric *metric = safe_lookup_metric(
Expand All @@ -92,7 +92,7 @@ send_metric(struct brubeck_server *server, const char *url)
#else
"shard", 0,
#endif
"expire", expire_status[metric->expire]
"status", metric_status[metric->state]
);

char *jsonr = json_dumps(mj, JSON_INDENT(4) | JSON_PRESERVE_ORDER);
Expand Down
2 changes: 1 addition & 1 deletion src/internal_sampler.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample
* Mark the metric as active so it doesn't get disabled
* by the inactive metrics pruner
*/
metric->expire = BRUBECK_EXPIRE_ACTIVE;
metric->state = BRUBECK_STATE_ACTIVE;
}

void brubeck_internal__init(struct brubeck_server *server)
Expand Down
16 changes: 9 additions & 7 deletions src/metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ new_metric(struct brubeck_server *server, const char *key, size_t key_len, uint8
metric->key[key_len] = '\0';
metric->key_len = (uint16_t)key_len;

metric->expire = BRUBECK_EXPIRE_ACTIVE;
metric->state = BRUBECK_STATE_ACTIVE;
metric->type = type;
pthread_spin_init(&metric->lock, PTHREAD_PROCESS_PRIVATE);

Expand Down Expand Up @@ -49,6 +49,7 @@ gauge__record(struct brubeck_metric *metric, value_t value, value_t sample_freq,
} else {
metric->as.gauge.value = value;
}
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -61,6 +62,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
pthread_spin_lock(&metric->lock);
{
value = metric->as.gauge.value;
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand All @@ -82,6 +84,7 @@ meter__record(struct brubeck_metric *metric, value_t value, value_t sample_freq,
pthread_spin_lock(&metric->lock);
{
metric->as.meter.value += value;
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -95,6 +98,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
{
value = metric->as.meter.value;
metric->as.meter.value = 0.0;
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand Down Expand Up @@ -124,6 +128,7 @@ counter__record(struct brubeck_metric *metric, value_t value, value_t sample_fre
}

metric->as.counter.previous = value;
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -137,6 +142,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o
{
value = metric->as.counter.value;
metric->as.counter.value = 0.0;
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand All @@ -155,6 +161,7 @@ histogram__record(struct brubeck_metric *metric, value_t value, value_t sample_f
pthread_spin_lock(&metric->lock);
{
brubeck_histo_push(&metric->as.histogram, value, sample_freq);
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -168,6 +175,7 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
pthread_spin_lock(&metric->lock);
{
brubeck_histo_sample(&hsample, &metric->as.histogram);
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand All @@ -187,11 +195,6 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
sample(metric->type, key, hsample.count / (double)backend->sample_freq, opaque);
}

/* if there have been no metrics during this sampling period,
* we don't need to report any of the histogram samples */
if (hsample.count == 0.0)
return;

WITH_SUFFIX(".min") {
sample(metric->type, key, hsample.min, opaque);
}
Expand Down Expand Up @@ -333,6 +336,5 @@ brubeck_metric_find(struct brubeck_server *server, const char *key, size_t key_l
brubeck_atomic_inc(&metric->flow);
#endif

metric->expire = BRUBECK_EXPIRE_ACTIVE;
return metric;
}
7 changes: 3 additions & 4 deletions src/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ enum brubeck_aggregate_t {
};

enum {
BRUBECK_EXPIRE_DISABLED = 0,
BRUBECK_EXPIRE_INACTIVE = 1,
BRUBECK_EXPIRE_ACTIVE = 2
BRUBECK_STATE_INACTIVE = 0,
BRUBECK_STATE_ACTIVE = 1
};

struct brubeck_metric {
Expand All @@ -38,7 +37,7 @@ struct brubeck_metric {
pthread_spinlock_t lock;
uint16_t key_len;
uint8_t type;
uint8_t expire;
uint8_t state;

union {
struct {
Expand Down
30 changes: 3 additions & 27 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ update_proctitle(struct brubeck_server *server)
setproctitle("brubeck", buf);
}

static void
expire_metric(struct brubeck_metric *mt, void *_)
{
/* If this metric is not disabled, turn "inactive"
* into "disabled" and "active" into "inactive"
*/
if (mt->expire > BRUBECK_EXPIRE_DISABLED)
mt->expire = mt->expire - 1;
}

static void
dump_metric(struct brubeck_metric *mt, void *out_file)
{
Expand Down Expand Up @@ -192,7 +182,6 @@ static void load_config(struct brubeck_server *server, const char *path)
json_t *backends, *samplers;

/* optional */
int expire = 0;
char *http = NULL;

server->name = "brubeck";
Expand All @@ -205,14 +194,13 @@ static void load_config(struct brubeck_server *server, const char *path)
}

json_unpack_or_die(server->config,
"{s?:s, s:s, s:i, s:o, s:o, s?:s, s?:i}",
"{s?:s, s:s, s:i, s:o, s:o, s?:s}",
"server_name", &server->name,
"dumpfile", &server->dump_path,
"capacity", &capacity,
"backends", &backends,
"samplers", &samplers,
"http", &http,
"expire", &expire);
"http", &http);

gh_log_set_instance(server->name);

Expand All @@ -224,7 +212,6 @@ static void load_config(struct brubeck_server *server, const char *path)
load_samplers(server, samplers);

if (http) brubeck_http_endpoint_init(server, http);
if (expire) server->fd_expire = load_timerfd(expire);
}

void brubeck_server_init(struct brubeck_server *server, const char *config)
Expand All @@ -237,7 +224,6 @@ void brubeck_server_init(struct brubeck_server *server, const char *config)

server->fd_signal = load_signalfd();
server->fd_update = load_timerfd(1);
server->fd_expire = -1;

/* init the memory allocator */
brubeck_slab_init(&server->slab);
Expand Down Expand Up @@ -272,7 +258,7 @@ static int signal_triggered(struct pollfd *fd)

int brubeck_server_run(struct brubeck_server *server)
{
struct pollfd fds[3];
struct pollfd fds[2];
int nfd = 2;
size_t i;

Expand All @@ -284,11 +270,6 @@ int brubeck_server_run(struct brubeck_server *server)
fds[1].fd = server->fd_update;
fds[1].events = POLLIN;

if (server->fd_expire >= 0) {
fds[2].fd = server->fd_expire;
fds[2].events = POLLIN;
nfd++;
}

server->running = 1;
log_splunk("event=listening");
Expand All @@ -315,11 +296,6 @@ int brubeck_server_run(struct brubeck_server *server)
update_flows(server);
update_proctitle(server);
}

if (timer_elapsed(&fds[2])) {
log_splunk("event=expire_metrics");
brubeck_hashtable_foreach(server->metrics, &expire_metric, NULL);
}
}

for (i = 0; i < server->active_backends; ++i)
Expand Down
1 change: 0 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct brubeck_server {
int active_samplers;

int fd_signal;
int fd_expire;
int fd_update;

struct brubeck_slab slab;
Expand Down

0 comments on commit f1104ac

Please sign in to comment.