Skip to content

Commit

Permalink
fix flucky cluster recover tests due to PUB/SUB buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 1, 2020
1 parent bc442d3 commit b1ac02d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 38 deletions.
36 changes: 22 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,27 @@ type subscribeContext struct {
channelContext channelContext
}

func isRecovered(historyResult HistoryResult, cmdOffset uint64, cmdEpoch string) ([]*protocol.Publication, bool) {
latestOffset := historyResult.Offset
latestEpoch := historyResult.Epoch

recoveredPubs := make([]*protocol.Publication, 0, len(historyResult.Publications))
for _, pub := range historyResult.Publications {
protoPub := pubToProto(pub)
recoveredPubs = append(recoveredPubs, protoPub)
}

nextOffset := cmdOffset + 1
var recovered bool
if len(recoveredPubs) == 0 {
recovered = latestOffset == cmdOffset && latestEpoch == cmdEpoch
} else {
recovered = recoveredPubs[0].Offset == nextOffset && latestEpoch == cmdEpoch
}

return recoveredPubs, recovered
}

// subscribeCmd handles subscribe command - clients send this when subscribe
// on channel, if channel if private then we must validate provided sign here before
// actually subscribe client on channel. Optionally we can send missed messages to
Expand Down Expand Up @@ -1736,23 +1757,10 @@ func (c *Client) subscribeCmd(cmd *protocol.SubscribeRequest, rw *replyWriter, s
ctx.disconnect = DisconnectServerError
return ctx
}

latestOffset = historyResult.Offset
latestEpoch = historyResult.Epoch

recoveredPubs = make([]*protocol.Publication, 0, len(historyResult.Publications))
for _, pub := range historyResult.Publications {
protoPub := pubToProto(pub)
recoveredPubs = append(recoveredPubs, protoPub)
}

nextOffset := cmdOffset + 1
var recovered bool
if len(recoveredPubs) == 0 {
recovered = latestOffset == cmdOffset && latestEpoch == cmd.Epoch
} else {
recovered = recoveredPubs[0].Offset == nextOffset && latestEpoch == cmd.Epoch
}
recoveredPubs, recovered = isRecovered(historyResult, cmdOffset, cmd.Epoch)
res.Recovered = recovered
incRecover(res.Recovered)
} else {
Expand Down
30 changes: 6 additions & 24 deletions engine_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ func nodeWithRedisEngine(tb testing.TB, useStreams bool, useCluster bool) *Node

func testRedisClientSubscribeRecover(t *testing.T, tt recoverTest, useStreams bool, useCluster bool) {
node := nodeWithRedisEngine(t, useStreams, useCluster)
defer node.Shutdown(context.Background())

node.config.ChannelOptionsFunc = func(channel string) (ChannelOptions, bool, error) {
return ChannelOptions{
Expand All @@ -1015,11 +1016,6 @@ func testRedisClientSubscribeRecover(t *testing.T, tt recoverTest, useStreams bo
}, true, nil
}

transport := newTestTransport()
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, _ := newClient(newCtx, node, transport)

channel := "test_recovery_redis_" + tt.Name

for i := 1; i <= tt.NumPublications; i++ {
Expand All @@ -1029,31 +1025,17 @@ func testRedisClientSubscribeRecover(t *testing.T, tt recoverTest, useStreams bo

time.Sleep(time.Duration(tt.Sleep) * time.Second)

connectClient(t, client)

var replies []*protocol.Reply
rw := testReplyWriter(&replies)

_, streamTop, err := node.broker.History(channel, HistoryFilter{
Limit: 0,
Since: nil,
})
require.NoError(t, err)

subCtx := client.subscribeCmd(&protocol.SubscribeRequest{
Channel: channel,
Recover: true,
Offset: tt.SinceOffset,
Epoch: streamTop.Epoch,
}, rw, false)
require.Nil(t, subCtx.disconnect)
require.NotEmpty(t, replies)
require.Nil(t, replies[0].Error)
res := extractSubscribeResult(replies, client.Transport().Protocol())
require.Equal(t, tt.NumRecovered, len(res.Publications))
require.Equal(t, tt.Recovered, res.Recovered)

node.Shutdown(context.Background())
historyResult, err := node.recoverHistory(channel, StreamPosition{tt.SinceOffset, streamTop.Epoch})
require.NoError(t, err)
recoveredPubs, recovered := isRecovered(historyResult, tt.SinceOffset, streamTop.Epoch)
require.Equal(t, tt.NumRecovered, len(recoveredPubs))
require.Equal(t, tt.Recovered, recovered)
}

func TestRedisClientSubscribeRecoverStreams(t *testing.T) {
Expand Down

0 comments on commit b1ac02d

Please sign in to comment.