diff --git a/common/nexus/failure.go b/common/nexus/failure.go index fde45b31a75..2bda7dbae36 100644 --- a/common/nexus/failure.go +++ b/common/nexus/failure.go @@ -181,7 +181,7 @@ func ConvertGRPCError(err error, exposeDetails bool) error { if !exposeDetails { errMessage = "request timeout" } - return nexus.HandlerErrorf(nexus.HandlerErrorTypeDownstreamTimeout, errMessage) + return nexus.HandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, errMessage) case codes.OK: return nil } @@ -219,8 +219,8 @@ func HandlerErrorTypeFromHTTPStatus(statusCode int) nexus.HandlerErrorType { return nexus.HandlerErrorTypeNotImplemented case http.StatusServiceUnavailable: return nexus.HandlerErrorTypeUnavailable - case nexus.StatusDownstreamTimeout: - return nexus.HandlerErrorTypeDownstreamTimeout + case nexus.StatusUpstreamTimeout: + return nexus.HandlerErrorTypeUpstreamTimeout default: return nexus.HandlerErrorTypeInternal } diff --git a/components/callbacks/executors_test.go b/components/callbacks/executors_test.go index 9fadef38fba..74909d21ccb 100644 --- a/components/callbacks/executors_test.go +++ b/components/callbacks/executors_test.go @@ -443,7 +443,7 @@ func TestProcessBackoffTask(t *testing.T) { } func newMutableState(t *testing.T) mutableState { - completionNexus, err := nexus.NewOperationCompletionSuccessful(nil, nexus.OperationCompletionSuccesfulOptions{}) + completionNexus, err := nexus.NewOperationCompletionSuccessful(nil, nexus.OperationCompletionSuccessfulOptions{}) require.NoError(t, err) hsmCallbackArg := &persistencespb.HSMCompletionCallbackArg{ NamespaceId: "mynsid", diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index 71c55819b5e..00a97cff3af 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -302,11 +302,11 @@ func TestProcessInvocationTask(t *testing.T) { onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal server error") }, - expectedMetricOutcome: "request-error:500", + expectedMetricOutcome: "unknown-error", checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo()) - require.Equal(t, "unexpected response status: \"500 Internal Server Error\": internal server error", op.LastAttemptFailure.Message) + require.Equal(t, "handler error (INTERNAL): internal server error", op.LastAttemptFailure.Message) require.Equal(t, 0, len(events)) }, }, @@ -574,15 +574,15 @@ func TestProcessCancelationTask(t *testing.T) { { name: "failure", requestTimeout: time.Hour, - destinationDown: false, + destinationDown: true, onCancelOperation: func(ctx context.Context, service, operation, operationID string, options nexus.CancelOperationOptions) error { return nexus.HandlerErrorf(nexus.HandlerErrorTypeNotFound, "operation not found") }, - expectedMetricOutcome: "request-error:404", + expectedMetricOutcome: "unknown-error", checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { - require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, c.State()) + require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, c.State()) require.NotNil(t, c.LastAttemptFailure.GetApplicationFailureInfo()) - require.Equal(t, "unexpected response status: \"404 Not Found\": operation not found", c.LastAttemptFailure.Message) + require.Equal(t, "handler error (NOT_FOUND): operation not found", c.LastAttemptFailure.Message) }, }, { @@ -605,11 +605,11 @@ func TestProcessCancelationTask(t *testing.T) { onCancelOperation: func(ctx context.Context, service, operation, operationID string, options nexus.CancelOperationOptions) error { return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal server error") }, - expectedMetricOutcome: "request-error:500", + expectedMetricOutcome: "unknown-error", checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, c.State()) require.NotNil(t, c.LastAttemptFailure.GetApplicationFailureInfo()) - require.Equal(t, "unexpected response status: \"500 Internal Server Error\": internal server error", c.LastAttemptFailure.Message) + require.Equal(t, "handler error (INTERNAL): internal server error", c.LastAttemptFailure.Message) }, }, { diff --git a/go.mod b/go.mod index 416ccd5402c..c030e1b99ff 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/jmoiron/sqlx v1.3.4 github.com/lib/pq v1.10.9 github.com/mitchellh/mapstructure v1.5.0 - github.com/nexus-rpc/sdk-go v0.0.10 + github.com/nexus-rpc/sdk-go v0.0.11 github.com/olekukonko/tablewriter v0.0.5 github.com/olivere/elastic/v7 v7.0.32 github.com/pborman/uuid v1.2.1 @@ -56,7 +56,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.temporal.io/api v1.39.1-0.20241009135124-c02089051a35 - go.temporal.io/sdk v1.29.2-0.20241008230001-c82a8ac11cc6 + go.temporal.io/sdk v1.29.2-0.20241024205028-5505d0467f79 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.11.0 go.uber.org/automaxprocs v1.5.3 diff --git a/go.sum b/go.sum index 9baab7dd590..e0f04a64ad2 100644 --- a/go.sum +++ b/go.sum @@ -201,8 +201,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/nexus-rpc/sdk-go v0.0.10 h1:7jEPUlsghxoD4OJ2H8YbFJ1t4wbxsUef7yZgBfyY3uA= -github.com/nexus-rpc/sdk-go v0.0.10/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ= +github.com/nexus-rpc/sdk-go v0.0.11 h1:qH3Us3spfp50t5ca775V1va2eE6z1zMQDZY4mvbw0CI= +github.com/nexus-rpc/sdk-go v0.0.11/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= @@ -323,8 +323,8 @@ go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IO go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A= go.temporal.io/api v1.39.1-0.20241009135124-c02089051a35 h1:yw1owD51GPY2mE3+9Y4HITNtvtncQ34uRRv3g2wJ+8I= go.temporal.io/api v1.39.1-0.20241009135124-c02089051a35/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= -go.temporal.io/sdk v1.29.2-0.20241008230001-c82a8ac11cc6 h1:SlZapy1jTeSDEL/0WxKyp2Jw7i/GxdFtqVm1YeQ2tzY= -go.temporal.io/sdk v1.29.2-0.20241008230001-c82a8ac11cc6/go.mod h1:R52PRhHZMkHJqrWRPBom0UAqcexPUvDpcf0qbAGwLos= +go.temporal.io/sdk v1.29.2-0.20241024205028-5505d0467f79 h1:Ix3EZD94Uu5uQFOI6iGzvsXpwAGKD9u2l7MWMnV811I= +go.temporal.io/sdk v1.29.2-0.20241024205028-5505d0467f79/go.mod h1:wLK9oFZTW9hUaktDPKjaNY9bspVzgz10tChQzWbElYw= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/service/frontend/nexus_handler.go b/service/frontend/nexus_handler.go index 68f6d5f51fa..832110fa975 100644 --- a/service/frontend/nexus_handler.go +++ b/service/frontend/nexus_handler.go @@ -369,7 +369,7 @@ func (h *nexusHandler) StartOperation( if err != nil { if common.IsContextDeadlineExceededErr(err) { oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_timeout")) - return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeDownstreamTimeout, "downstream timeout") + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") } return nil, commonnexus.ConvertGRPCError(err, false) } @@ -490,7 +490,7 @@ func (h *nexusHandler) CancelOperation(ctx context.Context, service, operation, if err != nil { if common.IsContextDeadlineExceededErr(err) { oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("handler_timeout")) - return nexus.HandlerErrorf(nexus.HandlerErrorTypeDownstreamTimeout, "downstream timeout") + return nexus.HandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") } return commonnexus.ConvertGRPCError(err, false) } @@ -577,7 +577,7 @@ func (h *nexusHandler) convertOutcomeToNexusHandlerError(resp *matchingservice.D } switch handlerError.Type { - case nexus.HandlerErrorTypeDownstreamTimeout, + case nexus.HandlerErrorTypeUpstreamTimeout, nexus.HandlerErrorTypeUnauthenticated, nexus.HandlerErrorTypeUnauthorized, nexus.HandlerErrorTypeBadRequest, diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index e98fca8487c..3a61e22ccc9 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -586,7 +586,7 @@ func (ms *MutableStateImpl) GetNexusCompletion(ctx context.Context) (nexus.Opera // Nexus does not support it. p = payloads[0] } - completion, err := nexus.NewOperationCompletionSuccessful(p, nexus.OperationCompletionSuccesfulOptions{ + completion, err := nexus.NewOperationCompletionSuccessful(p, nexus.OperationCompletionSuccessfulOptions{ Serializer: commonnexus.PayloadSerializer, }) if err != nil { diff --git a/tests/nexus_api_test.go b/tests/nexus_api_test.go index a91b1b6b72c..118fe1876ae 100644 --- a/tests/nexus_api_test.go +++ b/tests/nexus_api_test.go @@ -212,8 +212,8 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes() { assertion: func(t *testing.T, res *nexus.ClientStartOperationResult[string], err error) { var unexpectedError *nexus.UnexpectedResponseError require.ErrorAs(t, err, &unexpectedError) - require.Equal(t, nexus.StatusDownstreamTimeout, unexpectedError.Response.StatusCode) - require.Equal(t, "downstream timeout", unexpectedError.Failure.Message) + require.Equal(t, nexus.StatusUpstreamTimeout, unexpectedError.Response.StatusCode) + require.Equal(t, "upstream timeout", unexpectedError.Failure.Message) }, }, } @@ -654,8 +654,8 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes() { assertion: func(t *testing.T, err error) { var unexpectedError *nexus.UnexpectedResponseError require.ErrorAs(t, err, &unexpectedError) - require.Equal(t, nexus.StatusDownstreamTimeout, unexpectedError.Response.StatusCode) - require.Equal(t, "downstream timeout", unexpectedError.Failure.Message) + require.Equal(t, nexus.StatusUpstreamTimeout, unexpectedError.Response.StatusCode) + require.Equal(t, "upstream timeout", unexpectedError.Failure.Message) }, }, } diff --git a/tests/nexus_workflow_test.go b/tests/nexus_workflow_test.go index 51bb74ac6dd..0d2474efaf1 100644 --- a/tests/nexus_workflow_test.go +++ b/tests/nexus_workflow_test.go @@ -564,7 +564,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion() { // Use -10 to avoid hitting MaxNexusAPIRequestBodyBytes. Actual payload will still exceed limit because of // additional Content headers. See common/rpc/grpc.go:66 s.mustToPayload(strings.Repeat("a", (2*1024*1024)-10)), - nexus.OperationCompletionSuccesfulOptions{Serializer: commonnexus.PayloadSerializer}, + nexus.OperationCompletionSuccessfulOptions{Serializer: commonnexus.PayloadSerializer}, ) s.NoError(err) res, snap := s.sendNexusCompletionRequest(ctx, s.T(), publicCallbackUrl, largeCompletion, callbackToken) @@ -572,7 +572,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion() { s.Equal(1, len(snap["nexus_completion_requests"])) s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": s.Namespace(), "outcome": "error_bad_request"}) - completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload(nil), nexus.OperationCompletionSuccesfulOptions{ + completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload(nil), nexus.OperationCompletionSuccessfulOptions{ Serializer: commonnexus.PayloadSerializer, }) s.NoError(err) @@ -621,7 +621,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion() { s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": s.Namespace(), "outcome": "error_not_found"}) // Send a valid - successful completion request. - completion, err = nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccesfulOptions{ + completion, err = nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccessfulOptions{ Serializer: commonnexus.PayloadSerializer, }) s.NoError(err) @@ -876,7 +876,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncFailure() { func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors() { ctx := testcore.NewContext() - completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccesfulOptions{ + completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccessfulOptions{ Serializer: commonnexus.PayloadSerializer, }) s.NoError(err) @@ -939,7 +939,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAuthErrors() { s.GetTestCluster().Host().SetOnAuthorize(onAuthorize) defer s.GetTestCluster().Host().SetOnAuthorize(nil) - completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccesfulOptions{ + completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccessfulOptions{ Serializer: commonnexus.PayloadSerializer, }) s.NoError(err) @@ -1377,7 +1377,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAfterReset() { } } s.True(seenStartedEvent) - completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccesfulOptions{ + completion, err := nexus.NewOperationCompletionSuccessful(s.mustToPayload("result"), nexus.OperationCompletionSuccessfulOptions{ Serializer: commonnexus.PayloadSerializer, }) s.NoError(err)