Skip to content

Commit

Permalink
broker/consumer: add CreateRevision to ListResponse
Browse files Browse the repository at this point in the history
CreateRevision is the revision at which the JournalSpec or ShardSpec was
created within Etcd, and pairs with the ModRevision which is already
surfaced in the API response.
  • Loading branch information
jgraettinger committed Jan 30, 2024
1 parent e2cace3 commit 58d1084
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 289 deletions.
3 changes: 2 additions & 1 deletion broker/list_apply_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"

log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/allocator"
pb "go.gazette.dev/core/broker/protocol"
pbx "go.gazette.dev/core/broker/protocol/ext"
Expand Down Expand Up @@ -61,6 +61,7 @@ func (svc *Service) List(ctx context.Context, req *pb.ListRequest) (resp *pb.Lis
continue
}
journal.ModRevision = s.Items[cur.Left].Raw.ModRevision
journal.CreateRevision = s.Items[cur.Left].Raw.CreateRevision
pbx.Init(&journal.Route, s.Assignments[cur.RightBegin:cur.RightEnd])
pbx.AttachEndpoints(&journal.Route, s.KS)

Expand Down
5 changes: 5 additions & 0 deletions broker/list_apply_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestListCases(t *testing.T) {
}

var broker = newTestBroker(t, etcd, pb.ProcessSpec_ID{Zone: "local", Suffix: "broker"})
var fixtureRevision int64
{
var resp, err = broker.client().Apply(ctx, &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
Expand All @@ -50,6 +51,7 @@ func TestListCases(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, pb.Status_OK, resp.Status)
fixtureRevision = resp.Header.Etcd.Revision
}

// Assign |specC| to |bk|, to verify its returned non-empty Route.
Expand All @@ -72,6 +74,9 @@ func TestListCases(t *testing.T) {
for i, exp := range expect {
require.Equal(t, *exp, resp.Journals[i].Spec)

require.Equal(t, fixtureRevision, resp.Journals[i].CreateRevision)
require.Equal(t, fixtureRevision, resp.Journals[i].ModRevision)

if exp == specC {
require.Equal(t, []pb.ProcessSpec_ID{{Zone: "local", Suffix: "broker"}},
resp.Journals[i].Route.Members)
Expand Down
360 changes: 195 additions & 165 deletions broker/protocol/protocol.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion broker/protocol/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -584,10 +584,12 @@ message ListResponse {
// Journals of the response.
message Journal {
JournalSpec spec = 1 [ (gogoproto.nullable) = false ];
// Current ModRevision of the JournalSpec.
// Current Etcd ModRevision of the JournalSpec.
int64 mod_revision = 2;
// Route of the journal, including endpoints.
Route route = 3 [ (gogoproto.nullable) = false ];
// Etcd CreateRevision of the JournalSpec.
int64 create_revision = 4;
}
repeated Journal journals = 3 [ (gogoproto.nullable) = false ];
}
Expand Down
272 changes: 151 additions & 121 deletions consumer/protocol/protocol.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion consumer/protocol/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,14 @@ message ListResponse {
// Shards of the response.
message Shard {
ShardSpec spec = 1 [ (gogoproto.nullable) = false ];
// Current ModRevision of the ShardSpec.
// Current Etcd ModRevision of the ShardSpec.
int64 mod_revision = 2;
// Route of the shard, including endpoints.
protocol.Route route = 3 [ (gogoproto.nullable) = false ];
// Status of each replica. Cardinality and ordering matches |route|.
repeated ReplicaStatus status = 4 [ (gogoproto.nullable) = false ];
// Etcd CreateRevision of the JournalSpec.
int64 create_revision = 5;
}
repeated Shard shards = 3 [ (gogoproto.nullable) = false ];
// Optional extension of the ListResponse.
Expand Down
1 change: 1 addition & 0 deletions consumer/shard_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func ShardList(ctx context.Context, srv *Service, req *pc.ListRequest) (*pc.List
continue
}
shard.ModRevision = s.Items[cur.Left].Raw.ModRevision
shard.CreateRevision = s.Items[cur.Left].Raw.CreateRevision
pbx.Init(&shard.Route, s.Assignments[cur.RightBegin:cur.RightEnd])
pbx.AttachEndpoints(&shard.Route, s.KS)

Expand Down
3 changes: 3 additions & 0 deletions consumer/shard_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func TestAPIListCases(t *testing.T) {
for i, exp := range expect {
require.Equal(t, *exp, resp.Shards[i].Spec)

require.NotZero(t, resp.Shards[i].ModRevision)
require.NotZero(t, resp.Shards[i].CreateRevision)

var numAsn int
if exp == specB {
numAsn = 1
Expand Down

0 comments on commit 58d1084

Please sign in to comment.