Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): Update the prometheus dependency of Loki and Promtail #12245

Merged
merged 4 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 2 additions & 1 deletion clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/dskit/log"
"github.com/grafana/dskit/tracing"
"github.com/prometheus/client_golang/prometheus"
collectors_version "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/common/version"

"github.com/grafana/loki/clients/pkg/logentry/stages"
Expand All @@ -32,7 +33,7 @@ import (
)

func init() {
prometheus.MustRegister(version.NewCollector("promtail"))
prometheus.MustRegister(collectors_version.NewCollector("promtail"))
}

var mtx sync.Mutex
Expand Down
71 changes: 33 additions & 38 deletions clients/pkg/promtail/discovery/consulagent/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package consulagent
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -62,26 +63,6 @@ const (
)

var (
rpcFailuresCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_failures_total",
Help: "The number of Consul Agent RPC call failures.",
})
rpcDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_duration_seconds",
Help: "The duration of a Consul Agent RPC call in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"endpoint", "call"},
)

// Initialize metric vectors.
servicesRPCDuration = rpcDuration.WithLabelValues("agent", "services")
serviceRPCDuration = rpcDuration.WithLabelValues("agent", "service")

// DefaultSDConfig is the default Consul SD configuration.
DefaultSDConfig = SDConfig{
TagSeparator: ",",
Expand All @@ -94,8 +75,6 @@ var (

func init() {
discovery.RegisterConfig(&SDConfig{})
prometheus.MustRegister(rpcFailuresCount)
prometheus.MustRegister(rpcDuration)
}

// SDConfig is the configuration for Consul service discovery.
Expand Down Expand Up @@ -129,12 +108,17 @@ type SDConfig struct {
TLSConfig config.TLSConfig `yaml:"tls_config,omitempty"`
}

// NewDiscovererMetrics implements discovery.Config.
func (c *SDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
return newDiscovererMetrics(reg, rmi)
}

// Name returns the name of the Config.
func (*SDConfig) Name() string { return "consulagent" }

// NewDiscoverer returns a Discoverer for the Config.
func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
return NewDiscovery(c, opts.Logger)
return NewDiscovery(c, opts.Logger, opts.Metrics)
}

// SetDirectory joins any relative file paths with dir.
Expand Down Expand Up @@ -169,10 +153,16 @@ type Discovery struct {
refreshInterval time.Duration
finalizer func()
logger log.Logger
metrics *consulMetrics
}

// NewDiscovery returns a new Discovery for the given config.
func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
m, ok := metrics.(*consulMetrics)
if !ok {
return nil, fmt.Errorf("invalid discovery metrics type")
}

if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -220,6 +210,7 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
clientDatacenter: conf.Datacenter,
finalizer: transport.CloseIdleConnections,
logger: logger,
metrics: m,
}
return cd, nil
}
Expand Down Expand Up @@ -275,7 +266,7 @@ func (d *Discovery) getDatacenter() error {
info, err := d.client.Agent().Self()
if err != nil {
level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
rpcFailuresCount.Inc()
d.metrics.rpcFailuresCount.Inc()
return err
}

Expand Down Expand Up @@ -356,7 +347,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.
t0 := time.Now()
srvs, err := agent.Services()
elapsed := time.Since(t0)
servicesRPCDuration.Observe(elapsed.Seconds())
d.metrics.servicesRPCDuration.Observe(elapsed.Seconds())

// Check the context before in order to exit early.
select {
Expand All @@ -367,7 +358,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.

if err != nil {
level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
rpcFailuresCount.Inc()
d.metrics.rpcFailuresCount.Inc()
time.Sleep(retryInterval)
return
}
Expand Down Expand Up @@ -423,13 +414,15 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.

// consulService contains data belonging to the same service.
type consulService struct {
name string
tags []string
labels model.LabelSet
discovery *Discovery
client *consul.Client
tagSeparator string
logger log.Logger
name string
tags []string
labels model.LabelSet
discovery *Discovery
client *consul.Client
tagSeparator string
logger log.Logger
rpcFailuresCount prometheus.Counter
serviceRPCDuration prometheus.Observer
}

// Start watching a service.
Expand All @@ -443,8 +436,10 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G
serviceLabel: model.LabelValue(name),
datacenterLabel: model.LabelValue(d.clientDatacenter),
},
tagSeparator: d.tagSeparator,
logger: d.logger,
tagSeparator: d.tagSeparator,
logger: d.logger,
rpcFailuresCount: d.metrics.rpcFailuresCount,
serviceRPCDuration: d.metrics.serviceRPCDuration,
}

go func() {
Expand Down Expand Up @@ -474,7 +469,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
t0 := time.Now()
aggregatedStatus, serviceChecks, err := agent.AgentHealthServiceByName(srv.name)
elapsed := time.Since(t0)
serviceRPCDuration.Observe(elapsed.Seconds())
srv.serviceRPCDuration.Observe(elapsed.Seconds())

// Check the context before in order to exit early.
select {
Expand All @@ -486,7 +481,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr

if err != nil {
level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err)
rpcFailuresCount.Inc()
srv.rpcFailuresCount.Inc()
time.Sleep(retryInterval)
return
}
Expand Down
35 changes: 30 additions & 5 deletions clients/pkg/promtail/discovery/consulagent/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
)

Expand All @@ -34,10 +36,24 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

// TODO: Add ability to unregister metrics?
func NewTestMetrics(t *testing.T, conf discovery.Config, reg prometheus.Registerer) discovery.DiscovererMetrics {
refreshMetrics := discovery.NewRefreshMetrics(reg)
require.NoError(t, refreshMetrics.Register())

metrics := conf.NewDiscovererMetrics(prometheus.NewRegistry(), refreshMetrics)
require.NoError(t, metrics.Register())

return metrics
}

func TestConfiguredService(t *testing.T) {
conf := &SDConfig{
Services: []string{"configuredServiceName"}}
consulDiscovery, err := NewDiscovery(conf, nil)

metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())

consulDiscovery, err := NewDiscovery(conf, nil, metrics)

if err != nil {
t.Errorf("Unexpected error when initializing discovery %v", err)
Expand All @@ -55,7 +71,10 @@ func TestConfiguredServiceWithTag(t *testing.T) {
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http"},
}
consulDiscovery, err := NewDiscovery(conf, nil)

metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())

consulDiscovery, err := NewDiscovery(conf, nil, metrics)

if err != nil {
t.Errorf("Unexpected error when initializing discovery %v", err)
Expand Down Expand Up @@ -151,7 +170,9 @@ func TestConfiguredServiceWithTags(t *testing.T) {
}

for _, tc := range cases {
consulDiscovery, err := NewDiscovery(tc.conf, nil)
metrics := NewTestMetrics(t, tc.conf, prometheus.NewRegistry())

consulDiscovery, err := NewDiscovery(tc.conf, nil, metrics)

if err != nil {
t.Errorf("Unexpected error when initializing discovery %v", err)
Expand All @@ -166,7 +187,10 @@ func TestConfiguredServiceWithTags(t *testing.T) {

func TestNonConfiguredService(t *testing.T) {
conf := &SDConfig{}
consulDiscovery, err := NewDiscovery(conf, nil)

metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())

consulDiscovery, err := NewDiscovery(conf, nil, metrics)

if err != nil {
t.Errorf("Unexpected error when initializing discovery %v", err)
Expand Down Expand Up @@ -341,7 +365,8 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) {

func newDiscovery(t *testing.T, config *SDConfig) *Discovery {
logger := log.NewNopLogger()
d, err := NewDiscovery(config, logger)
metrics := NewTestMetrics(t, config, prometheus.NewRegistry())
d, err := NewDiscovery(config, logger, metrics)
require.NoError(t, err)
return d
}
Expand Down
65 changes: 65 additions & 0 deletions clients/pkg/promtail/discovery/consulagent/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// This code was adapted from the consul service discovery
// package in prometheus: https://github.com/prometheus/prometheus/blob/main/discovery/consul/metrics.go
// which is copyrighted: 2015 The Prometheus Authors
// and licensed under the Apache License, Version 2.0 (the "License");

package consulagent

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/prometheus/discovery"
)

var _ discovery.DiscovererMetrics = (*consulMetrics)(nil)

type consulMetrics struct {
rpcFailuresCount prometheus.Counter
rpcDuration *prometheus.SummaryVec

servicesRPCDuration prometheus.Observer
serviceRPCDuration prometheus.Observer

metricRegisterer discovery.MetricRegisterer
}

func newDiscovererMetrics(reg prometheus.Registerer, _ discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
m := &consulMetrics{
rpcFailuresCount: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_failures_total",
Help: "The number of Consul Agent RPC call failures.",
}),
rpcDuration: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_duration_seconds",
Help: "The duration of a Consul Agent RPC call in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"endpoint", "call"},
),
}

m.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{
m.rpcFailuresCount,
m.rpcDuration,
})

// Initialize metric vectors.
m.servicesRPCDuration = m.rpcDuration.WithLabelValues("agent", "services")
m.serviceRPCDuration = m.rpcDuration.WithLabelValues("agent", "service")

return m
}

// Register implements discovery.DiscovererMetrics.
func (m *consulMetrics) Register() error {
return m.metricRegisterer.RegisterMetrics()
}

// Unregister implements discovery.DiscovererMetrics.
func (m *consulMetrics) Unregister() {
m.metricRegisterer.UnregisterMetrics()
}
2 changes: 1 addition & 1 deletion clients/pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func getPromMetrics(t *testing.T, httpListenAddr net.Addr) ([]byte, string) {
func parsePromMetrics(t *testing.T, bytes []byte, contentType string, metricName string, label string) map[string]float64 {
rb := map[string]float64{}

pr, err := textparse.New(bytes, contentType, false)
pr, err := textparse.New(bytes, contentType, false, nil)
require.NoError(t, err)
for {
et, err := pr.Next()
Expand Down
4 changes: 2 additions & 2 deletions clients/pkg/promtail/targets/docker/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"sync"
"time"

docker_types "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-kit/log"
Expand Down Expand Up @@ -88,7 +88,7 @@ func (t *Target) processLoop(ctx context.Context) {
t.wg.Add(1)
defer t.wg.Done()

opts := docker_types.ContainerLogsOptions{
opts := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Expand Down
23 changes: 17 additions & 6 deletions clients/pkg/promtail/targets/docker/targetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,25 @@ func NewTargetManager(
pushClient api.EntryHandler,
scrapeConfigs []scrapeconfig.Config,
) (*TargetManager, error) {
noopRegistry := util.NoopRegistry{}
noopSdMetrics, err := discovery.CreateAndRegisterSDMetrics(noopRegistry)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
tm := &TargetManager{
metrics: metrics,
logger: logger,
cancel: cancel,
done: make(chan struct{}),
positions: positions,
manager: discovery.NewManager(ctx, log.With(logger, "component", "docker_discovery")),
metrics: metrics,
logger: logger,
cancel: cancel,
done: make(chan struct{}),
positions: positions,
manager: discovery.NewManager(
ctx,
log.With(logger, "component", "docker_discovery"),
noopRegistry,
noopSdMetrics,
),
pushClient: pushClient,
groups: make(map[string]*targetGroup),
}
Expand Down
Loading
Loading