Skip to content

Commit

Permalink
Merge pull request #713 from zalando-incubator/collector-context
Browse files Browse the repository at this point in the history
Introduce context for collector interface
  • Loading branch information
katyanna authored May 21, 2024
2 parents ca85920 + 5a54378 commit f406e86
Show file tree
Hide file tree
Showing 23 changed files with 130 additions and 113 deletions.
10 changes: 5 additions & 5 deletions pkg/collector/aws_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func NewAWSCollectorPlugin(configs map[string]aws.Config) *AWSCollectorPlugin {
}

// NewCollector initializes a new skipper collector from the specified HPA.
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewAWSSQSCollector(c.configs, hpa, config, interval)
func (c *AWSCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewAWSSQSCollector(ctx, c.configs, hpa, config, interval)
}

type sqsiface interface {
Expand All @@ -50,7 +50,7 @@ type AWSSQSCollector struct {
metricType autoscalingv2.MetricSourceType
}

func NewAWSSQSCollector(configs map[string]aws.Config, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
func NewAWSSQSCollector(ctx context.Context, configs map[string]aws.Config, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for queue is not specified")
}
Expand Down Expand Up @@ -90,13 +90,13 @@ func NewAWSSQSCollector(configs map[string]aws.Config, hpa *autoscalingv2.Horizo
}, nil
}

func (c *AWSSQSCollector) GetMetrics() ([]CollectedMetric, error) {
func (c *AWSSQSCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, error) {
params := &sqs.GetQueueAttributesInput{
QueueUrl: aws.String(c.queueURL),
AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameApproximateNumberOfMessages},
}

resp, err := c.sqs.GetQueueAttributes(context.TODO(), params)
resp, err := c.sqs.GetQueueAttributes(ctx, params)
if err != nil {
return nil, err
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -50,7 +51,7 @@ func NewCollectorFactory() *CollectorFactory {
}

type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
NewCollector(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}

type PluginNotFoundError struct {
Expand Down Expand Up @@ -120,38 +121,38 @@ func (c *CollectorFactory) RegisterExternalCollector(metrics []string, plugin Co
}
}

func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (c *CollectorFactory) NewCollector(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Type {
case autoscalingv2.PodsMetricSourceType:
// first try to find a plugin by format
if plugin, ok := c.podsPlugins.Named[config.CollectorType]; ok {
return plugin.NewCollector(hpa, config, interval)
return plugin.NewCollector(ctx, hpa, config, interval)
}

// else try to use the default plugin if set
if c.podsPlugins.Any != nil {
return c.podsPlugins.Any.NewCollector(hpa, config, interval)
return c.podsPlugins.Any.NewCollector(ctx, hpa, config, interval)
}
case autoscalingv2.ObjectMetricSourceType:
// first try to find a plugin by kind
if kinds, ok := c.objectPlugins.Named[config.ObjectReference.Kind]; ok {
if plugin, ok := kinds.Named[config.CollectorType]; ok {
return plugin.NewCollector(hpa, config, interval)
return plugin.NewCollector(ctx, hpa, config, interval)
}

if kinds.Any != nil {
return kinds.Any.NewCollector(hpa, config, interval)
return kinds.Any.NewCollector(ctx, hpa, config, interval)
}
break
}

// else try to find a default plugin for this kind
if plugin, ok := c.objectPlugins.Any.Named[config.CollectorType]; ok {
return plugin.NewCollector(hpa, config, interval)
return plugin.NewCollector(ctx, hpa, config, interval)
}

if c.objectPlugins.Any.Any != nil {
return c.objectPlugins.Any.Any.NewCollector(hpa, config, interval)
return c.objectPlugins.Any.Any.NewCollector(ctx, hpa, config, interval)
}
case autoscalingv2.ExternalMetricSourceType:
// First type to get metric type from the `type` label,
Expand All @@ -169,7 +170,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
}

if plugin, ok := c.externalPlugins[pluginKey]; ok {
return plugin.NewCollector(hpa, config, interval)
return plugin.NewCollector(ctx, hpa, config, interval)
}
}

Expand All @@ -189,7 +190,7 @@ type CollectedMetric struct {
}

type Collector interface {
GetMetrics() ([]CollectedMetric, error)
GetMetrics(ctx context.Context) ([]CollectedMetric, error)
Interval() time.Duration
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/collector/collector_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"testing"
"time"

Expand All @@ -13,15 +14,15 @@ type mockCollectorPlugin struct {
Name string
}

func (c *mockCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (c *mockCollectorPlugin) NewCollector(_ context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return &mockCollector{Name: c.Name}, nil
}

type mockCollector struct {
Name string
}

func (c *mockCollector) GetMetrics() ([]CollectedMetric, error) {
func (c *mockCollector) GetMetrics(_ context.Context) ([]CollectedMetric, error) {
return nil, nil
}

Expand Down Expand Up @@ -114,7 +115,7 @@ func TestNewCollector(t *testing.T) {
require.NoError(t, err)
require.Len(t, configs, 1)

collector, err := collectorFactory.NewCollector(tc.hpa, configs[0], 0)
collector, err := collectorFactory.NewCollector(context.Background(), tc.hpa, configs[0], 0)
if tc.expectedCollector == "" {
require.Error(t, err)
} else {
Expand Down
8 changes: 5 additions & 3 deletions pkg/collector/external_rps_collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"fmt"
"regexp"
"strconv"
Expand Down Expand Up @@ -48,6 +49,7 @@ func NewExternalRPSCollectorPlugin(

// NewCollector initializes a new skipper collector from the specified HPA.
func (p *ExternalRPSCollectorPlugin) NewCollector(
ctx context.Context,
hpa *autoscalingv2.HorizontalPodAutoscaler,
config *MetricConfig,
interval time.Duration,
Expand Down Expand Up @@ -95,7 +97,7 @@ func (p *ExternalRPSCollectorPlugin) NewCollector(
),
}

c, err := p.promPlugin.NewCollector(hpa, &confCopy, interval)
c, err := p.promPlugin.NewCollector(ctx, hpa, &confCopy, interval)
if err != nil {
return nil, err
}
Expand All @@ -107,8 +109,8 @@ func (p *ExternalRPSCollectorPlugin) NewCollector(
}

// GetMetrics gets hostname metrics from Prometheus
func (c *ExternalRPSCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.promCollector.GetMetrics()
func (c *ExternalRPSCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, error) {
v, err := c.promCollector.GetMetrics(ctx)
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/collector/external_rps_collector_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"fmt"
"regexp"
"testing"
Expand Down Expand Up @@ -100,6 +101,7 @@ func TestExternalRPSPluginNewCollector(tt *testing.T) {
} {
tt.Run(testcase.msg, func(t *testing.T) {
c, err := plugin.NewCollector(
context.Background(),
&autoscalingv2.HorizontalPodAutoscaler{},
testcase.config,
interval,
Expand Down Expand Up @@ -156,7 +158,7 @@ func TestExternalRPSCollectorGetMetrics(tt *testing.T) {
tt.Run(testcase.msg, func(t *testing.T) {
fake := makeCollectorWithStub(testcase.stub)
c := &ExternalRPSCollector{promCollector: fake}
m, err := c.GetMetrics()
m, err := c.GetMetrics(context.Background())

if testcase.shouldWork {
require.Nil(t, err)
Expand All @@ -182,6 +184,7 @@ func TestExternalRPSCollectorInterval(t *testing.T) {
pattern: pattern,
}
c, err := plugin.NewCollector(
context.Background(),
&autoscalingv2.HorizontalPodAutoscaler{},
&MetricConfig{Config: map[string]string{"hostnames": "foo.bar.baz"}},
interval,
Expand Down Expand Up @@ -227,7 +230,7 @@ func TestExternalRPSCollectorAndCollectorFabricInteraction(t *testing.T) {
require.NoError(t, err)
require.Len(t, conf, 1)

c, err := factory.NewCollector(hpa, conf[0], 0)
c, err := factory.NewCollector(context.Background(), hpa, conf[0], 0)

require.NoError(t, err)
_, ok := c.(*ExternalRPSCollector)
Expand Down Expand Up @@ -288,9 +291,9 @@ func TestExternalRPSPrometheusCollectorInteraction(t *testing.T) {
require.Len(t, conf, 2)

collectors := make(map[string]Collector)
collectors["hostname"], err = factory.NewCollector(hpa, conf[0], 0)
collectors["hostname"], err = factory.NewCollector(context.Background(), hpa, conf[0], 0)
require.NoError(t, err)
collectors["prom"], err = factory.NewCollector(hpa, conf[1], 0)
collectors["prom"], err = factory.NewCollector(context.Background(), hpa, conf[1], 0)
require.NoError(t, err)

prom, ok := collectors["prom"].(*PrometheusCollector)
Expand Down
4 changes: 3 additions & 1 deletion pkg/collector/fake_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package collector
import (
"time"

"golang.org/x/net/context"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/custom_metrics"
Expand All @@ -19,7 +20,7 @@ type FakeCollector struct {
stub func() ([]CollectedMetric, error)
}

func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) {
func (c *FakeCollector) GetMetrics(_ context.Context) ([]CollectedMetric, error) {
if c.stub != nil {
v, err := c.stub()
return v, err
Expand All @@ -33,6 +34,7 @@ func (FakeCollector) Interval() time.Duration {
}

func (p *FakeCollectorPlugin) NewCollector(
_ context.Context,
hpa *autoscalingv2.HorizontalPodAutoscaler,
config *MetricConfig,
interval time.Duration,
Expand Down
5 changes: 3 additions & 2 deletions pkg/collector/http_collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"fmt"
"net/url"
"time"
Expand All @@ -26,7 +27,7 @@ func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) {
return &HTTPCollectorPlugin{}, nil
}

func (p *HTTPCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (p *HTTPCollectorPlugin) NewCollector(_ context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
collector := &HTTPCollector{
namespace: hpa.Namespace,
}
Expand Down Expand Up @@ -78,7 +79,7 @@ type HTTPCollector struct {
metric autoscalingv2.MetricIdentifier
}

func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) {
func (c *HTTPCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, error) {
metric, err := c.metricsGetter.GetMetric(*c.endpoint)
if err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions pkg/collector/http_collector_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -66,9 +67,9 @@ func TestHTTPCollector(t *testing.T) {
Namespace: "default",
},
}
collector, err := plugin.NewCollector(hpa, testConfig, testInterval)
collector, err := plugin.NewCollector(context.Background(), hpa, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
metrics, err := collector.GetMetrics(context.Background())
require.NoError(t, err)
require.NotNil(t, metrics)
require.Len(t, metrics, 1)
Expand Down
14 changes: 7 additions & 7 deletions pkg/collector/influxdb_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org
}, nil
}

func (p *InfluxDBCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(hpa, p.address, p.token, p.org, config, interval)
func (p *InfluxDBCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(ctx, hpa, p.address, p.token, p.org, config, interval)
}

type InfluxDBCollector struct {
Expand All @@ -55,7 +55,7 @@ type InfluxDBCollector struct {
namespace string
}

func NewInfluxDBCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
func NewInfluxDBCollector(_ context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
collector := &InfluxDBCollector{
interval: interval,
metric: config.Metric,
Expand Down Expand Up @@ -107,9 +107,9 @@ type queryResult struct {
}

// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
func (c *InfluxDBCollector) getValue(ctx context.Context) (resource.Quantity, error) {
queryAPI := c.influxDBClient.QueryAPI(c.org)
res, err := queryAPI.Query(context.Background(), c.query)
res, err := queryAPI.Query(ctx, c.query)
if err != nil {
return resource.Quantity{}, err
}
Expand All @@ -125,8 +125,8 @@ func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
return resource.Quantity{}, fmt.Errorf("empty result returned")
}

func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.getValue()
func (c *InfluxDBCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, error) {
v, err := c.getValue(ctx)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/collector/influxdb_collector_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -38,7 +39,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
"query-name": "range2m",
},
}
c, err := NewInfluxDBCollector(hpa, "http://localhost:9999", "secret", "deadbeef", m, time.Second)
c, err := NewInfluxDBCollector(context.Background(), hpa, "http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
"query-name": "range3m",
},
}
c, err := NewInfluxDBCollector(hpa, "http://localhost:8888", "secret", "deadbeef", m, time.Second)
c, err := NewInfluxDBCollector(context.Background(), hpa, "http://localhost:8888", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -149,7 +150,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
CollectorType: "influxdb",
Config: tc.config,
}
_, err := NewInfluxDBCollector(hpa, "http://localhost:9999", "secret", "deadbeef", m, time.Second)
_, err := NewInfluxDBCollector(context.Background(), hpa, "http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err == nil {
t.Fatal("expected error got none")
}
Expand Down
Loading

0 comments on commit f406e86

Please sign in to comment.