diff --git a/broker/client/append_service.go b/broker/client/append_service.go index 642691bf..154af171 100644 --- a/broker/client/append_service.go +++ b/broker/client/append_service.go @@ -21,6 +21,12 @@ import ( // writes block until successfully committed, as opposed to handling errors // and retries themselves. // +// AppendService will retry all errors except for context cancellation and +// ErrJournalNotFound. Appends to journals which don't exist are not treated +// as hard errors, as there are scenarios where clients cannot prevent a raced +// deletion of a journal. Instead, the append is discarded and callers can +// inspect its response Status. +// // For each journal, AppendService manages an ordered list of AsyncAppends, // each having buffered content to be appended. The list is dispatched in // FIFO order by a journal-specific goroutine. @@ -278,7 +284,8 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf } // also roll back any writes queued by the caller, aborting the append // transaction. Require is valid for use only until Release is called. // Require returns itself, allowing uses like: -// Require(maybeErrors()).Release() +// +// Require(maybeErrors()).Release() func (p *AsyncAppend) Require(err error) *AsyncAppend { if err != nil && p.op.err == nil { p.op.err = err @@ -395,6 +402,10 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) { if err2 == context.Canceled || err2 == context.DeadlineExceeded { err = err2 return nil // Break retry loop. + } else if err2 == ErrJournalNotFound { + log.WithField("journal", aa.app.Request.Journal). + Warn("discarding append for journal that does not exist") + return nil // Success; break loop. } else if err2 != nil { aa.app.Reset() return err2 // Retry by returning |err2|. diff --git a/broker/client/append_service_test.go b/broker/client/append_service_test.go index e61a24c2..66d2526a 100644 --- a/broker/client/append_service_test.go +++ b/broker/client/append_service_test.go @@ -66,6 +66,17 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) { aaNext.mu.Unlock() c.Check(as.PendingExcept(""), gc.HasLen, 0) + + // Case: broker responds with JOURNAL_NOT_FOUND. + aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil) + _, _ = aa.Writer().WriteString("hello, world") + c.Assert(aa.Release(), gc.IsNil) + + readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker. + broker.AppendRespCh <- buildNotFoundFixture(broker) + + c.Check(aa.Err(), gc.IsNil) + c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND) } func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) { @@ -611,4 +622,11 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen } } +func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse { + return pb.AppendResponse{ + Status: pb.Status_JOURNAL_NOT_FOUND, + Header: *buildHeaderFixture(ep), + } +} + var _ = gc.Suite(&AppendServiceSuite{}) diff --git a/broker/client/appender.go b/broker/client/appender.go index a9bc6b12..d3c153cf 100644 --- a/broker/client/appender.go +++ b/broker/client/appender.go @@ -96,6 +96,8 @@ func (a *Appender) Close() (err error) { switch a.Response.Status { case pb.Status_OK: // Pass. + case pb.Status_JOURNAL_NOT_FOUND: + err = ErrJournalNotFound case pb.Status_NOT_JOURNAL_PRIMARY_BROKER: err = ErrNotJournalPrimaryBroker case pb.Status_WRONG_APPEND_OFFSET: diff --git a/broker/client/appender_test.go b/broker/client/appender_test.go index c5d69649..e0a990a2 100644 --- a/broker/client/appender_test.go +++ b/broker/client/appender_test.go @@ -7,9 +7,9 @@ import ( "strings" "time" - gc "gopkg.in/check.v1" pb "go.gazette.dev/core/broker/protocol" "go.gazette.dev/core/broker/teststub" + gc "gopkg.in/check.v1" ) type AppenderSuite struct{} @@ -125,6 +125,17 @@ func (s *AppenderSuite) TestBrokerCommitError(c *gc.C) { errRe: `validating broker response: Commit.Journal: invalid length .*`, cachedRoute: 0, }, + // Case: known error status (journal not found). + { + finish: func() { + broker.AppendRespCh <- pb.AppendResponse{ + Status: pb.Status_JOURNAL_NOT_FOUND, + Header: *buildHeaderFixture(broker), + } + }, + errVal: ErrJournalNotFound, + cachedRoute: 1, + }, // Case: known error status (not primary broker). { finish: func() {