From 3c45701eebd56d12bf7065147390885b84c4aaac Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 12 Dec 2023 20:25:36 +0000 Subject: [PATCH] broker/client: discard async appends of deleted journals There are unavoidable scenarios where a client creates an intention to asynchronously write to a journal (as an AsyncAppend, or an checkpointed ACK intent), but the journal is deleted before the write can land. Before, AppendService will loop forever awaiting the journal to come back into existence. After, AppendService will discard writes to deleted or non-existant journals, logging a warning before doing so. This parallels the broker behavior of dropping spooled fragment data upon journal deletion. --- broker/client/append_service.go | 13 ++++++++++++- broker/client/append_service_test.go | 18 ++++++++++++++++++ broker/client/appender.go | 2 ++ broker/client/appender_test.go | 13 ++++++++++++- 4 files changed, 44 insertions(+), 2 deletions(-) diff --git a/broker/client/append_service.go b/broker/client/append_service.go index 642691bf..154af171 100644 --- a/broker/client/append_service.go +++ b/broker/client/append_service.go @@ -21,6 +21,12 @@ import ( // writes block until successfully committed, as opposed to handling errors // and retries themselves. // +// AppendService will retry all errors except for context cancellation and +// ErrJournalNotFound. Appends to journals which don't exist are not treated +// as hard errors, as there are scenarios where clients cannot prevent a raced +// deletion of a journal. Instead, the append is discarded and callers can +// inspect its response Status. +// // For each journal, AppendService manages an ordered list of AsyncAppends, // each having buffered content to be appended. The list is dispatched in // FIFO order by a journal-specific goroutine. @@ -278,7 +284,8 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf } // also roll back any writes queued by the caller, aborting the append // transaction. Require is valid for use only until Release is called. // Require returns itself, allowing uses like: -// Require(maybeErrors()).Release() +// +// Require(maybeErrors()).Release() func (p *AsyncAppend) Require(err error) *AsyncAppend { if err != nil && p.op.err == nil { p.op.err = err @@ -395,6 +402,10 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) { if err2 == context.Canceled || err2 == context.DeadlineExceeded { err = err2 return nil // Break retry loop. + } else if err2 == ErrJournalNotFound { + log.WithField("journal", aa.app.Request.Journal). + Warn("discarding append for journal that does not exist") + return nil // Success; break loop. } else if err2 != nil { aa.app.Reset() return err2 // Retry by returning |err2|. diff --git a/broker/client/append_service_test.go b/broker/client/append_service_test.go index e61a24c2..66d2526a 100644 --- a/broker/client/append_service_test.go +++ b/broker/client/append_service_test.go @@ -66,6 +66,17 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) { aaNext.mu.Unlock() c.Check(as.PendingExcept(""), gc.HasLen, 0) + + // Case: broker responds with JOURNAL_NOT_FOUND. + aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil) + _, _ = aa.Writer().WriteString("hello, world") + c.Assert(aa.Release(), gc.IsNil) + + readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker. + broker.AppendRespCh <- buildNotFoundFixture(broker) + + c.Check(aa.Err(), gc.IsNil) + c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND) } func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) { @@ -611,4 +622,11 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen } } +func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse { + return pb.AppendResponse{ + Status: pb.Status_JOURNAL_NOT_FOUND, + Header: *buildHeaderFixture(ep), + } +} + var _ = gc.Suite(&AppendServiceSuite{}) diff --git a/broker/client/appender.go b/broker/client/appender.go index a9bc6b12..d3c153cf 100644 --- a/broker/client/appender.go +++ b/broker/client/appender.go @@ -96,6 +96,8 @@ func (a *Appender) Close() (err error) { switch a.Response.Status { case pb.Status_OK: // Pass. + case pb.Status_JOURNAL_NOT_FOUND: + err = ErrJournalNotFound case pb.Status_NOT_JOURNAL_PRIMARY_BROKER: err = ErrNotJournalPrimaryBroker case pb.Status_WRONG_APPEND_OFFSET: diff --git a/broker/client/appender_test.go b/broker/client/appender_test.go index c5d69649..e0a990a2 100644 --- a/broker/client/appender_test.go +++ b/broker/client/appender_test.go @@ -7,9 +7,9 @@ import ( "strings" "time" - gc "gopkg.in/check.v1" pb "go.gazette.dev/core/broker/protocol" "go.gazette.dev/core/broker/teststub" + gc "gopkg.in/check.v1" ) type AppenderSuite struct{} @@ -125,6 +125,17 @@ func (s *AppenderSuite) TestBrokerCommitError(c *gc.C) { errRe: `validating broker response: Commit.Journal: invalid length .*`, cachedRoute: 0, }, + // Case: known error status (journal not found). + { + finish: func() { + broker.AppendRespCh <- pb.AppendResponse{ + Status: pb.Status_JOURNAL_NOT_FOUND, + Header: *buildHeaderFixture(broker), + } + }, + errVal: ErrJournalNotFound, + cachedRoute: 1, + }, // Case: known error status (not primary broker). { finish: func() {