Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Gauge metric with Pandas support #800

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ dist
.coverage
.tox
.*cache
htmlcov
htmlcov
env39/
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -621,3 +621,8 @@ for family in text_string_to_metric_families(u"my_gauge 1.0\n"):

* [Releases](https://github.com/prometheus/client_python/releases): The releases page shows the history of the project and acts as a changelog.
* [PyPI](https://pypi.python.org/pypi/prometheus_client)


```
python -m pytest -vv -s -k test_gauge_pandas .\tests\
```
3 changes: 2 additions & 1 deletion prometheus_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
write_to_textfile,
)
from .gc_collector import GC_COLLECTOR, GCCollector
from .metrics import Counter, Enum, Gauge, Histogram, Info, Summary
from .metrics import Counter, Enum, Gauge, Histogram, Info, PandasGauge, Summary
from .metrics_core import Metric
from .platform_collector import PLATFORM_COLLECTOR, PlatformCollector
from .process_collector import PROCESS_COLLECTOR, ProcessCollector
Expand All @@ -27,6 +27,7 @@
'Histogram',
'Info',
'Enum',
'PandasGauge',
'CONTENT_TYPE_LATEST',
'generate_latest',
'MetricsHandler',
Expand Down
48 changes: 28 additions & 20 deletions prometheus_client/exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,29 +186,37 @@ def sample_line(line):
mtype = 'histogram'
elif mtype == 'unknown':
mtype = 'untyped'

output.append('# HELP {} {}\n'.format(
mname, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
output.append(f'# TYPE {mname} {mtype}\n')

om_samples = {}
for s in metric.samples:
for suffix in ['_created', '_gsum', '_gcount']:
if s.name == metric.name + suffix:
# OpenMetrics specific sample, put in a gauge at the end.
om_samples.setdefault(suffix, []).append(sample_line(s))
break
else:
output.append(sample_line(s))
# default encoder
if 'encoder' not in vars(metric) or ('encoder' in vars(metric) and metric.encoder != 'pandas'):
# normal calls
output.append('# HELP {} {}\n'.format(
mname, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
output.append(f'# TYPE {mname} {mtype}\n')

om_samples = {}
for s in metric.samples:
for suffix in ['_created', '_gsum', '_gcount']:
if s.name == metric.name + suffix:
# OpenMetrics specific sample, put in a gauge at the end.
om_samples.setdefault(suffix, []).append(sample_line(s))
break
else:
output.append(sample_line(s))
for suffix, lines in sorted(om_samples.items()):
output.append('# HELP {}{} {}\n'.format(metric.name, suffix,
metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
output.append(f'# TYPE {metric.name}{suffix} gauge\n')
output.extend(lines)
else:
# pandas encoder
output.append('# HELP {} {}\n'.format(
mname, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
output.append(f'# TYPE {mname} {mtype}\n')
output.extend(metric[metric._tag].to_list())
except Exception as exception:
exception.args = (exception.args or ('',)) + (metric,)
raise

for suffix, lines in sorted(om_samples.items()):
output.append('# HELP {}{} {}\n'.format(metric.name, suffix,
metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
output.append(f'# TYPE {metric.name}{suffix} gauge\n')
output.extend(lines)

return ''.join(output).encode('utf-8')


Expand Down
158 changes: 157 additions & 1 deletion prometheus_client/metrics.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from threading import Lock
import time
import types
from threading import Lock
from typing import (
Any, Callable, Dict, Iterable, Optional, Sequence, Type, TypeVar, Union,
)

import pandas as pd

from . import values # retain this import style for testability
from .context_managers import ExceptionCounter, InprogressTracker, Timer
from .metrics_core import (
Expand All @@ -15,7 +17,10 @@
from .samples import Exemplar, Sample
from .utils import floatToGoString, INF



T = TypeVar('T', bound='MetricWrapperBase')
P = TypeVar('P', bound='PandasGauge')
F = TypeVar("F", bound=Callable[..., Any])


Expand Down Expand Up @@ -714,3 +719,154 @@ def _child_samples(self) -> Iterable[Sample]:
for i, s
in enumerate(self._states)
]


class PandasGauge:
_encoder = 'pandas'
_type: Optional[str] = 'gauge'
_reserved_labelnames: Sequence[str] = ()

def _is_observable(self):
# Whether this metric is observable, i.e.
# * a metric without label names and values, or
# * the child of a labelled metric.
return not self._labelnames or (self._labelnames and self._labelvalues)

def _raise_if_not_observable(self):
# Functions that mutate the state of the metric, for example incrementing
# a counter, will fail if the metric is not observable, because only if a
# metric is observable will the value be initialized.
if not self._is_observable():
raise ValueError('%s metric is missing label values' % str(self._type))

def _is_parent(self):
return self._labelnames and not self._labelvalues

def describe(self):
return [self._metrics]

def collect(self):
return [self._metrics]

def __str__(self):
return f"{self._type}:{self._name}"

def __repr__(self):
print("repr")
metric_type = type(self)
return f"{metric_type.__module__}.{metric_type.__name__}({self._name})"

def generate_pandas_report(self):
def make_str(row):
return f"""{self._name}{{{','.join([ f'{col}="{row[col]}" ' for col in self._labelnames if col not in [self._value, self._tag]])}}} {row[self._value]} {chr(10)}"""
with self._lock:
self._metrics[self._tag] = self._metrics.apply(make_str, axis=1)
# self._metrics

def set_metric(self, df: pd.DataFrame) -> None:
with self._lock:
df.name = self._name
df.type = self._type
df.documentation = self._documentation
df.encoder = 'pandas'
self._metrics = df
self.generate_pandas_report()

def __init__(
self: P,
name: str,
documentation: str,
df: pd.DataFrame,
namespace: str = '',
subsystem: str = '',
unit: str = '',
columns: list = None,
registry: Optional[CollectorRegistry] = REGISTRY,
tag: str = 'report',
value: str = 'value'
) -> None:
"""
Esta classe parte do pressuporto que a metrica é trocada com mais eficiencia do que ficar alterando apenas 1 valor
o calculo pode ser feito em outro lugar e passar apenas a estrutura completo pronto em DataFrame
"""
if df is None:
raise ValueError("df must be set")

self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
if columns:
self._labelvalues = columns
else:
self._labelvalues = df.columns

self._labelnames = _validate_labelnames(self, self._labelvalues)
self._labelvalues = tuple(None or ())
self._kwargs: Dict[str, Any] = {}
self._documentation = documentation
self._unit = unit
self._tag = tag
self._value = value
df.name = self._name
df.type = self._type
df.documentation = documentation
df.encoder = self._encoder
df._tag = tag
if not METRIC_NAME_RE.match(self._name):
raise ValueError('Invalid metric name: ' + self._name)

if self._is_parent():
# Prepare the fields needed for child metrics.
self._lock = Lock()
self._metrics = df


if self._is_observable():
self._metric_init()

if not self._labelvalues:
# Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang.
if registry:
registry.register(self)
self.generate_pandas_report()

def remove(self, *labelvalues):
if not self._labelnames:
raise ValueError('No label names were set when constructing %s' % self)

"""Remove the given labelset from the metric."""
if len(labelvalues) != len(self._labelnames):
raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues))
labelvalues = tuple(str(l) for l in labelvalues)
with self._lock:
del self._metrics[labelvalues]

def clear(self) -> None:
"""Remove all labelsets from the metric"""
with self._lock:
self._metrics = {}

# def _samples(self) -> Iterable[Sample]:
# if self._is_parent():
# return self._multi_samples()
# else:
# return self._child_samples()

# def _multi_samples(self) -> Iterable[Sample]:
# if 'pandas' not in vars(metrics._encoder):
# with self._lock:
# metrics = self._metrics.copy()
# for labels, metric in metrics.items():
# series_labels = list(zip(self._labelnames, labels))
# for suffix, sample_labels, value, timestamp, exemplar in metric._samples():
# yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar)

def _child_samples(self) -> Iterable[Sample]: # pragma: no cover
raise NotImplementedError('_child_samples() must be implemented by %r' % self)

def _metric_init(self): # pragma: no cover
"""
Initialize the metric object as a child, i.e. when it has labels (if any) set.

This is factored as a separate function to allow for deferred initialization.
# raise NotImplementedError('_metric_init() must be implemented by %r' % self)
"""
pass
1 change: 1 addition & 0 deletions prometheus_client/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def collect(self):
if ti:
yield ti
for collector in collectors:

yield from collector.collect()

def restricted_registry(self, names):
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pytest
pandas
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
},
extras_require={
'twisted': ['twisted'],
'pandas': ['pandas'],
},
test_suite="tests",
python_requires=">=3.6",
Expand Down
87 changes: 84 additions & 3 deletions tests/test_exposition.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
import pandas as pd
import pytest
import threading
import time
import unittest

import pytest

from prometheus_client import (
CollectorRegistry, CONTENT_TYPE_LATEST, core, Counter, delete_from_gateway,
Enum, Gauge, generate_latest, Histogram, Info, instance_ip_grouping_key,
Metric, push_to_gateway, pushadd_to_gateway, Summary,
Metric, PandasGauge, push_to_gateway, pushadd_to_gateway, Summary
)
from prometheus_client.core import GaugeHistogramMetricFamily, Timestamp
from prometheus_client.exposition import (
Expand Down Expand Up @@ -192,6 +192,87 @@ def collect(self):
ts{foo="f"} 0.0 123000
""", generate_latest(self.registry))

def test_gauge_pandas(self):
"""
2 possiveis chamadas
usa apenas as colunas expostas
PandasGauge('report_pandas', 'metric description', columns=['columnn01', 'column02'], registry=self.registry)
ou
usará todos as colunas
PandasGauge('report_pandas', 'metric description', df=df, registry=self.registry)
ou
PandasGauge('report_pandas', 'metric description', df=df, columns=['columnn01', 'column02'], registry=self.registry)
"""
df = pd.DataFrame({'a': [1.1, 2.2, 3.3, 4.4], 'b': [5.1, 6.2, 7.3, 8.4], 'value': [1, 2, 3, 4]})
df2 = pd.DataFrame({'c': [1.1, 2.2, 3.3, 4.4], 'd': [5.1, 6.2, 7.3, 8.4], 'value': [5, 6, 7, 8]})
PandasGauge('report_pandas', 'metric description', df=df, columns=['a', 'b', 'value'], registry=self.registry)
g2 = PandasGauge('report_panda2s', 'metric description2', df=df2, registry=self.registry)

self.assertEqual(
b'# HELP report_pandas metric description\n'
b'# TYPE report_pandas gauge\n'
b'report_pandas{a="1.1" ,b="5.1" } 1.0 \n'
b'report_pandas{a="2.2" ,b="6.2" } 2.0 \n'
b'report_pandas{a="3.3" ,b="7.3" } 3.0 \n'
b'report_pandas{a="4.4" ,b="8.4" } 4.0 \n'
b'# HELP report_panda2s metric description2\n'
b'# TYPE report_panda2s gauge\n'
b'report_panda2s{c="1.1" ,d="5.1" } 5.0 \n'
b'report_panda2s{c="2.2" ,d="6.2" } 6.0 \n'
b'report_panda2s{c="3.3" ,d="7.3" } 7.0 \n'
b'report_panda2s{c="4.4" ,d="8.4" } 8.0 \n',
generate_latest(self.registry)
)

g2.set_metric(df2)
self.assertEqual(
b'# HELP report_pandas metric description\n'
b'# TYPE report_pandas gauge\n'
b'report_pandas{a="1.1" ,b="5.1" } 1.0 \n'
b'report_pandas{a="2.2" ,b="6.2" } 2.0 \n'
b'report_pandas{a="3.3" ,b="7.3" } 3.0 \n'
b'report_pandas{a="4.4" ,b="8.4" } 4.0 \n'
b'# HELP report_panda2s metric description2\n'
b'# TYPE report_panda2s gauge\n'
b'report_panda2s{c="1.1" ,d="5.1" } 5 \n'
b'report_panda2s{c="2.2" ,d="6.2" } 6 \n'
b'report_panda2s{c="3.3" ,d="7.3" } 7 \n'
b'report_panda2s{c="4.4" ,d="8.4" } 8 \n',
generate_latest(self.registry)
)

def test_gauge_pandas_columns(self):
"""
2 possiveis chamadas
usa apenas as colunas expostas
PandasGauge('report_pandas', 'metric description', columns=['columnn01', 'column02'], registry=self.registry)
ou
usará todos as colunas
PandasGauge('report_pandas', 'metric description', df=df, registry=self.registry)
ou
PandasGauge('report_pandas', 'metric description', df=df, columns=['columnn01', 'column02'], registry=self.registry)
"""
df = pd.DataFrame({'a': [1.1, 2.2, 3.3, 4.4], 'b': [5.1, 6.2, 7.3, 8.4], 'value': [1, 2, 3, 4]})
df2 = pd.DataFrame({'c': [1.1, 2.2, 3.3, 4.4], 'd': [5.1, 6.2, 7.3, 8.4], 'result': [5, 6, 7, 8]})
PandasGauge('report_pandas', 'metric description', df=df, columns=['a', 'value'], registry=self.registry)
g2 = PandasGauge('report_panda2s', 'metric description2', df=df2, columns=['d', 'result'], value='result', registry=self.registry)

self.assertEqual(
b'# HELP report_pandas metric description\n'
b'# TYPE report_pandas gauge\n'
b'report_pandas{a="1.1" } 1.0 \n'
b'report_pandas{a="2.2" } 2.0 \n'
b'report_pandas{a="3.3" } 3.0 \n'
b'report_pandas{a="4.4" } 4.0 \n'
b'# HELP report_panda2s metric description2\n'
b'# TYPE report_panda2s gauge\n'
b'report_panda2s{d="5.1" } 5.0 \n'
b'report_panda2s{d="6.2" } 6.0 \n'
b'report_panda2s{d="7.3" } 7.0 \n'
b'report_panda2s{d="8.4" } 8.0 \n',
generate_latest(self.registry)
)


class TestPushGateway(unittest.TestCase):
def setUp(self):
Expand Down
Loading