-
-
Notifications
You must be signed in to change notification settings - Fork 197
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
kfake: add test for #885, add new APIs
Adds APIs: ``` func (c *Cluster) LeaderFor(topic string, partition int32) int32 func (c *Cluster) SetFollowers(topic string, partition int32, followers []int32) func (c *Cluster) TopicInfo(topic string) *TopicInfo func (c *Cluster) PartitionInfo(topic string, partition int32) *PartitionInfo func (c *Cluster) PartitionInfos(topic string) []*PartitionInfo ``` as well as two info structs. These APIs and structs are used in a new test to exercise an edge condition of follower fetching. Todo followup is to move the test to a different directory, but that can wait -- the "patten" established here is to write the test with a local replace directive, ensure it works, merge the fix to franz-go, then bump the dep here and actually push the test. Not the best pattern, but also one that requires about two minutes of time right now.
- Loading branch information
Showing
7 changed files
with
317 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package kfake | ||
|
||
import ( | ||
"context" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/twmb/franz-go/pkg/kerr" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
"github.com/twmb/franz-go/pkg/kmsg" | ||
) | ||
|
||
func TestIssue885(t *testing.T) { | ||
const ( | ||
testTopic = "foo" | ||
producedMessages = 5 | ||
followerLogStart = 3 | ||
) | ||
|
||
c, err := NewCluster( | ||
NumBrokers(2), | ||
SleepOutOfOrder(), | ||
SeedTopics(1, testTopic), | ||
) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer c.Close() | ||
|
||
// Flow: | ||
// | ||
// * We always say one broker is the leader -- every Metadata response | ||
// can be the same; we do not need to hijack Metadata | ||
// | ||
// * We produce 5 separate batches just to have some data | ||
// | ||
// * We hijack fetch: if to the leader, we say the other broker is the | ||
// follower. | ||
// | ||
// * We hijack fetch 2: if to follower, we say "offset out of range". | ||
// | ||
// END SETUP STAGE. | ||
// | ||
// TEST | ||
// | ||
// * We return one batch at a time from the leader. | ||
// * We expect the leader to receive 3 requests. | ||
// * On the fourth, we redirect back to the follower. | ||
// * Batch four and five are served from the follower. | ||
// * We are done. | ||
// * Any deviation is failure. | ||
// | ||
// We control the flow through the stages; any bug results in not continuing | ||
// forward (i.e. looping through the stages and never finishing). | ||
|
||
// Inline anonymous function so that we can defer and cleanup within scope. | ||
func() { | ||
cl, err := kgo.NewClient( | ||
kgo.DefaultProduceTopic(testTopic), | ||
kgo.SeedBrokers(c.ListenAddrs()...), | ||
) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer cl.Close() | ||
|
||
for i := 0; i < producedMessages; i++ { | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
err := cl.ProduceSync(ctx, kgo.StringRecord(strconv.Itoa(i))).FirstErr() | ||
cancel() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
}() | ||
|
||
var followerOOOR bool | ||
|
||
ti := c.TopicInfo(testTopic) | ||
pi := c.PartitionInfo(testTopic, 0) | ||
follower := (pi.Leader + 1) % 2 | ||
c.SetFollowers(testTopic, 0, []int32{follower}) | ||
|
||
c.ControlKey(1, func(kreq kmsg.Request) (kmsg.Response, error, bool) { | ||
c.KeepControl() | ||
|
||
req := kreq.(*kmsg.FetchRequest) | ||
if req.Version < 11 { | ||
t.Fatal("unable to run test with fetch requests < v11") | ||
} | ||
|
||
if len(req.Topics) != 1 || len(req.Topics[0].Partitions) != 1 { | ||
t.Fatalf("unexpected malformed req topics or partitions: %v", req) | ||
} | ||
|
||
// If we *do* return a batch, we want to ensure we return only | ||
// one batch. We modify the incoming request to ensure at most | ||
// one batch is returned. | ||
req.MaxBytes = 1 | ||
|
||
resp := req.ResponseKind().(*kmsg.FetchResponse) | ||
rt := kmsg.NewFetchResponseTopic() | ||
rt.Topic = testTopic | ||
rt.TopicID = ti.TopicID | ||
rp := kmsg.NewFetchResponseTopicPartition() | ||
|
||
resp.Topics = append(resp.Topics, rt) | ||
rtp := &resp.Topics[0] | ||
|
||
rtp.Partitions = append(rtp.Partitions, rp) | ||
rpp := &rtp.Partitions[0] | ||
|
||
rpp.Partition = 0 | ||
rpp.ErrorCode = 0 | ||
rpp.HighWatermark = pi.HighWatermark | ||
rpp.LastStableOffset = pi.LastStableOffset | ||
rpp.LogStartOffset = 0 | ||
|
||
if c.CurrentNode() == pi.Leader { | ||
if !followerOOOR || req.Topics[0].Partitions[0].FetchOffset >= followerLogStart { | ||
rpp.PreferredReadReplica = (pi.Leader + 1) % 2 | ||
return resp, nil, true | ||
} | ||
return nil, nil, false | ||
} | ||
|
||
if req.Topics[0].Partitions[0].FetchOffset < followerLogStart { | ||
rpp.ErrorCode = kerr.OffsetOutOfRange.Code | ||
rpp.LogStartOffset = 2 | ||
followerOOOR = true | ||
return resp, nil, true | ||
} | ||
|
||
return nil, nil, false | ||
}) | ||
|
||
cl, err := kgo.NewClient( | ||
kgo.SeedBrokers(c.ListenAddrs()...), | ||
kgo.ConsumeTopics(testTopic), | ||
kgo.Rack("foo"), | ||
kgo.DisableFetchSessions(), | ||
) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer cl.Close() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) | ||
defer cancel() | ||
|
||
for consumed := 0; consumed != producedMessages; { | ||
fs := cl.PollFetches(ctx) | ||
if errs := fs.Errors(); errs != nil { | ||
t.Errorf("consume error: %v", errs) | ||
break | ||
} | ||
consumed += fs.NumRecords() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters