diff --git a/broker/client/append_service.go b/broker/client/append_service.go index 154af171..66b03799 100644 --- a/broker/client/append_service.go +++ b/broker/client/append_service.go @@ -19,13 +19,8 @@ import ( // pipelined and batched to amortize the cost of broker Append RPCs. It may // also simplify implementations for clients who would prefer to simply have // writes block until successfully committed, as opposed to handling errors -// and retries themselves. -// -// AppendService will retry all errors except for context cancellation and -// ErrJournalNotFound. Appends to journals which don't exist are not treated -// as hard errors, as there are scenarios where clients cannot prevent a raced -// deletion of a journal. Instead, the append is discarded and callers can -// inspect its response Status. +// and retries themselves. AppendService will retry all errors except for +// context cancellation and ErrJournalNotFound. // // For each journal, AppendService manages an ordered list of AsyncAppends, // each having buffered content to be appended. The list is dispatched in @@ -399,13 +394,9 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) { err2 = aa.app.Close() } - if err2 == context.Canceled || err2 == context.DeadlineExceeded { + if err2 == context.Canceled || err2 == context.DeadlineExceeded || err2 == ErrJournalNotFound { err = err2 return nil // Break retry loop. - } else if err2 == ErrJournalNotFound { - log.WithField("journal", aa.app.Request.Journal). - Warn("discarding append for journal that does not exist") - return nil // Success; break loop. } else if err2 != nil { aa.app.Reset() return err2 // Retry by returning |err2|. diff --git a/broker/client/append_service_test.go b/broker/client/append_service_test.go index 66d2526a..3b1271cb 100644 --- a/broker/client/append_service_test.go +++ b/broker/client/append_service_test.go @@ -67,7 +67,7 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) { c.Check(as.PendingExcept(""), gc.HasLen, 0) - // Case: broker responds with JOURNAL_NOT_FOUND. + // Case: broker responds with a terminal JOURNAL_NOT_FOUND error. aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil) _, _ = aa.Writer().WriteString("hello, world") c.Assert(aa.Release(), gc.IsNil) @@ -75,7 +75,7 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) { readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker. broker.AppendRespCh <- buildNotFoundFixture(broker) - c.Check(aa.Err(), gc.IsNil) + c.Check(aa.Err(), gc.Equals, ErrJournalNotFound) c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND) } diff --git a/broker/client/appender.go b/broker/client/appender.go index d3c153cf..1423e81b 100644 --- a/broker/client/appender.go +++ b/broker/client/appender.go @@ -180,7 +180,7 @@ func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendReques return a.Response, nil } else if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { // Fallthrough to retry - } else if err == ErrNotJournalPrimaryBroker { + } else if err == ErrNotJournalPrimaryBroker || err == ErrInsufficientJournalBrokers { // Fallthrough. } else { return a.Response, err diff --git a/broker/client/appender_test.go b/broker/client/appender_test.go index e0a990a2..87ef040d 100644 --- a/broker/client/appender_test.go +++ b/broker/client/appender_test.go @@ -222,7 +222,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) { {status: pb.Status_NOT_JOURNAL_PRIMARY_BROKER}, {status: pb.Status_OK}, // Case 2: Unexpected status is surfaced. - {status: pb.Status_INSUFFICIENT_JOURNAL_BROKERS}, + {status: pb.Status_WRONG_APPEND_OFFSET}, // Case 3: As are errors. {err: errors.New("an error")}, } @@ -273,7 +273,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) { // Case 2: Unexpected status is surfaced. _, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent) - c.Check(err, gc.ErrorMatches, "INSUFFICIENT_JOURNAL_BROKERS") + c.Check(err, gc.ErrorMatches, "WRONG_APPEND_OFFSET") // Case 3: As are errors. _, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent) diff --git a/consumer/transaction.go b/consumer/transaction.go index 202389e9..5d2ab8b6 100644 --- a/consumer/transaction.go +++ b/consumer/transaction.go @@ -1,6 +1,7 @@ package consumer import ( + "bytes" "fmt" "io" "runtime/trace" @@ -43,9 +44,24 @@ func runTransactions(s *shard, cp pc.Checkpoint, readCh <-chan EnvelopeOrError, <-realTimer.C // Timer starts as idle. // Begin by acknowledging (or re-acknowledging) messages published as part - // of the most-recent recovered transaction checkpoint. - if err := txnAcknowledge(s, &prev, cp); err != nil { - return fmt.Errorf("txnAcknowledge(recovered Checkpoint): %w", err) + // of the most-recent recovered transaction checkpoint. This is a relaxed + // form of txnAcknowledge(), as we allow recovered intents to name journals + // that don't actually exist (presumably they were deleted in the meantime). + for journal, ack := range cp.AckIntents { + var op = client.NewAsyncOperation() + prev.acks[op] = struct{}{} + + go func(req pb.AppendRequest, r *bytes.Reader) (_err error) { + defer func() { op.Resolve(_err) }() + + if _, err := client.Append(s.ctx, s.ajc, req, r); err == client.ErrJournalNotFound { + log.WithField("journal", req.Journal). + Warn("discarding recovered ACK-intent of non-existent journal") + } else if err != nil { + return fmt.Errorf("writing recovered ACK intent for journal %s: %w", req.Journal, err) + } + return nil + }(pb.AppendRequest{Journal: journal}, bytes.NewReader(ack)) } for { diff --git a/consumer/transaction_test.go b/consumer/transaction_test.go index be5e2556..13823abb 100644 --- a/consumer/transaction_test.go +++ b/consumer/transaction_test.go @@ -651,6 +651,8 @@ func TestRunTxnsACKsRecoveredCheckpoint(t *testing.T) { var cp = playAndComplete(t, shard) cp.AckIntents = map[pb.Journal][]byte{ echoOut.Name: []byte(`{"Key": "recovered fixture"}` + "\n"), + // Recovered ACK intents may included journals which do not exist. + "does/not/exist": []byte(`{"Key": "discarded fixture"}` + "\n"), } // Use a read channel fixture which immediately closes.