From ce6f6b3b856df319a76a0d9faeb7acbfb8b7d0fa Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Mon, 9 Sep 2024 10:43:20 +0530 Subject: [PATCH 1/5] Add more context to error response Signed-off-by: Md Soharab Ansari --- common/util.go | 74 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 62 insertions(+), 12 deletions(-) diff --git a/common/util.go b/common/util.go index c8a7551..e096725 100644 --- a/common/util.go +++ b/common/util.go @@ -1,7 +1,9 @@ package common import ( + "encoding/json" "fmt" + "io" "net/http" "os" "strconv" @@ -14,16 +16,35 @@ import ( "go.uber.org/zap" ) -// ConnectorMetadata contains common fields used by connectors -type ConnectorMetadata struct { - Topic string - ResponseTopic string - ErrorTopic string - HTTPEndpoint string - MaxRetries int - ContentType string - SourceName string -} +type ( + // ConnectorMetadata contains common fields used by connectors + ConnectorMetadata struct { + Topic string + ResponseTopic string + ErrorTopic string + HTTPEndpoint string + MaxRetries int + ContentType string + SourceName string + } + + Request struct { + Message string + HTTPEndpoint string + Headers http.Header + } + + Response struct { + ResponseBody string + StatusCode int + ErrorString string + } + + errorResponse struct { + Request + Response + } +) // ParseConnectorMetadata parses connector side common fields and returns as ConnectorMetadata or returns error func ParseConnectorMetadata() (ConnectorMetadata, error) { @@ -87,12 +108,41 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada } } + errResp := errorResponse{ + Request: Request{ + Message: message, + HTTPEndpoint: data.HTTPEndpoint, + Headers: headers, + }, + Response: Response{ + ResponseBody: "", + StatusCode: http.StatusInternalServerError, + ErrorString: "", + }, + } + if resp == nil { - return nil, fmt.Errorf("every function invocation retry failed; final retry gave empty response. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + errResp.Response.ErrorString = fmt.Sprintf("every function invocation retry failed; final retry gave empty response. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + errorBytes, err := json.Marshal(errResp) + if err != nil { + return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + } + return nil, errors.New(string(errorBytes)) } if resp.StatusCode < 200 || resp.StatusCode > 300 { - return nil, fmt.Errorf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed reading response body. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + } + errResp.Response.ResponseBody = string(body) + errResp.Response.StatusCode = resp.StatusCode + errResp.Response.ErrorString = fmt.Sprintf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) + errorBytes, err := json.Marshal(errResp) + if err != nil { + return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + } + return nil, errors.New(string(errorBytes)) } return resp, nil } From e2c4a2c0a634e80e4f4c6f70f2c3aa871d34d652 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Mon, 9 Sep 2024 12:02:15 +0530 Subject: [PATCH 2/5] Some cosmetic changes Signed-off-by: Md Soharab Ansari --- common/util.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/util.go b/common/util.go index e096725..2a66637 100644 --- a/common/util.go +++ b/common/util.go @@ -40,9 +40,9 @@ type ( ErrorString string } - errorResponse struct { - Request - Response + ErrorResponse struct { + Request Request + Response Response } ) @@ -108,7 +108,7 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada } } - errResp := errorResponse{ + errResp := ErrorResponse{ Request: Request{ Message: message, HTTPEndpoint: data.HTTPEndpoint, From 4fdf9dbae95a824cadcc99f5a4f692b64a886e83 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Mon, 9 Sep 2024 12:50:27 +0530 Subject: [PATCH 3/5] Bump minor version Signed-off-by: Md Soharab Ansari --- aws-kinesis-http-connector/version | 2 +- aws-sqs-http-connector/version | 2 +- gcp-pubsub-http-connector/version | 2 +- kafka-http-connector/version | 2 +- nats-jetstream-http-connector/version | 2 +- nats-streaming-http-connector/version | 2 +- rabbitmq-http-connector/version | 2 +- redis-http-connector/version | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/aws-kinesis-http-connector/version b/aws-kinesis-http-connector/version index 3c58828..1ec829a 100644 --- a/aws-kinesis-http-connector/version +++ b/aws-kinesis-http-connector/version @@ -1 +1 @@ -v0.13 +v0.14 diff --git a/aws-sqs-http-connector/version b/aws-sqs-http-connector/version index 1ec829a..2048935 100644 --- a/aws-sqs-http-connector/version +++ b/aws-sqs-http-connector/version @@ -1 +1 @@ -v0.14 +v0.15 diff --git a/gcp-pubsub-http-connector/version b/gcp-pubsub-http-connector/version index 490a0cd..9097bf9 100644 --- a/gcp-pubsub-http-connector/version +++ b/gcp-pubsub-http-connector/version @@ -1 +1 @@ -v0.9 +v0.10 diff --git a/kafka-http-connector/version b/kafka-http-connector/version index 2048935..39fa176 100644 --- a/kafka-http-connector/version +++ b/kafka-http-connector/version @@ -1 +1 @@ -v0.15 +v0.16 diff --git a/nats-jetstream-http-connector/version b/nats-jetstream-http-connector/version index 03776fb..86de203 100644 --- a/nats-jetstream-http-connector/version +++ b/nats-jetstream-http-connector/version @@ -1 +1 @@ -v0.7 +v0.8 diff --git a/nats-streaming-http-connector/version b/nats-streaming-http-connector/version index 39fa176..2b8bac3 100644 --- a/nats-streaming-http-connector/version +++ b/nats-streaming-http-connector/version @@ -1 +1 @@ -v0.16 +v0.17 diff --git a/rabbitmq-http-connector/version b/rabbitmq-http-connector/version index 3c58828..1ec829a 100644 --- a/rabbitmq-http-connector/version +++ b/rabbitmq-http-connector/version @@ -1 +1 @@ -v0.13 +v0.14 diff --git a/redis-http-connector/version b/redis-http-connector/version index 74d5120..03776fb 100644 --- a/redis-http-connector/version +++ b/redis-http-connector/version @@ -1 +1 @@ -v0.6 +v0.7 From 32530f81e526fe79eee132844219f9070a788a32 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Mon, 9 Sep 2024 13:00:12 +0530 Subject: [PATCH 4/5] Use correct formatter based on data type Signed-off-by: Md Soharab Ansari --- common/util.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/common/util.go b/common/util.go index 2a66637..1d31382 100644 --- a/common/util.go +++ b/common/util.go @@ -122,10 +122,10 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada } if resp == nil { - errResp.Response.ErrorString = fmt.Sprintf("every function invocation retry failed; final retry gave empty response. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + errResp.Response.ErrorString = fmt.Sprintf("every function invocation retry failed; final retry gave empty response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) errorBytes, err := json.Marshal(errResp) if err != nil { - return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) } return nil, errors.New(string(errorBytes)) } @@ -133,14 +133,14 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada if resp.StatusCode < 200 || resp.StatusCode > 300 { body, err := io.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("failed reading response body. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + return nil, fmt.Errorf("failed reading response body. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) } errResp.Response.ResponseBody = string(body) errResp.Response.StatusCode = resp.StatusCode - errResp.Response.ErrorString = fmt.Sprintf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) + errResp.Response.ErrorString = fmt.Sprintf("request returned failure: %d. http_endpoint: %s, source: %s", resp.StatusCode, data.HTTPEndpoint, data.SourceName) errorBytes, err := json.Marshal(errResp) if err != nil { - return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %v, source: %v", data.HTTPEndpoint, data.SourceName) + return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) } return nil, errors.New(string(errorBytes)) } From 4611f2bc67cda149c89e0978cded9912f398b0c4 Mon Sep 17 00:00:00 2001 From: Md Soharab Ansari Date: Mon, 16 Sep 2024 15:29:53 +0530 Subject: [PATCH 5/5] Resolve review comments Signed-off-by: Md Soharab Ansari --- common/util.go | 89 +++++++++++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 37 deletions(-) diff --git a/common/util.go b/common/util.go index 1d31382..937732f 100644 --- a/common/util.go +++ b/common/util.go @@ -28,21 +28,21 @@ type ( SourceName string } - Request struct { + FunctionHTTPRequest struct { Message string HTTPEndpoint string Headers http.Header } - Response struct { + FunctionHTTPResponse struct { ResponseBody string StatusCode int ErrorString string } - ErrorResponse struct { - Request Request - Response Response + FunctionErrorDetails struct { + FunctionHTTPRequest FunctionHTTPRequest + FunctionHTTPResponse FunctionHTTPResponse } ) @@ -108,42 +108,14 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada } } - errResp := ErrorResponse{ - Request: Request{ - Message: message, - HTTPEndpoint: data.HTTPEndpoint, - Headers: headers, - }, - Response: Response{ - ResponseBody: "", - StatusCode: http.StatusInternalServerError, - ErrorString: "", - }, - } - - if resp == nil { - errResp.Response.ErrorString = fmt.Sprintf("every function invocation retry failed; final retry gave empty response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) - errorBytes, err := json.Marshal(errResp) + if resp == nil || resp.StatusCode < 200 || resp.StatusCode > 300 { + errResp := NewFunctionErrorDetails(message, data.HTTPEndpoint, headers) + err := errResp.UpdateResponseDetails(resp, data) if err != nil { - return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) + return nil, err } - return nil, errors.New(string(errorBytes)) } - if resp.StatusCode < 200 || resp.StatusCode > 300 { - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("failed reading response body. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) - } - errResp.Response.ResponseBody = string(body) - errResp.Response.StatusCode = resp.StatusCode - errResp.Response.ErrorString = fmt.Sprintf("request returned failure: %d. http_endpoint: %s, source: %s", resp.StatusCode, data.HTTPEndpoint, data.SourceName) - errorBytes, err := json.Marshal(errResp) - if err != nil { - return nil, fmt.Errorf("failed marshalling error response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) - } - return nil, errors.New(string(errorBytes)) - } return resp, nil } @@ -172,3 +144,46 @@ func GetAwsConfig() (*aws.Config, error) { } return nil, errors.New("no aws configuration specified") } + +func NewFunctionErrorDetails(message, httpEndpoint string, headers http.Header) FunctionErrorDetails { + return FunctionErrorDetails{ + FunctionHTTPRequest: FunctionHTTPRequest{ + Message: message, + HTTPEndpoint: httpEndpoint, + Headers: headers, + }, + FunctionHTTPResponse: FunctionHTTPResponse{ + ResponseBody: "", + StatusCode: http.StatusInternalServerError, + ErrorString: "", + }, + } +} + +func (errResp *FunctionErrorDetails) UpdateResponseDetails(resp *http.Response, data ConnectorMetadata) error { + if resp == nil { + errResp.FunctionHTTPResponse.ErrorString = fmt.Sprintf("every function invocation retry failed; final retry gave empty response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) + errorBytes, err := json.Marshal(errResp) + if err != nil { + return fmt.Errorf("failed marshalling error response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) + } + return errors.New(string(errorBytes)) + } + + if resp.StatusCode < 200 || resp.StatusCode > 300 { + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed reading response body. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) + } + errResp.FunctionHTTPResponse.ResponseBody = string(body) + errResp.FunctionHTTPResponse.StatusCode = resp.StatusCode + errResp.FunctionHTTPResponse.ErrorString = fmt.Sprintf("request returned failure: %d. http_endpoint: %s, source: %s", resp.StatusCode, data.HTTPEndpoint, data.SourceName) + errorBytes, err := json.Marshal(errResp) + if err != nil { + return fmt.Errorf("failed marshalling error response. http_endpoint: %s, source: %s", data.HTTPEndpoint, data.SourceName) + } + return errors.New(string(errorBytes)) + } + + return nil +}