diff --git a/README.md b/README.md index 3f3552a4..be7043cf 100644 --- a/README.md +++ b/README.md @@ -702,6 +702,52 @@ you need to define a `key` or other `tag` with a "star" query syntax like metric label definitions. If both annotations and corresponding label is defined, then the annotation takes precedence. + +## Nakadi collector + +The Nakadi collector allows scaling based on [Nakadi](https://nakadi.io/) +`consumer_lag_seconds` or `unconsumed_events`. + +```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: myapp-hpa + annotations: + # metric-config.../ + metric-config.external.my-nakadi-consumer.nakadi/interval: "60s" # optional +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: custom-metrics-consumer + minReplicas: 0 + maxReplicas: 8 # should match number of partitions for the event type + metrics: + - type: External + external: + metric: + name: my-nakadi-consumer + selector: + matchLabels: + type: nakadi + subscription-id: "708095f6-cece-4d02-840e-ee488d710b29" + metric-type: "consumer-lag-seconds|unconsumed-events" + target: + # value is compatible with the consumer-lag-seconds metric type. + # It describes the amount of consumer lag in seconds before scaling + # additionally up. + # if an event-type has multiple partitions the value of + # consumer-lag-seconds is the max of all the partitions. + value: "600" # 10m + # averageValue is compatible with unconsumed-events metric type. + # This means for every 30 unconsumed events a pod is scaled up. + # unconsumed-events is the sum of of unconsumed_events over all + # partitions. + averageValue: "30" + type: AverageValue +``` + ## HTTP Collector The http collector allows collecting metrics from an external endpoint specified in the HPA. diff --git a/pkg/collector/nakadi_collector.go b/pkg/collector/nakadi_collector.go new file mode 100644 index 00000000..4b9a1316 --- /dev/null +++ b/pkg/collector/nakadi_collector.go @@ -0,0 +1,120 @@ +package collector + +import ( + "context" + "fmt" + "time" + + "github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi" + autoscalingv2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +const ( + // NakadiMetricType defines the metric type for metrics based on Nakadi + // subscriptions. + NakadiMetricType = "nakadi" + nakadiSubscriptionIDKey = "subscription-id" + nakadiMetricTypeKey = "metric-type" + nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds" + nakadiMetricTypeUnconsumedEvents = "unconsumed-events" +) + +// NakadiCollectorPlugin defines a plugin for creating collectors that can get +// unconsumed events from Nakadi. +type NakadiCollectorPlugin struct { + nakadi nakadi.Nakadi +} + +// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin. +func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) { + return &NakadiCollectorPlugin{ + nakadi: nakadi, + }, nil +} + +// NewCollector initializes a new Nakadi collector from the specified HPA. +func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { + return NewNakadiCollector(c.nakadi, hpa, config, interval) +} + +// NakadiCollector defines a collector that is able to collect metrics from +// Nakadi. +type NakadiCollector struct { + nakadi nakadi.Nakadi + interval time.Duration + subscriptionID string + nakadiMetricType string + metric autoscalingv2.MetricIdentifier + metricType autoscalingv2.MetricSourceType + namespace string +} + +// NewNakadiCollector initializes a new NakadiCollector. +func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) { + if config.Metric.Selector == nil { + return nil, fmt.Errorf("selector for nakadi is not specified") + } + + subscriptionID, ok := config.Config[nakadiSubscriptionIDKey] + if !ok { + return nil, fmt.Errorf("subscription-id not specified on metric") + } + + metricType, ok := config.Config[nakadiMetricTypeKey] + if !ok { + return nil, fmt.Errorf("metric-type not specified on metric") + } + + if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents { + return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType) + } + + return &NakadiCollector{ + nakadi: nakadi, + interval: interval, + subscriptionID: subscriptionID, + nakadiMetricType: metricType, + metric: config.Metric, + metricType: config.Type, + namespace: hpa.Namespace, + }, nil +} + +// GetMetrics returns a list of collected metrics for the Nakadi subscription ID. +func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) { + var value int64 + var err error + switch c.nakadiMetricType { + case nakadiMetricTypeConsumerLagSeconds: + value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID) + if err != nil { + return nil, err + } + case nakadiMetricTypeUnconsumedEvents: + value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID) + if err != nil { + return nil, err + } + } + + metricValue := CollectedMetric{ + Namespace: c.namespace, + Type: c.metricType, + External: external_metrics.ExternalMetricValue{ + MetricName: c.metric.Name, + MetricLabels: c.metric.Selector.MatchLabels, + Timestamp: metav1.Time{Time: time.Now().UTC()}, + Value: *resource.NewQuantity(value, resource.DecimalSI), + }, + } + + return []CollectedMetric{metricValue}, nil +} + +// Interval returns the interval at which the collector should run. +func (c *NakadiCollector) Interval() time.Duration { + return c.interval +} diff --git a/pkg/nakadi/nakadi.go b/pkg/nakadi/nakadi.go new file mode 100644 index 00000000..094b049b --- /dev/null +++ b/pkg/nakadi/nakadi.go @@ -0,0 +1,131 @@ +package nakadi + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" +) + +// Nakadi defines an interface for talking to the Nakadi API. +type Nakadi interface { + ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) + UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) +} + +// Client defines client for interfacing with the Nakadi API. +type Client struct { + nakadiEndpoint string + http *http.Client +} + +// NewNakadiClient initializes a new Nakadi Client. +func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client { + return &Client{ + nakadiEndpoint: nakadiEndpoint, + http: client, + } +} + +func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) { + stats, err := c.stats(ctx, subscriptionID) + if err != nil { + return 0, err + } + + var maxConsumerLagSeconds int64 + for _, eventType := range stats { + for _, partition := range eventType.Partitions { + if partition.ConsumerLagSeconds > maxConsumerLagSeconds { + maxConsumerLagSeconds = partition.ConsumerLagSeconds + } + } + } + + return maxConsumerLagSeconds, nil +} + +func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) { + stats, err := c.stats(ctx, subscriptionID) + if err != nil { + return 0, err + } + + var unconsumedEvents int64 + for _, eventType := range stats { + for _, partition := range eventType.Partitions { + unconsumedEvents += partition.UnconsumedEvents + } + } + + return unconsumedEvents, nil +} + +type statsResp struct { + Items []statsEventType `json:"items"` +} + +type statsEventType struct { + EventType string `json:"event_type"` + Partitions []statsPartition `json:"partitions"` +} + +type statsPartition struct { + Partiton string `json:"partition"` + State string `json:"state"` + UnconsumedEvents int64 `json:"unconsumed_events"` + ConsumerLagSeconds int64 `json:"consumer_lag_seconds"` + StreamID string `json:"stream_id"` + AssignmentType string `json:"assignment_type"` +} + +// stats returns the Nakadi stats for a given subscription ID. +// +// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get +func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) { + endpoint, err := url.Parse(c.nakadiEndpoint) + if err != nil { + return nil, err + } + + endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID) + + q := endpoint.Query() + q.Set("show_time_lag", "true") + endpoint.RawQuery = q.Encode() + + req, err := http.NewRequest(http.MethodGet, endpoint.String(), nil) + if err != nil { + return nil, err + } + + resp, err := c.http.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + d, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d)) + } + + var result statsResp + err = json.Unmarshal(d, &result) + if err != nil { + return nil, err + } + + if len(result.Items) == 0 { + return nil, errors.New("expected at least 1 event-type, 0 returned") + } + + return result.Items, nil +} diff --git a/pkg/nakadi/nakadi_test.go b/pkg/nakadi/nakadi_test.go new file mode 100644 index 00000000..2a14e0a8 --- /dev/null +++ b/pkg/nakadi/nakadi_test.go @@ -0,0 +1,141 @@ +package nakadi + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQuery(tt *testing.T) { + client := &http.Client{} + for _, ti := range []struct { + msg string + status int + responseBody string + err error + unconsumedEvents int64 + consumerLagSeconds int64 + }{ + { + msg: "test getting a single event-type", + status: http.StatusOK, + responseBody: `{ + "items": [ + { + "event_type": "example-event", + "partitions": [ + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 4, + "consumer_lag_seconds": 2, + "stream_id": "example-id", + "assignment_type": "auto" + }, + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 5, + "consumer_lag_seconds": 1, + "stream_id": "example-id", + "assignment_type": "auto" + } + ] + } + ] + }`, + unconsumedEvents: 9, + consumerLagSeconds: 2, + }, + { + msg: "test getting multiple event-types", + status: http.StatusOK, + responseBody: `{ + "items": [ + { + "event_type": "example-event", + "partitions": [ + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 4, + "consumer_lag_seconds": 2, + "stream_id": "example-id", + "assignment_type": "auto" + }, + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 5, + "consumer_lag_seconds": 1, + "stream_id": "example-id", + "assignment_type": "auto" + } + ] + }, + { + "event_type": "example-event-2", + "partitions": [ + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 4, + "consumer_lag_seconds": 6, + "stream_id": "example-id", + "assignment_type": "auto" + }, + { + "partition": "0", + "state": "assigned", + "unconsumed_events": 5, + "consumer_lag_seconds": 1, + "stream_id": "example-id", + "assignment_type": "auto" + } + ] + } + ] + }`, + unconsumedEvents: 18, + consumerLagSeconds: 6, + }, + { + msg: "test call with invalid response", + status: http.StatusInternalServerError, + responseBody: `{"error": 500}`, + err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"), + }, + { + msg: "test getting back a single data point", + status: http.StatusOK, + responseBody: `{ + "items": [] + }`, + err: errors.New("expected at least 1 event-type, 0 returned"), + }, + } { + tt.Run(ti.msg, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(ti.status) + _, err := w.Write([]byte(ti.responseBody)) + assert.NoError(t, err) + }), + ) + defer ts.Close() + + nakadiClient := NewNakadiClient(ts.URL, client) + consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id") + assert.Equal(t, ti.err, err) + assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds) + unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id") + assert.Equal(t, ti.err, err) + assert.Equal(t, ti.unconsumedEvents, unconsumedEvents) + }) + } + +} diff --git a/pkg/server/start.go b/pkg/server/start.go index 3cf62b34..15dc7d42 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -35,6 +35,7 @@ import ( "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned" "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector" "github.com/zalando-incubator/kube-metrics-adapter/pkg/controller/scheduledscaling" + "github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi" "github.com/zalando-incubator/kube-metrics-adapter/pkg/provider" "github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon" "golang.org/x/oauth2" @@ -64,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { EnableExternalMetricsAPI: true, MetricsAddress: ":7979", ZMONTokenName: "zmon", + NakadiTokenName: "nakadi", CredentialsDir: "/meta/credentials", ExternalRPSMetricName: "skipper_serve_host_duration_seconds_count", } @@ -110,8 +112,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { "url of ZMON KariosDB endpoint to query for ZMON checks") flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+ "name of the token used to query ZMON") + flags.StringVar(&o.NakadiEndpoint, "nakadi-endpoint", o.NakadiEndpoint, ""+ + "url of Nakadi endpoint to for nakadi subscription stats") + flags.StringVar(&o.NakadiTokenName, "nakadi-token-name", o.NakadiTokenName, ""+ + "name of the token used to call nakadi subscription API") flags.StringVar(&o.Token, "token", o.Token, ""+ - "static oauth2 token to use when calling external services like ZMON") + "static oauth2 token to use when calling external services like ZMON and Nakadi") flags.StringVar(&o.CredentialsDir, "credentials-dir", o.CredentialsDir, ""+ "path to the credentials dir where tokens are stored") flags.BoolVar(&o.SkipperIngressMetrics, "skipper-ingress-metrics", o.SkipperIngressMetrics, ""+ @@ -274,6 +280,27 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct collectorFactory.RegisterExternalCollector([]string{collector.ZMONMetricType, collector.ZMONCheckMetricLegacy}, zmonPlugin) } + // enable Nakadi based metrics + if o.NakadiEndpoint != "" { + var tokenSource oauth2.TokenSource + if o.Token != "" { + tokenSource = oauth2.StaticTokenSource(&oauth2.Token{AccessToken: o.Token}) + } else { + tokenSource = platformiam.NewTokenSource(o.NakadiTokenName, o.CredentialsDir) + } + + httpClient := newOauth2HTTPClient(ctx, tokenSource) + + nakadiClient := nakadi.NewNakadiClient(o.NakadiEndpoint, httpClient) + + nakadiPlugin, err := collector.NewNakadiCollectorPlugin(nakadiClient) + if err != nil { + return fmt.Errorf("failed to initialize Nakadi collector plugin: %v", err) + } + + collectorFactory.RegisterExternalCollector([]string{collector.NakadiMetricType}, nakadiPlugin) + } + awsSessions := make(map[string]*session.Session, len(o.AWSRegions)) for _, region := range o.AWSRegions { awsSessions[region], err = session.NewSessionWithOptions(session.Options{ @@ -427,6 +454,10 @@ type AdapterServerOptions struct { ZMONKariosDBEndpoint string // ZMONTokenName is the name of the token used to query ZMON ZMONTokenName string + // NakadiEndpoint enables Nakadi metrics from the specified endpoint + NakadiEndpoint string + // NakadiTokenName is the name of the token used to call Nakadi + NakadiTokenName string // Token is an oauth2 token used to authenticate with services like // ZMON. Token string