From 38e54407ad2b1069cbbe52f893329a3133d28c41 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 12 Dec 2023 21:17:01 +0000 Subject: [PATCH 1/4] Revert "broker/client: discard async appends of deleted journals" This reverts commit 3c45701eebd56d12bf7065147390885b84c4aaac. --- broker/client/append_service.go | 13 +------------ broker/client/append_service_test.go | 18 ------------------ broker/client/appender.go | 2 -- broker/client/appender_test.go | 13 +------------ 4 files changed, 2 insertions(+), 44 deletions(-) diff --git a/broker/client/append_service.go b/broker/client/append_service.go index 154af171..642691bf 100644 --- a/broker/client/append_service.go +++ b/broker/client/append_service.go @@ -21,12 +21,6 @@ import ( // writes block until successfully committed, as opposed to handling errors // and retries themselves. // -// AppendService will retry all errors except for context cancellation and -// ErrJournalNotFound. Appends to journals which don't exist are not treated -// as hard errors, as there are scenarios where clients cannot prevent a raced -// deletion of a journal. Instead, the append is discarded and callers can -// inspect its response Status. -// // For each journal, AppendService manages an ordered list of AsyncAppends, // each having buffered content to be appended. The list is dispatched in // FIFO order by a journal-specific goroutine. @@ -284,8 +278,7 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf } // also roll back any writes queued by the caller, aborting the append // transaction. Require is valid for use only until Release is called. // Require returns itself, allowing uses like: -// -// Require(maybeErrors()).Release() +// Require(maybeErrors()).Release() func (p *AsyncAppend) Require(err error) *AsyncAppend { if err != nil && p.op.err == nil { p.op.err = err @@ -402,10 +395,6 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) { if err2 == context.Canceled || err2 == context.DeadlineExceeded { err = err2 return nil // Break retry loop. - } else if err2 == ErrJournalNotFound { - log.WithField("journal", aa.app.Request.Journal). - Warn("discarding append for journal that does not exist") - return nil // Success; break loop. } else if err2 != nil { aa.app.Reset() return err2 // Retry by returning |err2|. diff --git a/broker/client/append_service_test.go b/broker/client/append_service_test.go index 66d2526a..e61a24c2 100644 --- a/broker/client/append_service_test.go +++ b/broker/client/append_service_test.go @@ -66,17 +66,6 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) { aaNext.mu.Unlock() c.Check(as.PendingExcept(""), gc.HasLen, 0) - - // Case: broker responds with JOURNAL_NOT_FOUND. - aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil) - _, _ = aa.Writer().WriteString("hello, world") - c.Assert(aa.Release(), gc.IsNil) - - readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker. - broker.AppendRespCh <- buildNotFoundFixture(broker) - - c.Check(aa.Err(), gc.IsNil) - c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND) } func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) { @@ -622,11 +611,4 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen } } -func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse { - return pb.AppendResponse{ - Status: pb.Status_JOURNAL_NOT_FOUND, - Header: *buildHeaderFixture(ep), - } -} - var _ = gc.Suite(&AppendServiceSuite{}) diff --git a/broker/client/appender.go b/broker/client/appender.go index d3c153cf..a9bc6b12 100644 --- a/broker/client/appender.go +++ b/broker/client/appender.go @@ -96,8 +96,6 @@ func (a *Appender) Close() (err error) { switch a.Response.Status { case pb.Status_OK: // Pass. - case pb.Status_JOURNAL_NOT_FOUND: - err = ErrJournalNotFound case pb.Status_NOT_JOURNAL_PRIMARY_BROKER: err = ErrNotJournalPrimaryBroker case pb.Status_WRONG_APPEND_OFFSET: diff --git a/broker/client/appender_test.go b/broker/client/appender_test.go index e0a990a2..c5d69649 100644 --- a/broker/client/appender_test.go +++ b/broker/client/appender_test.go @@ -7,9 +7,9 @@ import ( "strings" "time" + gc "gopkg.in/check.v1" pb "go.gazette.dev/core/broker/protocol" "go.gazette.dev/core/broker/teststub" - gc "gopkg.in/check.v1" ) type AppenderSuite struct{} @@ -125,17 +125,6 @@ func (s *AppenderSuite) TestBrokerCommitError(c *gc.C) { errRe: `validating broker response: Commit.Journal: invalid length .*`, cachedRoute: 0, }, - // Case: known error status (journal not found). - { - finish: func() { - broker.AppendRespCh <- pb.AppendResponse{ - Status: pb.Status_JOURNAL_NOT_FOUND, - Header: *buildHeaderFixture(broker), - } - }, - errVal: ErrJournalNotFound, - cachedRoute: 1, - }, // Case: known error status (not primary broker). { finish: func() { From 78bcf235a805197b09bdecc7f1de226b2dae50fc Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 12 Dec 2023 22:48:21 +0000 Subject: [PATCH 2/4] broker/client: make ErrJournalNotFound a terminal error Currently, AppendService will loop forever waiting for an appended journal to spring (back) into existence if it's not found. Instead, immediately fail the AsyncAppend and all of its dependents. This is desirable because journal deletions are often asynchronous and race deployed shards, and there's not much that can be done aside from failing and handling the error at a higher level. --- broker/client/append_service.go | 8 +++++--- broker/client/append_service_test.go | 18 ++++++++++++++++++ broker/client/appender.go | 2 ++ broker/client/appender_test.go | 13 ++++++++++++- 4 files changed, 37 insertions(+), 4 deletions(-) diff --git a/broker/client/append_service.go b/broker/client/append_service.go index 642691bf..66b03799 100644 --- a/broker/client/append_service.go +++ b/broker/client/append_service.go @@ -19,7 +19,8 @@ import ( // pipelined and batched to amortize the cost of broker Append RPCs. It may // also simplify implementations for clients who would prefer to simply have // writes block until successfully committed, as opposed to handling errors -// and retries themselves. +// and retries themselves. AppendService will retry all errors except for +// context cancellation and ErrJournalNotFound. // // For each journal, AppendService manages an ordered list of AsyncAppends, // each having buffered content to be appended. The list is dispatched in @@ -278,7 +279,8 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf } // also roll back any writes queued by the caller, aborting the append // transaction. Require is valid for use only until Release is called. // Require returns itself, allowing uses like: -// Require(maybeErrors()).Release() +// +// Require(maybeErrors()).Release() func (p *AsyncAppend) Require(err error) *AsyncAppend { if err != nil && p.op.err == nil { p.op.err = err @@ -392,7 +394,7 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) { err2 = aa.app.Close() } - if err2 == context.Canceled || err2 == context.DeadlineExceeded { + if err2 == context.Canceled || err2 == context.DeadlineExceeded || err2 == ErrJournalNotFound { err = err2 return nil // Break retry loop. } else if err2 != nil { diff --git a/broker/client/append_service_test.go b/broker/client/append_service_test.go index e61a24c2..3b1271cb 100644 --- a/broker/client/append_service_test.go +++ b/broker/client/append_service_test.go @@ -66,6 +66,17 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) { aaNext.mu.Unlock() c.Check(as.PendingExcept(""), gc.HasLen, 0) + + // Case: broker responds with a terminal JOURNAL_NOT_FOUND error. + aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil) + _, _ = aa.Writer().WriteString("hello, world") + c.Assert(aa.Release(), gc.IsNil) + + readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker. + broker.AppendRespCh <- buildNotFoundFixture(broker) + + c.Check(aa.Err(), gc.Equals, ErrJournalNotFound) + c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND) } func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) { @@ -611,4 +622,11 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen } } +func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse { + return pb.AppendResponse{ + Status: pb.Status_JOURNAL_NOT_FOUND, + Header: *buildHeaderFixture(ep), + } +} + var _ = gc.Suite(&AppendServiceSuite{}) diff --git a/broker/client/appender.go b/broker/client/appender.go index a9bc6b12..d3c153cf 100644 --- a/broker/client/appender.go +++ b/broker/client/appender.go @@ -96,6 +96,8 @@ func (a *Appender) Close() (err error) { switch a.Response.Status { case pb.Status_OK: // Pass. + case pb.Status_JOURNAL_NOT_FOUND: + err = ErrJournalNotFound case pb.Status_NOT_JOURNAL_PRIMARY_BROKER: err = ErrNotJournalPrimaryBroker case pb.Status_WRONG_APPEND_OFFSET: diff --git a/broker/client/appender_test.go b/broker/client/appender_test.go index c5d69649..e0a990a2 100644 --- a/broker/client/appender_test.go +++ b/broker/client/appender_test.go @@ -7,9 +7,9 @@ import ( "strings" "time" - gc "gopkg.in/check.v1" pb "go.gazette.dev/core/broker/protocol" "go.gazette.dev/core/broker/teststub" + gc "gopkg.in/check.v1" ) type AppenderSuite struct{} @@ -125,6 +125,17 @@ func (s *AppenderSuite) TestBrokerCommitError(c *gc.C) { errRe: `validating broker response: Commit.Journal: invalid length .*`, cachedRoute: 0, }, + // Case: known error status (journal not found). + { + finish: func() { + broker.AppendRespCh <- pb.AppendResponse{ + Status: pb.Status_JOURNAL_NOT_FOUND, + Header: *buildHeaderFixture(broker), + } + }, + errVal: ErrJournalNotFound, + cachedRoute: 1, + }, // Case: known error status (not primary broker). { finish: func() { From 5c58f6d5c80b8396af6c42236b88ac3a89cf755a Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 12 Dec 2023 23:53:09 +0000 Subject: [PATCH 3/4] broker/client: Appender retries on insufficient journal brokers This happens when a journal is new and the topology is converging. It shouldn't result in a terminal Append failure. --- broker/client/appender.go | 2 +- broker/client/appender_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/client/appender.go b/broker/client/appender.go index d3c153cf..1423e81b 100644 --- a/broker/client/appender.go +++ b/broker/client/appender.go @@ -180,7 +180,7 @@ func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendReques return a.Response, nil } else if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { // Fallthrough to retry - } else if err == ErrNotJournalPrimaryBroker { + } else if err == ErrNotJournalPrimaryBroker || err == ErrInsufficientJournalBrokers { // Fallthrough. } else { return a.Response, err diff --git a/broker/client/appender_test.go b/broker/client/appender_test.go index e0a990a2..87ef040d 100644 --- a/broker/client/appender_test.go +++ b/broker/client/appender_test.go @@ -222,7 +222,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) { {status: pb.Status_NOT_JOURNAL_PRIMARY_BROKER}, {status: pb.Status_OK}, // Case 2: Unexpected status is surfaced. - {status: pb.Status_INSUFFICIENT_JOURNAL_BROKERS}, + {status: pb.Status_WRONG_APPEND_OFFSET}, // Case 3: As are errors. {err: errors.New("an error")}, } @@ -273,7 +273,7 @@ func (s *AppenderSuite) TestAppendCases(c *gc.C) { // Case 2: Unexpected status is surfaced. _, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent) - c.Check(err, gc.ErrorMatches, "INSUFFICIENT_JOURNAL_BROKERS") + c.Check(err, gc.ErrorMatches, "WRONG_APPEND_OFFSET") // Case 3: As are errors. _, err = Append(ctx, rjc, pb.AppendRequest{Journal: "a/journal"}, con, tent) From 71f54d0ff4d5b10a025b5e22f8d874da185c0d49 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Wed, 13 Dec 2023 00:01:51 +0000 Subject: [PATCH 4/4] consumer: when writing recovered ACKs, allow for missing journals It's not uncommon for recovered ACKs to contain journals which have since been deleted. When this happens, log a warning and otherwise discard the ACK intent. Do this only for recovered intents: writen ACKs of the current session must still exist. --- consumer/transaction.go | 22 +++++++++++++++++++--- consumer/transaction_test.go | 2 ++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/consumer/transaction.go b/consumer/transaction.go index 202389e9..5d2ab8b6 100644 --- a/consumer/transaction.go +++ b/consumer/transaction.go @@ -1,6 +1,7 @@ package consumer import ( + "bytes" "fmt" "io" "runtime/trace" @@ -43,9 +44,24 @@ func runTransactions(s *shard, cp pc.Checkpoint, readCh <-chan EnvelopeOrError, <-realTimer.C // Timer starts as idle. // Begin by acknowledging (or re-acknowledging) messages published as part - // of the most-recent recovered transaction checkpoint. - if err := txnAcknowledge(s, &prev, cp); err != nil { - return fmt.Errorf("txnAcknowledge(recovered Checkpoint): %w", err) + // of the most-recent recovered transaction checkpoint. This is a relaxed + // form of txnAcknowledge(), as we allow recovered intents to name journals + // that don't actually exist (presumably they were deleted in the meantime). + for journal, ack := range cp.AckIntents { + var op = client.NewAsyncOperation() + prev.acks[op] = struct{}{} + + go func(req pb.AppendRequest, r *bytes.Reader) (_err error) { + defer func() { op.Resolve(_err) }() + + if _, err := client.Append(s.ctx, s.ajc, req, r); err == client.ErrJournalNotFound { + log.WithField("journal", req.Journal). + Warn("discarding recovered ACK-intent of non-existent journal") + } else if err != nil { + return fmt.Errorf("writing recovered ACK intent for journal %s: %w", req.Journal, err) + } + return nil + }(pb.AppendRequest{Journal: journal}, bytes.NewReader(ack)) } for { diff --git a/consumer/transaction_test.go b/consumer/transaction_test.go index be5e2556..13823abb 100644 --- a/consumer/transaction_test.go +++ b/consumer/transaction_test.go @@ -651,6 +651,8 @@ func TestRunTxnsACKsRecoveredCheckpoint(t *testing.T) { var cp = playAndComplete(t, shard) cp.AckIntents = map[pb.Journal][]byte{ echoOut.Name: []byte(`{"Key": "recovered fixture"}` + "\n"), + // Recovered ACK intents may included journals which do not exist. + "does/not/exist": []byte(`{"Key": "discarded fixture"}` + "\n"), } // Use a read channel fixture which immediately closes.