Skip to content

Commit

Permalink
update scaler code
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhavesh authored and Bhavesh committed Sep 30, 2024
1 parent 1b32e63 commit 958ff02
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 21 deletions.
53 changes: 36 additions & 17 deletions pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/gcp"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -18,7 +20,7 @@ const (
)

type stackdriverScaler struct {
client *StackDriverClient
client *gcp.StackDriverClient
metricType v2.MetricTargetType
metadata *stackdriverMetadata
logger logr.Logger
Expand All @@ -30,13 +32,15 @@ type stackdriverMetadata struct {
targetValue float64
activationTargetValue float64
metricName string
valueIfNull *float64
filterDuration int64

gcpAuthorization *gcpAuthorizationMetadata
gcpAuthorization *gcp.AuthorizationMetadata
aggregation *monitoringpb.Aggregation
}

// NewStackdriverScaler creates a new stackdriverScaler
func NewStackdriverScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
func NewStackdriverScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
Expand All @@ -63,7 +67,7 @@ func NewStackdriverScaler(ctx context.Context, config *ScalerConfig) (Scaler, er
}, nil
}

func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackdriverMetadata, error) {
func parseStackdriverMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*stackdriverMetadata, error) {
meta := stackdriverMetadata{}
meta.targetValue = defaultStackdriverTargetValue

Expand All @@ -88,7 +92,7 @@ func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackd
}

name := kedautil.NormalizeString(fmt.Sprintf("gcp-stackdriver-%s", meta.projectID))
meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, name)
meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex, name)

if val, ok := config.TriggerMetadata["targetValue"]; ok {
targetValue, err := strconv.ParseFloat(val, 64)
Expand All @@ -109,7 +113,23 @@ func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackd
meta.activationTargetValue = activationTargetValue
}

auth, err := getGCPAuthorization(config)
if val, ok := config.TriggerMetadata["valueIfNull"]; ok && val != "" {
valueIfNull, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("valueIfNull parsing error %w", err)
}
meta.valueIfNull = &valueIfNull
}

if val, ok := config.TriggerMetadata["filterDuration"]; ok {
filterDuration, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("filterDuration parsing error %w", err)
}
meta.filterDuration = filterDuration
}

auth, err := gcp.GetGCPAuthorization(config)
if err != nil {
return nil, err
}
Expand All @@ -124,7 +144,7 @@ func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackd
return &meta, nil
}

func parseAggregation(config *ScalerConfig, logger logr.Logger) (*monitoringpb.Aggregation, error) {
func parseAggregation(config *scalersconfig.ScalerConfig, logger logr.Logger) (*monitoringpb.Aggregation, error) {
if period, ok := config.TriggerMetadata["alignmentPeriodSeconds"]; ok {
if period == "" {
return nil, nil
Expand All @@ -140,19 +160,19 @@ func parseAggregation(config *ScalerConfig, logger logr.Logger) (*monitoringpb.A
return nil, fmt.Errorf("error parsing alignmentPeriodSeconds: %w", err)
}

return NewStackdriverAggregator(val, config.TriggerMetadata["alignmentAligner"], config.TriggerMetadata["alignmentReducer"])
return gcp.NewStackdriverAggregator(val, config.TriggerMetadata["alignmentAligner"], config.TriggerMetadata["alignmentReducer"])
}

return nil, nil
}

func initializeStackdriverClient(ctx context.Context, gcpAuthorization *gcpAuthorizationMetadata, logger logr.Logger) (*StackDriverClient, error) {
var client *StackDriverClient
func initializeStackdriverClient(ctx context.Context, gcpAuthorization *gcp.AuthorizationMetadata, logger logr.Logger) (*gcp.StackDriverClient, error) {
var client *gcp.StackDriverClient
var err error
if gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
if gcpAuthorization.PodIdentityProviderEnabled {
client, err = gcp.NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, gcpAuthorization.GoogleApplicationCredentials)
client, err = gcp.NewStackDriverClient(ctx, gcpAuthorization.GoogleApplicationCredentials)
}

if err != nil {
Expand All @@ -164,13 +184,12 @@ func initializeStackdriverClient(ctx context.Context, gcpAuthorization *gcpAutho

func (s *stackdriverScaler) Close(context.Context) error {
if s.client != nil {
err := s.client.metricsClient.Close()
err := s.client.Close()
s.client = nil
if err != nil {
s.logger.Error(err, "error closing StackDriver client")
}
}

return nil
}

Expand Down Expand Up @@ -207,7 +226,7 @@ func (s *stackdriverScaler) GetMetricsAndActivity(ctx context.Context, metricNam

// getMetrics gets metric type value from stackdriver api
func (s *stackdriverScaler) getMetrics(ctx context.Context) (float64, error) {
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID, s.metadata.aggregation)
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID, s.metadata.aggregation, s.metadata.valueIfNull, s.metadata.filterDuration)
if err == nil {
s.logger.V(1).Info(
fmt.Sprintf("Getting metrics for project %s, filter %s and aggregation %v. Result: %f",
Expand All @@ -218,4 +237,4 @@ func (s *stackdriverScaler) getMetrics(ctx context.Context) (float64, error) {
}

return val, err
}
}
14 changes: 10 additions & 4 deletions pkg/scalers/gcp_stackdriver_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"

"github.com/go-logr/logr"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

var testStackdriverResolvedEnv = map[string]string{
Expand All @@ -19,7 +21,7 @@ type parseStackdriverMetadataTestData struct {

type gcpStackdriverMetricIdentifier struct {
metadataTestData *parseStackdriverMetadataTestData
scalerIndex int
triggerIndex int
name string
}

Expand Down Expand Up @@ -55,6 +57,10 @@ var testStackdriverMetadata = []parseStackdriverMetadataTestData{
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "alignmentPeriodSeconds": "a"}, true},
// properly formed float targetValue and activationTargetValue
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "targetValue": "1.1", "activationTargetValue": "2.1"}, false},
// properly formed float valueIfNull
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "targetValue": "1.1", "activationTargetValue": "2.1", "valueIfNull": "1.0"}, false},
// With bad valueIfNull
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "targetValue": "1.1", "activationTargetValue": "2.1", "valueIfNull": "toto"}, true},
}

var gcpStackdriverMetricIdentifiers = []gcpStackdriverMetricIdentifier{
Expand All @@ -64,7 +70,7 @@ var gcpStackdriverMetricIdentifiers = []gcpStackdriverMetricIdentifier{

func TestStackdriverParseMetadata(t *testing.T) {
for _, testData := range testStackdriverMetadata {
_, err := parseStackdriverMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testStackdriverResolvedEnv}, logr.Discard())
_, err := parseStackdriverMetadata(&scalersconfig.ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testStackdriverResolvedEnv}, logr.Discard())
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
Expand All @@ -76,7 +82,7 @@ func TestStackdriverParseMetadata(t *testing.T) {

func TestGcpStackdriverGetMetricSpecForScaling(t *testing.T) {
for _, testData := range gcpStackdriverMetricIdentifiers {
meta, err := parseStackdriverMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testStackdriverResolvedEnv, ScalerIndex: testData.scalerIndex}, logr.Discard())
meta, err := parseStackdriverMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testStackdriverResolvedEnv, TriggerIndex: testData.triggerIndex}, logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand All @@ -88,4 +94,4 @@ func TestGcpStackdriverGetMetricSpecForScaling(t *testing.T) {
t.Error("Wrong External metric source name:", metricName)
}
}
}
}

0 comments on commit 958ff02

Please sign in to comment.