-
-
Notifications
You must be signed in to change notification settings - Fork 197
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #890 from twmb/885-test
kfake: add test for #885, add new APIs
- Loading branch information
Showing
7 changed files
with
317 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package kfake | ||
|
||
import ( | ||
"context" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/twmb/franz-go/pkg/kerr" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
"github.com/twmb/franz-go/pkg/kmsg" | ||
) | ||
|
||
func TestIssue885(t *testing.T) { | ||
const ( | ||
testTopic = "foo" | ||
producedMessages = 5 | ||
followerLogStart = 3 | ||
) | ||
|
||
c, err := NewCluster( | ||
NumBrokers(2), | ||
SleepOutOfOrder(), | ||
SeedTopics(1, testTopic), | ||
) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer c.Close() | ||
|
||
// Flow: | ||
// | ||
// * We always say one broker is the leader -- every Metadata response | ||
// can be the same; we do not need to hijack Metadata | ||
// | ||
// * We produce 5 separate batches just to have some data | ||
// | ||
// * We hijack fetch: if to the leader, we say the other broker is the | ||
// follower. | ||
// | ||
// * We hijack fetch 2: if to follower, we say "offset out of range". | ||
// | ||
// END SETUP STAGE. | ||
// | ||
// TEST | ||
// | ||
// * We return one batch at a time from the leader. | ||
// * We expect the leader to receive 3 requests. | ||
// * On the fourth, we redirect back to the follower. | ||
// * Batch four and five are served from the follower. | ||
// * We are done. | ||
// * Any deviation is failure. | ||
// | ||
// We control the flow through the stages; any bug results in not continuing | ||
// forward (i.e. looping through the stages and never finishing). | ||
|
||
// Inline anonymous function so that we can defer and cleanup within scope. | ||
func() { | ||
cl, err := kgo.NewClient( | ||
kgo.DefaultProduceTopic(testTopic), | ||
kgo.SeedBrokers(c.ListenAddrs()...), | ||
) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer cl.Close() | ||
|
||
for i := 0; i < producedMessages; i++ { | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
err := cl.ProduceSync(ctx, kgo.StringRecord(strconv.Itoa(i))).FirstErr() | ||
cancel() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
}() | ||
|
||
var followerOOOR bool | ||
|
||
ti := c.TopicInfo(testTopic) | ||
pi := c.PartitionInfo(testTopic, 0) | ||
follower := (pi.Leader + 1) % 2 | ||
c.SetFollowers(testTopic, 0, []int32{follower}) | ||
|
||
c.ControlKey(1, func(kreq kmsg.Request) (kmsg.Response, error, bool) { | ||
c.KeepControl() | ||
|
||
req := kreq.(*kmsg.FetchRequest) | ||
if req.Version < 11 { | ||
t.Fatal("unable to run test with fetch requests < v11") | ||
} | ||
|
||
if len(req.Topics) != 1 || len(req.Topics[0].Partitions) != 1 { | ||
t.Fatalf("unexpected malformed req topics or partitions: %v", req) | ||
} | ||
|
||
// If we *do* return a batch, we want to ensure we return only | ||
// one batch. We modify the incoming request to ensure at most | ||
// one batch is returned. | ||
req.MaxBytes = 1 | ||
|
||
resp := req.ResponseKind().(*kmsg.FetchResponse) | ||
rt := kmsg.NewFetchResponseTopic() | ||
rt.Topic = testTopic | ||
rt.TopicID = ti.TopicID | ||
rp := kmsg.NewFetchResponseTopicPartition() | ||
|
||
resp.Topics = append(resp.Topics, rt) | ||
rtp := &resp.Topics[0] | ||
|
||
rtp.Partitions = append(rtp.Partitions, rp) | ||
rpp := &rtp.Partitions[0] | ||
|
||
rpp.Partition = 0 | ||
rpp.ErrorCode = 0 | ||
rpp.HighWatermark = pi.HighWatermark | ||
rpp.LastStableOffset = pi.LastStableOffset | ||
rpp.LogStartOffset = 0 | ||
|
||
if c.CurrentNode() == pi.Leader { | ||
if !followerOOOR || req.Topics[0].Partitions[0].FetchOffset >= followerLogStart { | ||
rpp.PreferredReadReplica = (pi.Leader + 1) % 2 | ||
return resp, nil, true | ||
} | ||
return nil, nil, false | ||
} | ||
|
||
if req.Topics[0].Partitions[0].FetchOffset < followerLogStart { | ||
rpp.ErrorCode = kerr.OffsetOutOfRange.Code | ||
rpp.LogStartOffset = 2 | ||
followerOOOR = true | ||
return resp, nil, true | ||
} | ||
|
||
return nil, nil, false | ||
}) | ||
|
||
cl, err := kgo.NewClient( | ||
kgo.SeedBrokers(c.ListenAddrs()...), | ||
kgo.ConsumeTopics(testTopic), | ||
kgo.Rack("foo"), | ||
kgo.DisableFetchSessions(), | ||
) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer cl.Close() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) | ||
defer cancel() | ||
|
||
for consumed := 0; consumed != producedMessages; { | ||
fs := cl.PollFetches(ctx) | ||
if errs := fs.Errors(); errs != nil { | ||
t.Errorf("consume error: %v", errs) | ||
break | ||
} | ||
consumed += fs.NumRecords() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters