From 9d2717aaac587f115549e39ae052d3933b625944 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Mon, 23 Sep 2024 14:02:40 +0200 Subject: [PATCH 1/6] Support multiple queues at the IBMMQ scaler Signed-off-by: rickbrouwer --- CHANGELOG.md | 1 + pkg/scalers/ibmmq_scaler.go | 92 +++++++++++++++++--------------- pkg/scalers/ibmmq_scaler_test.go | 7 ++- 3 files changed, 56 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71cef1b99cf..c577eab4421 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features: - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) +- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) - **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index 0b0b4c893f4..dafbc0d0742 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -26,18 +26,18 @@ type ibmmqScaler struct { } type ibmmqMetadata struct { - Host string `keda:"name=host, order=triggerMetadata"` - QueueName string `keda:"name=queueName, order=triggerMetadata"` - QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` - ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` - Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` - Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` - UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` - TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead - CA string `keda:"name=ca, order=authParams, optional"` - Cert string `keda:"name=cert, order=authParams, optional"` - Key string `keda:"name=key, order=authParams, optional"` - KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` + Host string `keda:"name=host, order=triggerMetadata"` + QueueName []string `keda:"name=queueName, order=triggerMetadata"` + QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` + ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` + Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` + Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` + TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead + CA string `keda:"name=ca, order=authParams, optional"` + Cert string `keda:"name=cert, order=authParams, optional"` + Key string `keda:"name=key, order=authParams, optional"` + KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` triggerIndex int } @@ -129,54 +129,62 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro } func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { - queue := s.metadata.QueueName + maxDepth := int64(0) url := s.metadata.Host - var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`) - req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON)) + req, err := http.NewRequestWithContext(ctx, "POST", url, nil) if err != nil { - return 0, fmt.Errorf("failed to request queue depth: %w", err) + return 0, fmt.Errorf("failed to create HTTP request: %w", err) } req.Header.Set("ibm-mq-rest-csrf-token", "value") req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(s.metadata.Username, s.metadata.Password) - resp, err := s.httpClient.Do(req) - if err != nil { - return 0, fmt.Errorf("failed to contact MQ via REST: %w", err) - } - defer resp.Body.Close() + for _, queueName := range s.metadata.QueueName { + requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`) + req.Body = io.NopCloser(bytes.NewBuffer(requestJSON)) - body, err := io.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("failed to read body of request: %w", err) - } + resp, err := s.httpClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err) + } + defer resp.Body.Close() - var response CommandResponse - err = json.Unmarshal(body, &response) - if err != nil { - return 0, fmt.Errorf("failed to parse JSON: %w", err) - } + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err) + } - if response.CommandResponse == nil || len(response.CommandResponse) == 0 { - return 0, fmt.Errorf("failed to parse response from REST call") - } + var response CommandResponse + err = json.Unmarshal(body, &response) + if err != nil { + return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err) + } + + if response.CommandResponse == nil || len(response.CommandResponse) == 0 { + return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName) + } + + if response.CommandResponse[0].Parameters == nil { + var reason string + message := strings.Join(response.CommandResponse[0].Message, " ") + if message != "" { + reason = fmt.Sprintf(", reason: %s", message) + } + return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason) + } - if response.CommandResponse[0].Parameters == nil { - var reason string - message := strings.Join(response.CommandResponse[0].Message, " ") - if message != "" { - reason = fmt.Sprintf(", reason: %s", message) + depth := int64(response.CommandResponse[0].Parameters.Curdepth) + if depth > maxDepth { + maxDepth = depth } - return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason) } - return int64(response.CommandResponse[0].Parameters.Curdepth), nil + return maxDepth, nil } func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { - metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName)) + metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0])) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), diff --git a/pkg/scalers/ibmmq_scaler_test.go b/pkg/scalers/ibmmq_scaler_test.go index 30a7fc4b132..32c87d54fa2 100644 --- a/pkg/scalers/ibmmq_scaler_test.go +++ b/pkg/scalers/ibmmq_scaler_test.go @@ -51,6 +51,8 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{ {map[string]string{}, true, map[string]string{}}, // Properly formed metadata {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Properly formed metadata with 2 queues + {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid queueDepth using a string {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid activationQueueDepth using a string @@ -89,7 +91,7 @@ func TestIBMMQParseMetadata(t *testing.T) { t.Error("Expected error but got success") fmt.Println(testData) } - if metadata != (ibmmqMetadata{}) && metadata.Password != "" && metadata.Password != testData.authParams["password"] { + if metadata.Password != "" && metadata.Password != testData.authParams["password"] { t.Error("Expected password from configuration but found something else: ", metadata.Password) fmt.Println(testData) } @@ -216,7 +218,8 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) { scaler := ibmmqScaler{ metadata: ibmmqMetadata{ - Host: server.URL, + Host: server.URL, + QueueName: []string{"TEST.QUEUE"}, }, httpClient: server.Client(), } From 57f618e25dbec9bf8029291007e3674cc61b2b3a Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Thu, 26 Sep 2024 15:53:32 +0200 Subject: [PATCH 2/6] support multi name Signed-off-by: Rick Brouwer --- pkg/scalers/ibmmq_scaler.go | 2 +- pkg/scalers/ibmmq_scaler_test.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index dafbc0d0742..705635a9c23 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -27,7 +27,7 @@ type ibmmqScaler struct { type ibmmqMetadata struct { Host string `keda:"name=host, order=triggerMetadata"` - QueueName []string `keda:"name=queueName, order=triggerMetadata"` + QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"` QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` diff --git a/pkg/scalers/ibmmq_scaler_test.go b/pkg/scalers/ibmmq_scaler_test.go index 32c87d54fa2..5618fd68578 100644 --- a/pkg/scalers/ibmmq_scaler_test.go +++ b/pkg/scalers/ibmmq_scaler_test.go @@ -53,6 +53,8 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{ {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Properly formed metadata with 2 queues {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Properly formed metadata with 2 queues with param queueNames + {map[string]string{"host": testValidMQQueueURL, "queueNames": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid queueDepth using a string {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid activationQueueDepth using a string From 272e04fae9eb59ce05c9bb110a944e42921f7273 Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Wed, 2 Oct 2024 14:21:43 +0200 Subject: [PATCH 3/6] Check for authentication error Signed-off-by: Rick Brouwer --- CHANGELOG.md | 1 - pkg/scalers/ibmmq_scaler.go | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c577eab4421..71cef1b99cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,7 +72,6 @@ Here is an overview of all new **experimental** features: - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) -- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) - **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index 705635a9c23..d5114b494f6 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -150,6 +150,10 @@ func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { } defer resp.Body.Close() + if resp.StatusCode == http.StatusUnauthorized { + return 0, fmt.Errorf("authentication failed: incorrect username or password") + } + body, err := io.ReadAll(resp.Body) if err != nil { return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err) From 5a7f395898a6724cfc97b6c921c0b92e0952cd1b Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Mon, 7 Oct 2024 17:21:53 +0200 Subject: [PATCH 4/6] Update changelog Signed-off-by: Rick Brouwer --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2790b3ae0ab..7385252f146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features: - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214)) +- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) - **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) - **Selenium Scaler**: Add Support for Username and Password Authentication ([#6144](https://github.com/kedacore/keda/issues/6144)) From 76c08d255b40718058f477525b8a917b41654f41 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Mon, 7 Oct 2024 17:31:51 +0200 Subject: [PATCH 5/6] Add operation Signed-off-by: rickbrouwer --- pkg/scalers/ibmmq_scaler.go | 46 +++++++++++++++++++++++++++++++++---- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index d5114b494f6..b5d12e8cd09 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -30,6 +30,7 @@ type ibmmqMetadata struct { QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"` QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` + Operation string `keda:"name=operation, order=triggerMetadata, default=max"` Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` @@ -129,7 +130,7 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro } func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { - maxDepth := int64(0) + depths := make([]int64, 0, len(s.metadata.QueueName)) url := s.metadata.Host req, err := http.NewRequestWithContext(ctx, "POST", url, nil) @@ -179,12 +180,47 @@ func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { } depth := int64(response.CommandResponse[0].Parameters.Curdepth) - if depth > maxDepth { - maxDepth = depth - } + depths = append(depths, depth) + } + + switch s.metadata.Operation { + case sumOperation: + return sumDepths(depths), nil + case avgOperation: + return avgDepths(depths), nil + case maxOperation: + return maxDepth(depths), nil + default: + return 0, fmt.Errorf("operation mode %s must be one of %s, %s, %s", s.metadata.Operation, sumOperation, avgOperation, maxOperation) } +} + +func sumDepths(depths []int64) int64 { + var sum int64 + for _, depth := range depths { + sum += depth + } + return sum +} - return maxDepth, nil +func avgDepths(depths []int64) int64 { + if len(depths) == 0 { + return 0 + } + return sumDepths(depths) / int64(len(depths)) +} + +func maxDepth(depths []int64) int64 { + if len(depths) == 0 { + return 0 + } + max := depths[0] + for _, depth := range depths[1:] { + if depth > max { + max = depth + } + } + return max } func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { From ba9d2f7dfb5c391dec9e4acffe26efc5061f8d51 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Mon, 7 Oct 2024 17:33:27 +0200 Subject: [PATCH 6/6] Add operation test Signed-off-by: rickbrouwer --- pkg/scalers/ibmmq_scaler.go | 4 ++-- pkg/scalers/ibmmq_scaler_test.go | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index b5d12e8cd09..11df92c4716 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -30,7 +30,7 @@ type ibmmqMetadata struct { QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"` QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` - Operation string `keda:"name=operation, order=triggerMetadata, default=max"` + Operation string `keda:"name=operation, order=triggerMetadata, enum=max;avg;sum, default=max"` Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` @@ -191,7 +191,7 @@ func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { case maxOperation: return maxDepth(depths), nil default: - return 0, fmt.Errorf("operation mode %s must be one of %s, %s, %s", s.metadata.Operation, sumOperation, avgOperation, maxOperation) + return 0, nil } } diff --git a/pkg/scalers/ibmmq_scaler_test.go b/pkg/scalers/ibmmq_scaler_test.go index 5618fd68578..f5c1d73aa2b 100644 --- a/pkg/scalers/ibmmq_scaler_test.go +++ b/pkg/scalers/ibmmq_scaler_test.go @@ -55,6 +55,8 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{ {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Properly formed metadata with 2 queues with param queueNames {map[string]string{"host": testValidMQQueueURL, "queueNames": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Invalid operation + {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "operation": "test", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid queueDepth using a string {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid activationQueueDepth using a string @@ -222,6 +224,7 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) { metadata: ibmmqMetadata{ Host: server.URL, QueueName: []string{"TEST.QUEUE"}, + Operation: "max", }, httpClient: server.Client(), }