Skip to content

Commit

Permalink
broker/client: discard async appends of deleted journals
Browse files Browse the repository at this point in the history
There are unavoidable scenarios where a client creates an intention to
asynchronously write to a journal (as an AsyncAppend, or an
checkpointed ACK intent), but the journal is deleted before the write
can land.

Before, AppendService will loop forever awaiting the journal to come
back into existence.

After, AppendService will discard writes to deleted or non-existant
journals, logging a warning before doing so. This parallels the broker
behavior of dropping spooled fragment data upon journal deletion.
  • Loading branch information
jgraettinger committed Dec 12, 2023
1 parent 128bdd7 commit 3c45701
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
13 changes: 12 additions & 1 deletion broker/client/append_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
// writes block until successfully committed, as opposed to handling errors
// and retries themselves.
//
// AppendService will retry all errors except for context cancellation and
// ErrJournalNotFound. Appends to journals which don't exist are not treated
// as hard errors, as there are scenarios where clients cannot prevent a raced
// deletion of a journal. Instead, the append is discarded and callers can
// inspect its response Status.
//
// For each journal, AppendService manages an ordered list of AsyncAppends,
// each having buffered content to be appended. The list is dispatched in
// FIFO order by a journal-specific goroutine.
Expand Down Expand Up @@ -278,7 +284,8 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf }
// also roll back any writes queued by the caller, aborting the append
// transaction. Require is valid for use only until Release is called.
// Require returns itself, allowing uses like:
// Require(maybeErrors()).Release()
//
// Require(maybeErrors()).Release()
func (p *AsyncAppend) Require(err error) *AsyncAppend {
if err != nil && p.op.err == nil {
p.op.err = err
Expand Down Expand Up @@ -395,6 +402,10 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) {
if err2 == context.Canceled || err2 == context.DeadlineExceeded {
err = err2
return nil // Break retry loop.
} else if err2 == ErrJournalNotFound {
log.WithField("journal", aa.app.Request.Journal).
Warn("discarding append for journal that does not exist")
return nil // Success; break loop.
} else if err2 != nil {
aa.app.Reset()
return err2 // Retry by returning |err2|.
Expand Down
18 changes: 18 additions & 0 deletions broker/client/append_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) {
aaNext.mu.Unlock()

c.Check(as.PendingExcept(""), gc.HasLen, 0)

// Case: broker responds with JOURNAL_NOT_FOUND.
aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil)
_, _ = aa.Writer().WriteString("hello, world")
c.Assert(aa.Release(), gc.IsNil)

readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker.
broker.AppendRespCh <- buildNotFoundFixture(broker)

c.Check(aa.Err(), gc.IsNil)
c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND)
}

func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) {
Expand Down Expand Up @@ -611,4 +622,11 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen
}
}

func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse {
return pb.AppendResponse{
Status: pb.Status_JOURNAL_NOT_FOUND,
Header: *buildHeaderFixture(ep),
}
}

var _ = gc.Suite(&AppendServiceSuite{})
2 changes: 2 additions & 0 deletions broker/client/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func (a *Appender) Close() (err error) {
switch a.Response.Status {
case pb.Status_OK:
// Pass.
case pb.Status_JOURNAL_NOT_FOUND:
err = ErrJournalNotFound
case pb.Status_NOT_JOURNAL_PRIMARY_BROKER:
err = ErrNotJournalPrimaryBroker
case pb.Status_WRONG_APPEND_OFFSET:
Expand Down
13 changes: 12 additions & 1 deletion broker/client/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"strings"
"time"

gc "gopkg.in/check.v1"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/broker/teststub"
gc "gopkg.in/check.v1"
)

type AppenderSuite struct{}
Expand Down Expand Up @@ -125,6 +125,17 @@ func (s *AppenderSuite) TestBrokerCommitError(c *gc.C) {
errRe: `validating broker response: Commit.Journal: invalid length .*`,
cachedRoute: 0,
},
// Case: known error status (journal not found).
{
finish: func() {
broker.AppendRespCh <- pb.AppendResponse{
Status: pb.Status_JOURNAL_NOT_FOUND,
Header: *buildHeaderFixture(broker),
}
},
errVal: ErrJournalNotFound,
cachedRoute: 1,
},
// Case: known error status (not primary broker).
{
finish: func() {
Expand Down

0 comments on commit 3c45701

Please sign in to comment.