Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

take two: relax ErrJournalNotFound only when writing recovered ACKs #358

Merged
merged 4 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions broker/client/append_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@ import (
// pipelined and batched to amortize the cost of broker Append RPCs. It may
// also simplify implementations for clients who would prefer to simply have
// writes block until successfully committed, as opposed to handling errors
// and retries themselves.
//
// AppendService will retry all errors except for context cancellation and
// ErrJournalNotFound. Appends to journals which don't exist are not treated
// as hard errors, as there are scenarios where clients cannot prevent a raced
// deletion of a journal. Instead, the append is discarded and callers can
// inspect its response Status.
// and retries themselves. AppendService will retry all errors except for
// context cancellation and ErrJournalNotFound.
//
// For each journal, AppendService manages an ordered list of AsyncAppends,
// each having buffered content to be appended. The list is dispatched in
Expand Down Expand Up @@ -399,13 +394,9 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) {
err2 = aa.app.Close()
}

if err2 == context.Canceled || err2 == context.DeadlineExceeded {
if err2 == context.Canceled || err2 == context.DeadlineExceeded || err2 == ErrJournalNotFound {
err = err2
return nil // Break retry loop.
} else if err2 == ErrJournalNotFound {
log.WithField("journal", aa.app.Request.Journal).
Warn("discarding append for journal that does not exist")
return nil // Success; break loop.
} else if err2 != nil {
aa.app.Reset()
return err2 // Retry by returning |err2|.
Expand Down
4 changes: 2 additions & 2 deletions broker/client/append_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) {

c.Check(as.PendingExcept(""), gc.HasLen, 0)

// Case: broker responds with JOURNAL_NOT_FOUND.
// Case: broker responds with a terminal JOURNAL_NOT_FOUND error.
aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil)
_, _ = aa.Writer().WriteString("hello, world")
c.Assert(aa.Release(), gc.IsNil)

readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker.
broker.AppendRespCh <- buildNotFoundFixture(broker)

c.Check(aa.Err(), gc.IsNil)
c.Check(aa.Err(), gc.Equals, ErrJournalNotFound)
c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND)
}

Expand Down
2 changes: 1 addition & 1 deletion broker/client/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendReques
return a.Response, nil
} else if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
// Fallthrough to retry
} else if err == ErrNotJournalPrimaryBroker {
} else if err == ErrNotJournalPrimaryBroker || err == ErrInsufficientJournalBrokers {
// Fallthrough.
} else {
return a.Response, err
Expand Down
4 changes: 2 additions & 2 deletions broker/client/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) {
{status: pb.Status_NOT_JOURNAL_PRIMARY_BROKER},
{status: pb.Status_OK},
// Case 2: Unexpected status is surfaced.
{status: pb.Status_INSUFFICIENT_JOURNAL_BROKERS},
{status: pb.Status_WRONG_APPEND_OFFSET},
// Case 3: As are errors.
{err: errors.New("an error")},
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) {

// Case 2: Unexpected status is surfaced.
_, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent)
c.Check(err, gc.ErrorMatches, "INSUFFICIENT_JOURNAL_BROKERS")
c.Check(err, gc.ErrorMatches, "WRONG_APPEND_OFFSET")

// Case 3: As are errors.
_, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent)
Expand Down
22 changes: 19 additions & 3 deletions consumer/transaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consumer

import (
"bytes"
"fmt"
"io"
"runtime/trace"
Expand Down Expand Up @@ -43,9 +44,24 @@ func runTransactions(s *shard, cp pc.Checkpoint, readCh <-chan EnvelopeOrError,
<-realTimer.C // Timer starts as idle.

// Begin by acknowledging (or re-acknowledging) messages published as part
// of the most-recent recovered transaction checkpoint.
if err := txnAcknowledge(s, &prev, cp); err != nil {
return fmt.Errorf("txnAcknowledge(recovered Checkpoint): %w", err)
// of the most-recent recovered transaction checkpoint. This is a relaxed
// form of txnAcknowledge(), as we allow recovered intents to name journals
// that don't actually exist (presumably they were deleted in the meantime).
for journal, ack := range cp.AckIntents {
var op = client.NewAsyncOperation()
prev.acks[op] = struct{}{}

go func(req pb.AppendRequest, r *bytes.Reader) (_err error) {
defer func() { op.Resolve(_err) }()

if _, err := client.Append(s.ctx, s.ajc, req, r); err == client.ErrJournalNotFound {
log.WithField("journal", req.Journal).
Warn("discarding recovered ACK-intent of non-existent journal")
} else if err != nil {
return fmt.Errorf("writing recovered ACK intent for journal %s: %w", req.Journal, err)
}
return nil
}(pb.AppendRequest{Journal: journal}, bytes.NewReader(ack))
}

for {
Expand Down
2 changes: 2 additions & 0 deletions consumer/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,8 @@ func TestRunTxnsACKsRecoveredCheckpoint(t *testing.T) {
var cp = playAndComplete(t, shard)
cp.AckIntents = map[pb.Journal][]byte{
echoOut.Name: []byte(`{"Key": "recovered fixture"}` + "\n"),
// Recovered ACK intents may included journals which do not exist.
"does/not/exist": []byte(`{"Key": "discarded fixture"}` + "\n"),
}

// Use a read channel fixture which immediately closes.
Expand Down
Loading