diff --git a/README.md b/README.md index 3f3552a4..ecb88043 100644 --- a/README.md +++ b/README.md @@ -702,6 +702,51 @@ you need to define a `key` or other `tag` with a "star" query syntax like metric label definitions. If both annotations and corresponding label is defined, then the annotation takes precedence. + +## Nakadi collector + +The Nakadi collector allows scaling based on [Nakadi](https://nakadi.io/) +`consumer_lag_seconds` or `unconsumed_events`. + +```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: myapp-hpa + annotations: + metric-config.external.my-nakadi-event.nakadi/interval: "60s" # optional +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: custom-metrics-consumer + minReplicas: 0 + maxReplicas: 8 # should match the number of partitions for the event type + metrics: + - type: External + external: + metric: + name: my-nakadi-event + selector: + matchLabels: + type: nakadi + subscription-id: "28d86692-ddf4-4f6d-8cda-d64f7f9e56c1" + metric-type: "consumer-lag-seconds|unconsumed-events" # choose one of the two + target: + # value is compatible with the consumer-lag-seconds metric type. + # It describes the consumer lag in seconds that is tolerated before + # scaling up further. + # If an event type has multiple partitions, the value of + # consumer-lag-seconds is the maximum across all partitions. + value: "600" # 10m + # averageValue is compatible with the unconsumed-events metric type. + # This means for every 30 unconsumed events a pod is scaled up. + # If an event type has multiple partitions, the value of + # unconsumed-events is the sum across all partitions. + averageValue: "30" + type: AverageValue # use "Value" when setting value instead of averageValue +``` + ## HTTP Collector The http collector allows collecting metrics from an external endpoint specified in the HPA. 
diff --git a/pkg/collector/nakadi_collector.go b/pkg/collector/nakadi_collector.go new file mode 100644 index 00000000..8277cdb7 --- /dev/null +++ b/pkg/collector/nakadi_collector.go @@ -0,0 +1,119 @@ +package collector + +import ( + "context" + "fmt" + "time" + + "github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi" + autoscalingv2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +const ( + // NakadiMetricType defines the metric type for metrics based on Nakadi + // subscriptions. + NakadiMetricType = "nakadi" + nakadiSubscriptionIDKey = "subscription-id" // config key holding the Nakadi subscription ID + nakadiMetricTypeKey = "metric-type" // config key selecting which Nakadi metric to collect + nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds" // metric-type value: consumer lag in seconds + nakadiMetricTypeUnconsumedEvents = "unconsumed-events" // metric-type value: number of unconsumed events +) + +// NakadiCollectorPlugin defines a plugin for creating collectors that can get +// consumer lag or unconsumed events from Nakadi. +type NakadiCollectorPlugin struct { + nakadi nakadi.Nakadi // Nakadi API client handed to every collector created by this plugin +} + +// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin. The returned error is always nil. +func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) { + return &NakadiCollectorPlugin{ + nakadi: nakadi, + }, nil +} + +// NewCollector initializes a new Nakadi collector from the specified HPA. +func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { + return NewNakadiCollector(c.nakadi, hpa, config, interval) +} + +// NakadiCollector defines a collector that is able to collect metrics from +// Nakadi. +type NakadiCollector struct { + nakadi nakadi.Nakadi // client used to fetch the metric value + interval time.Duration // value returned by Interval + subscriptionID string // Nakadi subscription the metric is read for + nakadiMetricType string // consumer-lag-seconds or unconsumed-events + metric autoscalingv2.MetricIdentifier // its name and selector labels are echoed on the collected metric + metricType autoscalingv2.MetricSourceType // metric source type reported on the collected metric + namespace string // namespace of the HPA the collector was created for +} + +// NewNakadiCollector initializes a new NakadiCollector. 
+func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) { + if config.Metric.Selector == nil { // the selector labels are reported back with every collected metric + return nil, fmt.Errorf("selector for nakadi is not specified") + } + + id, found := config.Config[nakadiSubscriptionIDKey] + if !found { + return nil, fmt.Errorf("subscription-id not specified on metric") + } + + kind, found := config.Config[nakadiMetricTypeKey] + if !found { + return nil, fmt.Errorf("metric-type not specified on metric") + } + + if !(kind == nakadiMetricTypeConsumerLagSeconds || kind == nakadiMetricTypeUnconsumedEvents) { // only these two metric types are supported + return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, kind) + } + + return &NakadiCollector{ + namespace: hpa.Namespace, + metricType: config.Type, + metric: config.Metric, + nakadiMetricType: kind, + subscriptionID: id, + interval: interval, + nakadi: nakadi, + }, nil +} + +// GetMetrics returns a list of collected metrics for the Nakadi subscription ID. 
+func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) { + var value int + var err error + switch c.nakadiMetricType { + case nakadiMetricTypeConsumerLagSeconds: + value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID) + case nakadiMetricTypeUnconsumedEvents: + value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID) + default: // not reachable through NewNakadiCollector; fail loudly instead of reporting 0 + err = fmt.Errorf("unsupported nakadi metric-type '%s'", c.nakadiMetricType) + } + if err != nil { + return nil, err + } + + metricValue := CollectedMetric{ + Namespace: c.namespace, + Type: c.metricType, + External: external_metrics.ExternalMetricValue{ + MetricName: c.metric.Name, + MetricLabels: c.metric.Selector.MatchLabels, + Timestamp: metav1.Time{Time: time.Now().UTC()}, + Value: *resource.NewMilliQuantity(int64(value)*1000, resource.DecimalSI), // widen to int64 before scaling to milli-units so a 32-bit int cannot overflow + }, + } + + return []CollectedMetric{metricValue}, nil +} + +// Interval returns the interval at which the collector should run. +func (c *NakadiCollector) Interval() time.Duration { + return c.interval +} diff --git a/pkg/nakadi/nakadi.go b/pkg/nakadi/nakadi.go new file mode 100644 index 00000000..c9219593 --- /dev/null +++ b/pkg/nakadi/nakadi.go @@ -0,0 +1,117 @@ +package nakadi + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" +) + +// Nakadi defines an interface for talking to the Nakadi API. +type Nakadi interface { + ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int, error) // consumer lag of the subscription in seconds + UnconsumedEvents(ctx context.Context, subscriptionID string) (int, error) // number of unconsumed events of the subscription +} + +// Client defines a client for interfacing with the Nakadi API. +type Client struct { + nakadiEndpoint string + http *http.Client +} + +// NewNakadiClient initializes a new Nakadi Client. 
+func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client { + return &Client{ + nakadiEndpoint: nakadiEndpoint, + http: client, + } +} + +// ConsumerLagSeconds returns the largest consumer_lag_seconds reported for any +// partition of the subscription, or 0 if no partitions are reported. +func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int, error) { + stats, err := c.stats(ctx, subscriptionID) + if err != nil { + return 0, err + } + + var maxConsumerLagSeconds int + for _, partition := range stats.allPartitions() { + if partition.ConsumerLagSeconds > maxConsumerLagSeconds { + maxConsumerLagSeconds = partition.ConsumerLagSeconds + } + } + + return maxConsumerLagSeconds, nil +} + +// UnconsumedEvents returns the sum of unconsumed_events over all partitions +// of the subscription. +func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int, error) { + stats, err := c.stats(ctx, subscriptionID) + if err != nil { + return 0, err + } + + var unconsumedEvents int + for _, partition := range stats.allPartitions() { + unconsumedEvents += partition.UnconsumedEvents + } + + return unconsumedEvents, nil +} + +// statsResp is the decoded subscription stats response. The Nakadi API nests +// partition stats per event type under "items"; a top-level "partitions" list +// is still decoded so payloads in the previously assumed shape keep working. +type statsResp struct { + Items []statsEventType `json:"items"` + Partitions []statsPartition `json:"partitions"` +} + +// statsEventType holds the partition stats of a single event type. +type statsEventType struct { + EventType string `json:"event_type"` + Partitions []statsPartition `json:"partitions"` +} + +// allPartitions returns the partitions of every event type in the response, +// preceded by any partitions listed at the top level. +func (r *statsResp) allPartitions() []statsPartition { + partitions := append([]statsPartition(nil), r.Partitions...) + for _, item := range r.Items { + partitions = append(partitions, item.Partitions...) + } + return partitions +} + +// statsPartition holds the stats of a single partition. +type statsPartition struct { + Partiton string `json:"partition"` // NOTE(review): misspelled field name kept as-is; decoding relies on the JSON tag only + State string `json:"state"` + UnconsumedEvents int `json:"unconsumed_events"` + ConsumerLagSeconds int `json:"consumer_lag_seconds"` + StreamID string `json:"stream_id"` + AssignmentType string `json:"assignment_type"` +} + +// stats returns the Nakadi stats for a given subscription ID. 
+// +// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get +func (c *Client) stats(ctx context.Context, subscriptionID string) (*statsResp, error) { + endpoint, err := url.Parse(c.nakadiEndpoint) + if err != nil { + return nil, err + } + + endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID) // replaces any path component of the configured endpoint + + q := endpoint.Query() + q.Set("show_time_lag", "true") // presumably needed for consumer_lag_seconds to be reported; see the API docs linked above + endpoint.RawQuery = q.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) // bind the request to ctx so cancellation and deadlines apply + if err != nil { + return nil, err + } + + resp, err := c.http.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + d, err := io.ReadAll(resp.Body) // read the body first so it can be quoted in the error below + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d)) + } + + var result statsResp + err = json.Unmarshal(d, &result) + if err != nil { + return nil, err + } + + return &result, nil +} diff --git a/pkg/server/start.go b/pkg/server/start.go index 3cf62b34..1fc14433 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -35,6 +35,7 @@ import ( "github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned" "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector" "github.com/zalando-incubator/kube-metrics-adapter/pkg/controller/scheduledscaling" + "github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi" "github.com/zalando-incubator/kube-metrics-adapter/pkg/provider" "github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon" "golang.org/x/oauth2" @@ -110,6 +111,10 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { "url of ZMON KariosDB endpoint to query for ZMON checks") flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+ "name of the token used to query ZMON") + flags.StringVar(&o.NakadiEndpoint, "nakadi-endpoint", o.NakadiEndpoint, ""+ + "url of Nakadi endpoint to query for nakadi subscription 
stats") + flags.StringVar(&o.NakadiTokenName, "nakadi-token-name", o.NakadiTokenName, ""+ + "name of the token used to call nakadi subscription API") flags.StringVar(&o.Token, "token", o.Token, ""+ "static oauth2 token to use when calling external services like ZMON") flags.StringVar(&o.CredentialsDir, "credentials-dir", o.CredentialsDir, ""+ @@ -274,6 +279,27 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct collectorFactory.RegisterExternalCollector([]string{collector.ZMONMetricType, collector.ZMONCheckMetricLegacy}, zmonPlugin) } + // enable Nakadi based metrics + if o.NakadiEndpoint != "" { + var tokenSource oauth2.TokenSource + if o.Token != "" { // a static token takes precedence over the named token from the credentials dir + tokenSource = oauth2.StaticTokenSource(&oauth2.Token{AccessToken: o.Token}) + } else { + tokenSource = platformiam.NewTokenSource(o.NakadiTokenName, o.CredentialsDir) + } + + httpClient := newOauth2HTTPClient(ctx, tokenSource) + + nakadiClient := nakadi.NewNakadiClient(o.NakadiEndpoint, httpClient) + + nakadiPlugin, err := collector.NewNakadiCollectorPlugin(nakadiClient) + if err != nil { + return fmt.Errorf("failed to initialize Nakadi collector plugin: %v", err) + } + + collectorFactory.RegisterExternalCollector([]string{collector.NakadiMetricType}, nakadiPlugin) + } + awsSessions := make(map[string]*session.Session, len(o.AWSRegions)) for _, region := range o.AWSRegions { awsSessions[region], err = session.NewSessionWithOptions(session.Options{ @@ -427,6 +453,10 @@ type AdapterServerOptions struct { ZMONKariosDBEndpoint string // ZMONTokenName is the name of the token used to query ZMON ZMONTokenName string + // NakadiEndpoint enables Nakadi metrics from the specified endpoint + NakadiEndpoint string + // NakadiTokenName is the name of the token used to call Nakadi + NakadiTokenName string // Token is an oauth2 token used to authenticate with services like // ZMON. Token string