Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distribution metrics #39

Open
wants to merge 91 commits into
base: project-bkjg
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
5670907
Add distribution header file
bkjg Jul 23, 2020
e060f37
Add draft of distribution functions
bkjg Jul 23, 2020
25be9e0
Add implementation of distribution_new_linear function
bkjg Jul 23, 2020
0144c2f
Add implementation of bucket_new_linear function
bkjg Jul 23, 2020
d14f5b9
Remove one argument from distribution_new_exponential function
bkjg Jul 23, 2020
3721e29
Add implementation of distribution_new_exponential function
bkjg Jul 23, 2020
8922e23
Add implementation of bucket_new_exponential function
bkjg Jul 23, 2020
7ac8dfd
Add implementation of distribution_new_custom function
bkjg Jul 23, 2020
4c155e5
Add implementation of distribution_destroy function
bkjg Jul 23, 2020
7578c49
Add implementation of bucket_new_custom function
bkjg Jul 23, 2020
27bdc61
Fix bugs
bkjg Jul 24, 2020
009a5bb
Add implementation of distribution_update function
bkjg Jul 24, 2020
ae32597
Add implementation of distribution_percentile function
bkjg Jul 24, 2020
40bc6eb
Add implementation of distribution_average function
bkjg Jul 24, 2020
bba200a
Add mutex to distribution_update function
bkjg Jul 24, 2020
18fe3a4
Add implementation of distribution_clone function
bkjg Jul 24, 2020
6108428
Change the type returned by distribution_clone function
bkjg Jul 24, 2020
618c944
Add cheking null pointeres
bkjg Jul 27, 2020
52c8e49
Make the code thread safe
bkjg Jul 27, 2020
7169ca3
Little pointers fixes
bkjg Jul 27, 2020
b8a766b
Add initial_size to distribution_h structure
bkjg Jul 27, 2020
2fb1e01
Clang format
bkjg Jul 27, 2020
fc5e6e6
Add initialization and destruction of mutex
bkjg Jul 27, 2020
6c91ed6
Fix potential attempt to read a null pointer
bkjg Jul 27, 2020
43cdba6
Add distribution metrics to the build system
bkjg Jul 28, 2020
d080a28
Replace copying distribution metrics by mutex
bkjg Jul 28, 2020
0e7ff74
Changed custom_bucket_sizes to custom_bucket_boundaries
bkjg Jul 28, 2020
200cd28
Extend the comments in distribution.h file
bkjg Jul 28, 2020
791f9bf
Change the values returned from percentile and average functions
bkjg Jul 28, 2020
980aa9d
Add license
bkjg Jul 28, 2020
5157345
Fix email in the license
bkjg Jul 28, 2020
13cfe62
Add checking the correctness of the boundaries
bkjg Jul 28, 2020
c2aa581
Rename num_buckets to num_boundaries for custom boundaries
bkjg Jul 28, 2020
37d5df8
Fix seg fault in distribution initializating functions
bkjg Jul 29, 2020
4b05e22
Fix the seg fault in distribution_new_custom function
bkjg Jul 29, 2020
013e65b
Add function for comparision of distributions
bkjg Jul 30, 2020
48617a1
Remove casting pointers after calloc
bkjg Jul 30, 2020
93d8a7a
Add casting to uint64_t in distribution_percentile function
bkjg Jul 30, 2020
8ed4657
Rename ptr to idx in bucket_update function
bkjg Jul 30, 2020
959f65e
Add comment in distribution.h file
bkjg Jul 30, 2020
4f9777e
Add condition to distribution_percentile function
bkjg Jul 30, 2020
3897da0
Fix binary search algorithm
bkjg Jul 30, 2020
e3982c3
Add distribution tests to the build system
bkjg Jul 29, 2020
c32146e
Add first distribution test - testing distribution_new_linear
bkjg Jul 29, 2020
66b81a0
Add unit tests for distribution_new_linear function
bkjg Jul 29, 2020
02b61bb
Add unit tests for the distribution_new_exponential function
bkjg Jul 29, 2020
923a41e
Start implementing tests form distribution_new_custom function
bkjg Jul 29, 2020
5c2551a
Add unit tests for the distribution_new_custom function
bkjg Jul 29, 2020
f603aa8
Add checking for not null in tests
bkjg Jul 30, 2020
b6fb8fd
Delete invalid tests
bkjg Jul 30, 2020
e5ad13b
Start implementing tests for distribution_clone function
bkjg Jul 30, 2020
e883c54
Add unit tests for the distribution_clone function
bkjg Jul 30, 2020
93cd883
Add unit tests for the distribution_average function
bkjg Jul 30, 2020
2770ba9
Add draft of unit tests for distribution_percentile function
bkjg Jul 30, 2020
2de7767
Add another unit tests for distribution_percentile function
bkjg Jul 30, 2020
faff4db
Add TODO
Jul 30, 2020
0caa54b
Implement getters functions
bkjg Jul 31, 2020
f8ce011
Move implementation of distribution_new_linear test to use getters
bkjg Jul 31, 2020
f714545
Move implementation of distribution_new_exponential and distribution_…
bkjg Jul 31, 2020
3b52335
Change signature of distribution_update function to int
bkjg Jul 31, 2020
36177a9
Fix heap buffer overflow error in bucket_update function
bkjg Aug 3, 2020
a568b19
Fix problems with getters and rename initial_size to base
bkjg Aug 4, 2020
3950d11
Implement tests for distribution_update and distribution_get_num_buckets
bkjg Aug 4, 2020
567ca20
Merge remote-tracking branch 'origin/distribution-metrics' into distr…
bkjg Aug 4, 2020
6e94531
Implement tests for distribution_get_buckets_boundaries
bkjg Aug 4, 2020
61caf83
Fix bugs in distribution_get_buckets_boundaries function
bkjg Aug 4, 2020
6d4afb2
Implement tests for distribution_get_buckets_counters function
bkjg Aug 4, 2020
fcbe7f0
Implement tests for distribution_get_sum_gauges and distribution_chec…
bkjg Aug 4, 2020
557dbd3
Add checking errno inside unit tests
bkjg Aug 4, 2020
5ed12d8
Add condition to distribution_new_custom function
bkjg Aug 4, 2020
8abc56e
Clang format
bkjg Aug 4, 2020
0fb5f5d
Replace constant number with static const variable
bkjg Aug 4, 2020
4e2fedd
Remove distribution.c include in distribution_test.c file
bkjg Aug 4, 2020
15c47fd
Add getters to the distribution header file
bkjg Aug 4, 2020
44991a3
Remove creating libdistribution.la from Makefile.am
bkjg Aug 5, 2020
e0246de
Add distribution_benchmark to Makefile.am
bkjg Aug 5, 2020
350e9f5
Add benchmark
bkjg Aug 10, 2020
50cc412
Add bash script for running distribution_benchmark
bkjg Aug 10, 2020
286f762
Clang format
bkjg Aug 10, 2020
b8a3350
Little fixes in benchmark and in functions for the distribution_t str…
bkjg Aug 12, 2020
5c162bd
Add distribution_benchmark for the different types of distribution
bkjg Aug 12, 2020
06c542e
Change number of iterations in the benchmark
bkjg Aug 12, 2020
8e5f95b
Change steps in script for running benchmark
bkjg Aug 13, 2020
06a62fd
Started introducing checking nulls in tests
bkjg Aug 13, 2020
17385fa
Move checking the custom buckets boundaries before allocating buckets
bkjg Aug 13, 2020
cb6c55d
Add comments about buckets' boundaries
bkjg Aug 13, 2020
e77c4bd
Add comments to the header file
bkjg Aug 13, 2020
7e3ad57
Fix returning NAN in distribution_average if there was no updates
bkjg Aug 13, 2020
9608d1b
Add checking nulls to the distribution_test
bkjg Aug 13, 2020
16af7af
Add two corner cases to tests for distribution_new_exponential function
bkjg Aug 13, 2020
2926727
Add unlocking mutexes in case on failure in distribution_check_equal …
bkjg Aug 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ libmetadata_la_SOURCES = \
src/utils/metadata/meta_data.h

libmetric_la_SOURCES = \
src/daemon/distribution.c \
src/daemon/distribution.h \
src/daemon/metric.c \
src/daemon/metric.h
libmetric_la_LIBADD = libmetadata.la $(COMMON_LIBS)
Expand Down
291 changes: 291 additions & 0 deletions src/daemon/distribution.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
/**
* collectd - src/daemon/distribution.c
* Copyright (C) 2020 Google LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* Authors:
* Barbara bkjg Kaczorowska <bkjg at google.com>
*/

#include "distribution.h"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first include here should be "collectd.h". That should pull in <math.h> and <pthread.h> already.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately it doesn't work for me :( And if I replace include <math.h> and include <pthread.h> with include "collectd.h" it doesn't compile in my case

#include <math.h>
#include <pthread.h>

typedef struct {
double max_boundary;
uint64_t counter;
} bucket_t;
Comment on lines +35 to +38
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a comment about how does the buckets look like. What are the minimal boundaries? Are minimal and maximal boundaries inclusive or exclusive?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I will add that. Thanks!


struct distribution_s {
bucket_t *buckets;
double sum_gauges;
size_t num_buckets;
pthread_mutex_t mutex;
};

static bucket_t *bucket_new_linear(size_t num_buckets, double size) {
bucket_t *buckets = (bucket_t *)calloc(num_buckets, sizeof(bucket_t));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At your option: calloc returns a void * which can implicitly be cast to any other pointer type. In other words, the (explicit) cast is not needed. I tend to omit the (explicit) cast to make the line easier to read.

Here and below.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed, thanks!


if (buckets == NULL) {
return NULL;
}

for (size_t i = 0; i < num_buckets - 1; ++i) {
buckets[i].max_boundary = (double)(i + 1) * size;
}

buckets[num_buckets - 1].max_boundary = INFINITY;

return buckets;
}

static bucket_t *bucket_new_exponential(size_t num_buckets, double initial_size,
double factor) {
bucket_t *buckets = (bucket_t *)calloc(num_buckets, sizeof(bucket_t));

if (buckets == NULL) {
return NULL;
}

double multiplier = initial_size;

for (size_t i = 0; i < num_buckets - 1; ++i) {
buckets[i].max_boundary = factor * multiplier;
multiplier *= initial_size;
}

buckets[num_buckets - 1].max_boundary = INFINITY;

return buckets;
}

static bucket_t *bucket_new_custom(size_t num_boundaries,
const double *custom_buckets_boundaries) {
bucket_t *buckets = (bucket_t *)calloc(num_boundaries + 1, sizeof(bucket_t));

if (buckets == NULL) {
return NULL;
}

if (custom_buckets_boundaries[0] <= 0) {
free(buckets);
errno = EINVAL;
return NULL;
}

buckets[0].max_boundary = custom_buckets_boundaries[0];

for (size_t i = 1; i < num_boundaries; ++i) {
if (custom_buckets_boundaries[i] <= 0 ||
custom_buckets_boundaries[i - 1] >= custom_buckets_boundaries[i]) {
free(buckets);
errno = EINVAL;
return NULL;
}

buckets[i].max_boundary = custom_buckets_boundaries[i];
}

buckets[num_boundaries].max_boundary = INFINITY;

return buckets;
}

distribution_t *distribution_new_linear(size_t num_buckets, double size) {
octo marked this conversation as resolved.
Show resolved Hide resolved
distribution_t *d = (distribution_t *)calloc(1, sizeof(distribution_t));

if (d == NULL) {
return NULL;
}

d->buckets = bucket_new_linear(num_buckets, size);

if (d->buckets == NULL) {
free(d);
return NULL;
}

d->num_buckets = num_buckets;
pthread_mutex_init(&d->mutex, NULL);
Copy link

@emargalit emargalit Aug 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens on line 228?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initialise the mutex. When you create it, you should initialise it, to be sure that it will work correctly.

https://linux.die.net/man/3/pthread_mutex_init


return d;
}

distribution_t *distribution_new_exponential(size_t num_buckets,
double initial_size,
double factor) {
distribution_t *d = (distribution_t *)calloc(1, sizeof(distribution_t));

if (d == NULL) {
return NULL;
}

d->buckets = bucket_new_exponential(num_buckets, initial_size, factor);

if (d->buckets == NULL) {
free(d);
return NULL;
}

d->num_buckets = num_buckets;
pthread_mutex_init(&d->mutex, NULL);

return d;
}

distribution_t *distribution_new_custom(size_t num_boundaries,
double *custom_buckets_boundaries) {
distribution_t *d = (distribution_t *)calloc(1, sizeof(distribution_t));

if (d == NULL) {
return NULL;
}

d->buckets = bucket_new_custom(num_boundaries, custom_buckets_boundaries);

if (d->buckets == NULL) {
free(d);
return NULL;
}

d->num_buckets = num_boundaries;
pthread_mutex_init(&d->mutex, NULL);

return d;
}

static void bucket_update(bucket_t *buckets, size_t num_buckets, double gauge) {
int ptr = (int)num_buckets - 1;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"ptr" sounds like "pointer" but is actually an integer. How about "index" instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point! Changed!


while (buckets[ptr].max_boundary > gauge && ptr >= 0) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At your option: consider a for loop:

for (size_t i = num_buckets - 1; i >= 0 && buckets[i].max_boundary > gauge; i--) {
  buckets[i].counter++;
}

Copy link
Author

@bkjg bkjg Jul 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this for loop doesn't work and will call segmentation fault I think. That's because i is the size_t type and it imply that i will always be greater equal zero. If we subtract 1 from 0, then we will underflow, get the maximum value of size_t and then buckets[i].max_boundary will call seg fault

But I know what you mean, I will consider it. Thanks!

buckets[ptr].counter++;
ptr--;
}
}

void distribution_update(distribution_t *d, double gauge) {
if (d == NULL) {
errno = EINVAL;
return;
}

pthread_mutex_lock(&d->mutex);

bucket_update(d->buckets, d->num_buckets, gauge);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of exporting the bucket update to a separate function, great!


d->sum_gauges += gauge;
pthread_mutex_unlock(&d->mutex);
}

static double find_percentile(bucket_t *buckets, size_t num_buckets,
uint64_t quantity) {
size_t left = 0;
size_t right = num_buckets - 1;
size_t middle;

while (left < right) {
middle = (left + right) / 2;

if (buckets[middle].counter >= quantity) {
left = middle;
} else {
right = middle - 1;
}
}

return buckets[left].max_boundary;
}

double distribution_percentile(distribution_t *d, double percent) {
if (d == NULL || percent > 100.0) {
errno = EINVAL;
return NAN;
}

pthread_mutex_lock(&d->mutex);

uint64_t quantity =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend to add a cast to uint64_t here, because implicitly assigning a double (right hand side) to a uint64_t can easily cause problems with some compilers, even if it compiles without problem on your machine.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, thanks!

(percent / 100.0) * d->buckets[d->num_buckets - 1].counter;

percent = find_percentile(d->buckets, d->num_buckets, quantity);

pthread_mutex_unlock(&d->mutex);
return percent;
}

double distribution_average(distribution_t *d) {
if (d == NULL) {
errno = EINVAL;
return NAN;
}

pthread_mutex_lock(&d->mutex);

double average =
d->sum_gauges / (double)d->buckets[d->num_buckets - 1].counter;

pthread_mutex_unlock(&d->mutex);

return average;
}

distribution_t *distribution_clone(distribution_t *d) {
if (d == NULL) {
errno = EINVAL;
return NULL;
}

distribution_t *distribution = calloc(1, sizeof(distribution_t));
octo marked this conversation as resolved.
Show resolved Hide resolved

if (distribution == NULL) {
return NULL;
}

pthread_mutex_lock(&d->mutex);

distribution->sum_gauges = d->sum_gauges;
distribution->num_buckets = d->num_buckets;

distribution->buckets = calloc(d->num_buckets, sizeof(bucket_t));

if (distribution->buckets == NULL) {
free(distribution);
pthread_mutex_unlock(&d->mutex);
return NULL;
}

memcpy(distribution->buckets, d->buckets, d->num_buckets * sizeof(bucket_t));

pthread_mutex_init(&distribution->mutex, NULL);

pthread_mutex_unlock(&d->mutex);

return distribution;
}

void distribution_destroy(distribution_t *d) {
if (d == NULL) {
return;
}

pthread_mutex_destroy(&d->mutex);
free(d->buckets);
free(d);
}
94 changes: 94 additions & 0 deletions src/daemon/distribution.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* collectd - src/daemon/distribution.h
* Copyright (C) 2020 Google LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* Authors:
* Barbara bkjg Kaczorowska <bkjg at google.com>
*/

#ifndef DISTRIBUTION_H
#define DISTRIBUTION_H

#include "collectd.h"

struct distribution_s;
typedef struct distribution_s distribution_t;

/* function that create new distribution structure and initialize buckets using
* linear function
* it will return null if any error occurred, for example - num_buckets is zero
* or OS couldn't allocate the memory - then errno will contain the error code
*/
distribution_t *distribution_new_linear(size_t num_buckets, double size);
/* function that create new distribution structure and initialize buckets using
* exponential function
* it will return null if any error occurred, for example - num_buckets is zero
* or OS couldn't allocate the memory - then errno will contain the error code
*/
distribution_t *distribution_new_exponential(size_t num_buckets,
double initial_size,
double factor);
/* function that create new distribution structure and initialize buckets using
* custom buckets sizes given by the user
* It will return null if any error occurred, for example - num_boundaries is
* less than zero or OS couldn't allocate the memory - then errno will contain
* the error code There is also one case when the function can return with some
* error - if the custom buckets boundaries aren't in the ascending order or
* some boundaries are less than zero, then the function will return null and
* will set errno to EINVAL
*/
distribution_t *distribution_new_custom(size_t num_boundaries,
double *custom_buckets_boundaries);

/* function for updating the buckets
* if the user will give the wrong argument, i.e. d will be null, then the
* function will return and the errno will be set to EINVAL*/
void distribution_update(distribution_t *d, double gauge);

/* function for getting the percentile
* if the user will give the wrong argument, i.e. d will be null or percent
* will be greater than 100.0, then the function will return NaN and the errno
* will be set to EINVAL*/
double distribution_percentile(distribution_t *d, double percent);

/* function that calculates average of gauges
* if the user will pass the wrong argument to the function, i.e. d will be
* null, then the function will return NaN and the errno will be set to EINVAL
*/
double distribution_average(distribution_t *d);

/* function that do the clone of distribution structure
* if the user will pass the wrong argument to this function, i.e. d will be
* null, then the function will return null and the errno will be set to EINVAL
* there is also the possibility that the user will pass the proper argument but
* the function will fail anyway, i.e. calloc will return null, then the
* function will return null and the errno will be set by calloc system call
*/
distribution_t *distribution_clone(distribution_t *d);

/* function that do clean up and free all the memory
* if the user will pass the null as an argument to this function, then the
* function
* will return without setting any errno like the OS do it when have to free a
* null pointer */
void distribution_destroy(distribution_t *d);

#endif // DISTRIBUTION_H