From 415343f304b574d4f024a82e0fad266060c34cf5 Mon Sep 17 00:00:00 2001 From: Street <5597260+MStreet3@users.noreply.github.com> Date: Thu, 16 Jan 2025 17:00:00 -0500 Subject: [PATCH] fix(workflow/fetcher): validates response before use (#15921) * fix(workflow/fetcher): validates response before use * feat(capabilities): add Validate method on Response * feat(workflows/syncer): use Validate in Fetch and test * refactor(workflows/syncer): simplifies getWorkflowArtifacts * refactor(workflow/fetcher): lint fixes and clean up --- .changeset/cyan-ladybugs-check.md | 5 + .../gateway/handlers/capabilities/webapi.go | 21 ++++ .../handlers/capabilities/webapi_test.go | 54 ++++++++ core/services/workflows/syncer/fetcher.go | 14 ++- .../services/workflows/syncer/fetcher_test.go | 115 +++++++++++++++--- core/services/workflows/syncer/handler.go | 45 +++---- 6 files changed, 214 insertions(+), 40 deletions(-) create mode 100644 .changeset/cyan-ladybugs-check.md create mode 100644 core/services/gateway/handlers/capabilities/webapi_test.go diff --git a/.changeset/cyan-ladybugs-check.md b/.changeset/cyan-ladybugs-check.md new file mode 100644 index 00000000000..d430890001a --- /dev/null +++ b/.changeset/cyan-ladybugs-check.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +validates response from gateway in workflow/fetcher diff --git a/core/services/gateway/handlers/capabilities/webapi.go b/core/services/gateway/handlers/capabilities/webapi.go index a0213eb8f42..f4a324420c0 100644 --- a/core/services/gateway/handlers/capabilities/webapi.go +++ b/core/services/gateway/handlers/capabilities/webapi.go @@ -1,5 +1,7 @@ package capabilities +import "errors" + type Request struct { URL string `json:"url"` // URL to query, only http and https protocols are supported. Method string `json:"method,omitempty"` // HTTP verb, defaults to GET. @@ -16,6 +18,25 @@ type Response struct { Body []byte `json:"body,omitempty"` // HTTP response body } +// Validate ensures the Response struct is consistent. +func (r Response) Validate() error { + if r.ExecutionError { + if r.ErrorMessage == "" { + return errors.New("executionError is true but errorMessage is empty") + } + if r.StatusCode != 0 || len(r.Headers) > 0 || len(r.Body) > 0 { + return errors.New("executionError is true but response details (statusCode, headers, body) are populated") + } + return nil + } + + if r.StatusCode < 100 || r.StatusCode > 599 { + return errors.New("statusCode must be a valid HTTP status code (100-599)") + } + + return nil +} + type TriggerResponsePayload struct { ErrorMessage string `json:"error_message,omitempty"` // ERROR, ACCEPTED, PENDING, COMPLETED diff --git a/core/services/gateway/handlers/capabilities/webapi_test.go b/core/services/gateway/handlers/capabilities/webapi_test.go new file mode 100644 index 00000000000..2030c898053 --- /dev/null +++ b/core/services/gateway/handlers/capabilities/webapi_test.go @@ -0,0 +1,54 @@ +package capabilities + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestResponseValidate(t *testing.T) { + tt := []struct { + name string + response Response + expectError string + }{ + { + name: "valid Response with ExecutionError", + response: Response{ExecutionError: true, ErrorMessage: "Some error"}, + }, + { + name: "invalid Response with ExecutionError but no ErrorMessage", + response: Response{ExecutionError: true}, + expectError: "executionError is true but errorMessage is empty", + }, + { + name: "valid HTTP Response", + response: Response{StatusCode: 200}, + }, + { + name: "invalid status code", + response: Response{ + Body: []byte("body"), + }, + expectError: "statusCode must be set when executionError is false", + }, + { + name: "invalid HTTP Response with bad StatusCode", + response: Response{StatusCode: 700}, + expectError: "statusCode must be a valid HTTP status code (100-599)", + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + err := tc.response.Validate() + + if tc.expectError != "" { + require.Error(t, err) + return + } + + require.NoError(t, err) + }) + } +} diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index 5c8856d58c1..2e4fbd51354 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -93,13 +93,21 @@ func (s *FetcherService) Fetch(ctx context.Context, url string) ([]byte, error) return nil, err } - s.lggr.Debugw("received gateway response") + if err = resp.Validate(); err != nil { + return nil, fmt.Errorf("invalid response from gateway: %w", err) + } + + s.lggr.Debugw("received gateway response", "donID", resp.Body.DonId, "msgID", resp.Body.MessageId) + var payload ghcapabilities.Response - err = json.Unmarshal(resp.Body.Payload, &payload) - if err != nil { + if err = json.Unmarshal(resp.Body.Payload, &payload); err != nil { return nil, err } + if err = payload.Validate(); err != nil { + return nil, fmt.Errorf("invalid payload received from gateway message: %w", err) + } + if payload.ExecutionError { return nil, fmt.Errorf("execution error from gateway: %s", payload.ErrorMessage) } diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go index 017b052f8ab..600cde5c577 100644 --- a/core/services/workflows/syncer/fetcher_test.go +++ b/core/services/workflows/syncer/fetcher_test.go @@ -2,6 +2,7 @@ package syncer import ( "context" + "crypto/ecdsa" "encoding/json" "strings" "testing" @@ -11,10 +12,12 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/common" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/core/utils/matches" ) @@ -33,9 +36,11 @@ func TestNewFetcherService(t *testing.T) { connector := gcmocks.NewGatewayConnector(t) wrapper := &wrapper{c: connector} - url := "http://example.com" - - msgID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, hash(url)}, "/") + var ( + url = "http://example.com" + msgID = strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, hash(url)}, "/") + donID = "don-id" + ) t.Run("OK-valid_request", func(t *testing.T) { connector.EXPECT().AddHandler([]string{capabilities.MethodWorkflowSyncer}, mock.Anything).Return(nil) @@ -44,11 +49,11 @@ func TestNewFetcherService(t *testing.T) { require.NoError(t, fetcher.Start(ctx)) defer fetcher.Close() - gatewayResp := gatewayResponse(t, msgID) + gatewayResp := signGatewayResponse(t, gatewayResponse(t, msgID, donID)) connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp) }).Return(nil).Times(1) - connector.EXPECT().DonID().Return("don-id") + connector.EXPECT().DonID().Return(donID) connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil) connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) @@ -59,13 +64,51 @@ func TestNewFetcherService(t *testing.T) { require.Equal(t, expectedPayload, payload) }) + t.Run("fails with invalid payload response", func(t *testing.T) { + connector.EXPECT().AddHandler([]string{capabilities.MethodWorkflowSyncer}, mock.Anything).Return(nil) + + fetcher := NewFetcherService(lggr, wrapper) + require.NoError(t, fetcher.Start(ctx)) + defer fetcher.Close() + + gatewayResp := signGatewayResponse(t, inconsistentPayload(t, msgID, donID)) + connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { + fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp) + }).Return(nil).Times(1) + connector.EXPECT().DonID().Return(donID) + connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil) + connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) + + _, err := fetcher.Fetch(ctx, url) + require.Error(t, err) + }) + + t.Run("fails due to invalid gateway response", func(t *testing.T) { + connector.EXPECT().AddHandler([]string{capabilities.MethodWorkflowSyncer}, mock.Anything).Return(nil) + + fetcher := NewFetcherService(lggr, wrapper) + require.NoError(t, fetcher.Start(ctx)) + defer fetcher.Close() + + gatewayResp := gatewayResponse(t, msgID, donID) // gateway response that is not signed + connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { + fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp) + }).Return(nil).Times(1) + connector.EXPECT().DonID().Return(donID) + connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil) + connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) + + _, err := fetcher.Fetch(ctx, url) + require.Error(t, err) + require.ErrorContains(t, err, "invalid response from gateway") + }) + t.Run("NOK-response_payload_too_large", func(t *testing.T) { headers := map[string]string{"Content-Type": "application/json"} responsePayload, err := json.Marshal(ghcapabilities.Response{ - StatusCode: 400, - Headers: headers, - ErrorMessage: "http: request body too large", - ExecutionError: true, + StatusCode: 400, + Headers: headers, + ErrorMessage: "http: request body too large", }) require.NoError(t, err) gatewayResponse := &api.Message{ @@ -85,7 +128,7 @@ func TestNewFetcherService(t *testing.T) { connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResponse) }).Return(nil).Times(1) - connector.EXPECT().DonID().Return("don-id") + connector.EXPECT().DonID().Return(donID) connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil) connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) @@ -94,21 +137,63 @@ func TestNewFetcherService(t *testing.T) { }) } -func gatewayResponse(t *testing.T, msgID string) *api.Message { +// gatewayResponse creates an unsigned gateway response with a status code of 200 and a response body. +func gatewayResponse(t *testing.T, msgID string, donID string) *api.Message { headers := map[string]string{"Content-Type": "application/json"} body := []byte("response body") responsePayload, err := json.Marshal(ghcapabilities.Response{ - StatusCode: 200, - Headers: headers, - Body: body, - ExecutionError: false, + StatusCode: 200, + Headers: headers, + Body: body, }) require.NoError(t, err) return &api.Message{ Body: api.MessageBody{ MessageId: msgID, + DonId: donID, Method: ghcapabilities.MethodWebAPITarget, Payload: responsePayload, }, } } + +// inconsistentPayload creates an unsigned gateway response with an inconsistent payload. The +// ExecutionError is true, but there is no ErrorMessage, so it is invalid. +func inconsistentPayload(t *testing.T, msgID string, donID string) *api.Message { + responsePayload, err := json.Marshal(ghcapabilities.Response{ + ExecutionError: true, + }) + require.NoError(t, err) + return &api.Message{ + Body: api.MessageBody{ + MessageId: msgID, + DonId: donID, + Method: ghcapabilities.MethodWebAPITarget, + Payload: responsePayload, + }, + } +} + +// signGatewayResponse signs the gateway response with a private key and arbitrarily sets the receiver +// to the signer's address. A signature and receiver are required for a valid gateway response. +func signGatewayResponse(t *testing.T, msg *api.Message) *api.Message { + nodeKeys := common.NewTestNodes(t, 1) + s := &signer{pk: nodeKeys[0].PrivateKey} + signature, err := s.Sign(api.GetRawMessageBody(&msg.Body)...) + require.NoError(t, err) + msg.Signature = utils.StringToHex(string(signature)) + + signerBytes, err := msg.ExtractSigner() + require.NoError(t, err) + + msg.Body.Receiver = utils.StringToHex(string(signerBytes)) + return msg +} + +type signer struct { + pk *ecdsa.PrivateKey +} + +func (s *signer) Sign(data ...[]byte) ([]byte, error) { + return common.SignData(s.pk, data...) +} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 6d0ee7073e9..c8dbf94846d 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -495,34 +495,35 @@ func (h *eventHandler) getWorkflowArtifacts( ctx context.Context, payload WorkflowRegistryWorkflowRegisteredV1, ) ([]byte, []byte, error) { - spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:])) - if err != nil { - binary, err2 := h.fetcher(ctx, payload.BinaryURL) - if err2 != nil { - return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err2) + // Check if the workflow spec is already stored in the database + if spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:])); err == nil { + // there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts + decodedBinary, err := hex.DecodeString(spec.Workflow) + if err != nil { + return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err) } + return decodedBinary, []byte(spec.Config), nil + } - decodedBinary, err2 := base64.StdEncoding.DecodeString(string(binary)) - if err2 != nil { - return nil, nil, fmt.Errorf("failed to decode binary: %w", err2) - } + // Fetch the binary and config files from the specified URLs. + var ( + binary, decodedBinary, config []byte + err error + ) + if binary, err = h.fetcher(ctx, payload.BinaryURL); err != nil { + return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) + } - var config []byte - if payload.ConfigURL != "" { - config, err2 = h.fetcher(ctx, payload.ConfigURL) - if err2 != nil { - return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err2) - } - } - return decodedBinary, config, nil + if decodedBinary, err = base64.StdEncoding.DecodeString(string(binary)); err != nil { + return nil, nil, fmt.Errorf("failed to decode binary: %w", err) } - // there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts - decodedBinary, err := hex.DecodeString(spec.Workflow) - if err != nil { - return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err) + if payload.ConfigURL != "" { + if config, err = h.fetcher(ctx, payload.ConfigURL); err != nil { + return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + } } - return decodedBinary, []byte(spec.Config), nil + return decodedBinary, config, nil } func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) {