forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
balance_strategy.go
1141 lines (1009 loc) · 44 KB
/
balance_strategy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package sarama
import (
"container/heap"
"errors"
"fmt"
"math"
"sort"
"strings"
)
// Names reported by the built-in balance strategies through BalanceStrategy.Name,
// plus the placeholder generation value.
const (
// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
RangeBalanceStrategyName = "range"
// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
RoundRobinBalanceStrategyName = "roundrobin"
// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
StickyBalanceStrategyName = "sticky"
// defaultGeneration is not referenced in this part of the file.
// NOTE(review): presumably the generation assumed for sticky user data that
// carries no generation of its own - confirm against its use sites.
defaultGeneration = -1
)
// BalanceStrategyPlan holds the outcome of a BalanceStrategy.Plan call.
// It is an allocation of topic partitions keyed as
// `memberID -> topic -> partitions`.
type BalanceStrategyPlan map[string]map[string][]int32

// Add appends the given partitions of topic to the allocation of memberID,
// creating the member's entry on first use.
// Calling it without any partition leaves the plan untouched.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}

	byTopic, known := p[memberID]
	if !known {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
// --------------------------------------------------------------------
// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group.
type BalanceStrategy interface {
// Name uniquely identifies the strategy.
Name() string
// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
// and returns a distribution plan, or an error when no plan can be produced.
Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
// AssignmentData returns the serialized assignment data for the specified
// memberID, given the topics/partitions assigned to it and the generation ID.
// Strategies that keep no shared assignment data may return nil.
AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
// --------------------------------------------------------------------
// NewBalanceStrategyRange returns a range balance strategy,
// which is the default and assigns partitions as ranges to consumer group members.
// This follows the same logic as
// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
//
// Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):
//
//	M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
//	M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
func NewBalanceStrategyRange() BalanceStrategy {
	// assignRanges hands each member (in sorted order) one contiguous slice of
	// the topic's partitions; the first `remainder` members get one extra.
	assignRanges := func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
		base := len(partitions) / len(memberIDs)
		remainder := len(partitions) % len(memberIDs)

		sort.Strings(memberIDs)

		start := 0
		for i, memberID := range memberIDs {
			size := base
			if i < remainder {
				size++
			}
			plan.Add(memberID, topic, partitions[start:start+size]...)
			start += size
		}
	}

	return &balanceStrategy{
		name:   RangeBalanceStrategyName,
		coreFn: assignRanges,
	}
}
// BalanceStrategyRange is a package-level range strategy instance created once
// by calling NewBalanceStrategyRange.
//
// Deprecated: use NewBalanceStrategyRange to avoid data race issue
var BalanceStrategyRange = NewBalanceStrategyRange()
// NewBalanceStrategySticky returns a sticky balance strategy,
// which assigns partitions to members with an attempt to preserve earlier assignments
// while maintaining a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
//
//	M1: {T: [0, 2, 4]}
//	M2: {T: [1, 3, 5]}
//
// On reassignment with an additional consumer, you might get an assignment plan like:
//
//	M1: {T: [0, 2]}
//	M2: {T: [1, 3]}
//	M3: {T: [4, 5]}
func NewBalanceStrategySticky() BalanceStrategy {
	// Every call yields a fresh, zero-valued strategy.
	return new(stickyBalanceStrategy)
}
// BalanceStrategySticky is a package-level sticky strategy instance created once
// by calling NewBalanceStrategySticky.
//
// Deprecated: use NewBalanceStrategySticky to avoid data race issue
var BalanceStrategySticky = NewBalanceStrategySticky()
// --------------------------------------------------------------------
// balanceStrategy is a BalanceStrategy whose per-topic assignment logic is
// supplied as a function.
type balanceStrategy struct {
	// coreFn distributes the partitions of a single topic among memberIDs by
	// adding entries to plan.
	coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
	// name is the value reported by Name.
	name string
}

// Name implements BalanceStrategy.
func (s *balanceStrategy) Name() string {
	return s.name
}
// Plan implements BalanceStrategy.
// It groups the members by subscribed topic and lets coreFn distribute each
// topic's partitions among the sorted, de-duplicated subscribers of that topic.
// The returned error is always nil.
func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	// Collect the subscribers of every topic.
	subscribers := make(map[string][]string)
	for memberID, meta := range members {
		for _, topic := range meta.Topics {
			subscribers[topic] = append(subscribers[topic], memberID)
		}
	}

	// sortedUnique sorts ids in place and returns the prefix holding each
	// distinct value exactly once.
	sortedUnique := func(ids sort.StringSlice) []string {
		if ids.Len() < 2 {
			return ids
		}
		sort.Sort(ids)

		last := 0
		for next := 1; next < ids.Len(); next++ {
			if ids[next] != ids[last] {
				last++
				ids.Swap(next, last)
			}
		}
		return ids[:last+1]
	}

	// Assemble the plan topic by topic.
	plan := make(BalanceStrategyPlan, len(members))
	for topic, memberIDs := range subscribers {
		s.coreFn(plan, sortedUnique(memberIDs), topic, topics[topic])
	}
	return plan, nil
}
// AssignmentData implements BalanceStrategy.
// Simple strategies do not require any shared assignment data, so all
// arguments are ignored and both results are always nil.
func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return nil, nil
}
// stickyBalanceStrategy is the BalanceStrategy returned by NewBalanceStrategySticky.
type stickyBalanceStrategy struct {
	// movements records the partition movements made while a plan is generated;
	// Plan replaces it at the start of every call.
	movements partitionMovements
}

// Name implements BalanceStrategy.
func (s *stickyBalanceStrategy) Name() string {
	return StickyBalanceStrategyName
}
// Plan implements BalanceStrategy.
//
// It resets s.movements, rebuilds the current and previous assignments from the
// members' user data (prepopulateCurrentAssignments), drops recorded assignments
// whose partition is absent from topics, queues partitions that have no owner or
// whose owner no longer subscribes to the topic for reassignment, and then calls
// s.balance. The plan is assembled from currentAssignment afterwards; a member
// that ends up without partitions still gets an (empty) entry in the plan.
//
// An error is returned only when prepopulateCurrentAssignments fails.
//
// NOTE(review): s.movements is overwritten on every call, so concurrent Plan
// calls on one instance would write the same field - presumably the data race
// the Deprecated notes on the package-level strategy variables refer to; confirm.
func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
// track partition movements during generation of the partition assignment plan
s.movements = partitionMovements{
Movements: make(map[topicPartitionAssignment]consumerPair),
PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
}
// prepopulate the current assignment state from userdata on the consumer group members
currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
if err != nil {
return nil, err
}
// determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
isFreshAssignment := len(currentAssignment) == 0
// create a mapping of all current topic partitions and the consumers that can be assigned to them
partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
for topic, partitions := range topics {
for _, partition := range partitions {
partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
}
}
// create a mapping of all consumers to all potential topic partitions that can be assigned to them
// also, populate the mapping of partitions to potential consumers
consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
for memberID, meta := range members {
consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
for _, topicSubscription := range meta.Topics {
// only evaluate topic subscriptions that are present in the supplied topics map
if _, found := topics[topicSubscription]; found {
for _, partition := range topics[topicSubscription] {
topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
}
}
}
// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
if _, exists := currentAssignment[memberID]; !exists {
currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
}
}
// create a mapping of each partition to its current consumer, where possible
currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
// unvisitedPartitions starts as the full partition set; whatever is left after
// walking the current assignment has no owner yet
unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
for partition := range partition2AllPotentialConsumers {
unvisitedPartitions[partition] = true
}
var unassignedPartitions []topicPartitionAssignment
for memberID, partitions := range currentAssignment {
var keepPartitions []topicPartitionAssignment
for _, partition := range partitions {
// If this partition no longer exists at all, likely due to the
// topic being deleted, we remove the partition from the member.
if _, exists := partition2AllPotentialConsumers[partition]; !exists {
continue
}
delete(unvisitedPartitions, partition)
currentPartitionConsumers[partition] = memberID
// the member is still recorded as owner but no longer subscribes to the
// topic, so the partition is taken away and queued for reassignment
if !strsContains(members[memberID].Topics, partition.Topic) {
unassignedPartitions = append(unassignedPartitions, partition)
continue
}
keepPartitions = append(keepPartitions, partition)
}
currentAssignment[memberID] = keepPartitions
}
// partitions that no member currently owns need an owner as well
for unvisited := range unvisitedPartitions {
unassignedPartitions = append(unassignedPartitions, unvisited)
}
// sort the topic partitions in order of priority for reassignment
sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
// at this point we have preserved all valid topic partition to consumer assignments and removed
// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
// to consumers so that the topic partition assignments are as balanced as possible.
// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
// Assemble plan
plan := make(BalanceStrategyPlan, len(currentAssignment))
for memberID, assignments := range currentAssignment {
if len(assignments) == 0 {
// keep members without partitions visible in the plan
plan[memberID] = make(map[string][]int32)
} else {
for _, assignment := range assignments {
plan.Add(memberID, assignment.Topic, assignment.Partition)
}
}
}
return plan, nil
}
// AssignmentData implements BalanceStrategy.
// It encodes the supplied topics together with generationID as
// StickyAssignorUserDataV1 and returns the resulting bytes. memberID is not
// used.
func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	userData := &StickyAssignorUserDataV1{
		Topics:     topics,
		Generation: generationID,
	}
	return encode(userData, nil)
}
// strsContains reports whether value is one of the elements of s.
func strsContains(s []string, value string) bool {
	for i := range s {
		if s[i] == value {
			return true
		}
	}
	return false
}
// balance assigns all unassigned partitions and then moves partitions between
// consumers for maximum fairness and stickiness.
//
// currentAssignment and currentPartitionConsumer are updated in place, which is
// how the caller observes the result. Consumers that cannot take part in any
// reassignment are taken out of currentAssignment while balancing and are put
// back before the method returns. If the reassignment round does not yield a
// lower balance score than the state before it, that earlier state is restored.
func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
	// nothing was assigned before when the least-loaded consumer owns no partition
	initializing := len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0

	// assign all unassigned partitions
	for _, partition := range unassignedPartitions {
		// skip if there is no potential consumer for the partition
		if len(partition2AllPotentialConsumers[partition]) == 0 {
			continue
		}
		sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
	}

	// narrow down the reassignment scope to only those partitions that can actually be reassigned
	for partition := range partition2AllPotentialConsumers {
		if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
			sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
		}
	}

	// narrow down the reassignment scope to only those consumers that are subject to reassignment
	fixedAssignments := make(map[string][]topicPartitionAssignment)
	for memberID := range consumer2AllPotentialPartitions {
		if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
			fixedAssignments[memberID] = currentAssignment[memberID]
			delete(currentAssignment, memberID)
			sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
		}
	}

	// create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
	preBalanceAssignment := deepCopyAssignment(currentAssignment)
	preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
	for partition, memberID := range currentPartitionConsumer {
		preBalancePartitionConsumers[partition] = memberID
	}

	reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)

	// if we are not preserving existing assignments and we have made changes to the current assignment
	// make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
	if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
		// Restore the maps in place: the caller holds references to these very
		// maps, so rebinding the local variables to fresh maps would leave the
		// caller with the rejected assignment and without the fixed assignments
		// re-added below.
		for memberID := range currentAssignment {
			delete(currentAssignment, memberID)
		}
		// preBalanceAssignment is already a private deep copy and is not used
		// again, so its slices can be handed over directly.
		for memberID, partitions := range preBalanceAssignment {
			currentAssignment[memberID] = partitions
		}
		for partition := range currentPartitionConsumer {
			delete(currentPartitionConsumer, partition)
		}
		for partition, memberID := range preBalancePartitionConsumers {
			currentPartitionConsumer[partition] = memberID
		}
	}

	// add the fixed assignments (those that could not change) back
	for consumer, assignments := range fixedAssignments {
		currentAssignment[consumer] = assignments
	}
}
// NewBalanceStrategyRoundRobin returns a round-robin balance strategy,
// which assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumers (m0, m1), and each topic has three partitions (p0, p1, p2):
//
//	M0: [t0p0, t0p2, t1p1]
//	M1: [t0p1, t1p0, t1p2]
func NewBalanceStrategyRoundRobin() BalanceStrategy {
	return &roundRobinBalancer{}
}
// BalanceStrategyRoundRobin is a package-level round-robin strategy instance
// created once by calling NewBalanceStrategyRoundRobin.
//
// Deprecated: use NewBalanceStrategyRoundRobin to avoid data race issue
var BalanceStrategyRoundRobin = NewBalanceStrategyRoundRobin()
// roundRobinBalancer is the field-less BalanceStrategy returned by
// NewBalanceStrategyRoundRobin.
type roundRobinBalancer struct{}
// Name implements BalanceStrategy and returns RoundRobinBalanceStrategyName.
func (b *roundRobinBalancer) Name() string {
return RoundRobinBalanceStrategyName
}
// Plan implements BalanceStrategy.
//
// All partitions of all topics are ordered by their comparedValue string, the
// members are ordered by member ID, and the partitions are then dealt out to
// the members in turn, passing over members that are not subscribed to the
// partition's topic. A partition whose topic has no subscriber at all is left
// out of the plan.
//
// An error is returned when memberAndMetadata or topics is empty.
func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	if len(memberAndMetadata) == 0 || len(topics) == 0 {
		return nil, errors.New("members and topics are not provided")
	}

	// sort partitions
	var topicPartitions []topicAndPartition
	for topic, partitions := range topics {
		for _, partition := range partitions {
			topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
		}
	}
	sort.SliceStable(topicPartitions, func(i, j int) bool {
		pi := topicPartitions[i]
		pj := topicPartitions[j]
		return pi.comparedValue() < pj.comparedValue()
	})

	// sort members
	members := make([]memberAndTopic, 0, len(memberAndMetadata))
	for memberID, meta := range memberAndMetadata {
		m := memberAndTopic{
			memberID: memberID,
			topics:   make(map[string]struct{}, len(meta.Topics)),
		}
		for _, t := range meta.Topics {
			m.topics[t] = struct{}{}
		}
		members = append(members, m)
	}
	sort.SliceStable(members, func(i, j int) bool {
		return members[i].memberID < members[j].memberID
	})

	// assign partitions
	plan := make(BalanceStrategyPlan, len(members))
	i := 0
	n := len(members)
	for _, tp := range topicPartitions {
		// Probe every member at most once, starting at the cursor. Without this
		// bound a topic that nobody subscribes to would spin forever. After n
		// unsuccessful probes the cursor is back at the same member (i%n is
		// unchanged) and the partition is skipped.
		for probes := 0; probes < n; probes++ {
			m := members[i%n]
			i++
			if m.hasTopic(tp.topic) {
				plan.Add(m.memberID, tp.topic, tp.partition)
				break
			}
		}
	}
	return plan, nil
}
// AssignmentData implements BalanceStrategy.
// The round-robin strategy produces no assignment data: all arguments are
// ignored and both results are always nil.
func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return nil, nil // do nothing for now
}
// topicAndPartition identifies a single partition of a topic.
type topicAndPartition struct {
	topic     string
	partition int32
}

// comparedValue returns the "<topic>-<partition>" string used as the sort key
// for the partition.
func (tp *topicAndPartition) comparedValue() string {
	return tp.topic + fmt.Sprintf("-%d", tp.partition)
}
// memberAndTopic pairs a member ID with the set of topics recorded for it.
type memberAndTopic struct {
	topics   map[string]struct{}
	memberID string
}

// hasTopic reports whether topic is in the member's topic set.
func (m *memberAndTopic) hasTopic(topic string) bool {
	if _, subscribed := m.topics[topic]; subscribed {
		return true
	}
	return false
}
// getBalanceScore calculates the balance score of the given assignment: the sum,
// over all pairs of consumers, of the absolute difference between their numbers
// of assigned partitions.
// A perfectly balanced assignment (all consumers own the same number of
// partitions) scores 0; a lower score indicates a more balanced assignment.
func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
	sizes := make([]int, 0, len(assignment))
	for _, partitions := range assignment {
		sizes = append(sizes, len(partitions))
	}

	var total float64
	for i, size := range sizes {
		// pair each consumer only with the ones after it so that every pair
		// is counted exactly once
		for _, other := range sizes[i+1:] {
			total += math.Abs(float64(size - other))
		}
	}
	return int(total)
}
// isBalanced determines whether the current assignment plan is balanced.
//
// currentAssignment maps each consumer to the partitions it owns and
// allSubscriptions maps each consumer to every partition it could own.
// The assignment counts as balanced when
//   - there is no consumer at all,
//   - the smallest and largest assignment sizes differ by at most one, or
//   - no consumer that could still take more partitions is missing a partition
//     owned by a consumer with more partitions than itself.
func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
	// without consumers there is nothing to balance, and indexing the empty
	// slice below would panic
	if len(sortedCurrentSubscriptions) == 0 {
		return true
	}

	fewest := len(currentAssignment[sortedCurrentSubscriptions[0]])
	most := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
	if fewest >= most-1 {
		// if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
		return true
	}

	// create a mapping from partitions to the consumer assigned to them
	allPartitions := make(map[topicPartitionAssignment]string)
	for memberID, partitions := range currentAssignment {
		for _, partition := range partitions {
			if _, exists := allPartitions[partition]; exists {
				Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
			}
			allPartitions[partition] = memberID
		}
	}

	// for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
	// could but did not get cannot be moved to it (because that would break the balance)
	for _, memberID := range sortedCurrentSubscriptions {
		consumerPartitions := currentAssignment[memberID]
		consumerPartitionCount := len(consumerPartitions)
		potentialTopicPartitions := allSubscriptions[memberID]

		// skip if this consumer already has all the topic partitions it can get
		if consumerPartitionCount == len(potentialTopicPartitions) {
			continue
		}

		// otherwise make sure it cannot get any more
		for _, partition := range potentialTopicPartitions {
			if memberAssignmentsIncludeTopicPartition(consumerPartitions, partition) {
				continue
			}
			otherConsumer := allPartitions[partition]
			if consumerPartitionCount < len(currentAssignment[otherConsumer]) {
				return false
			}
		}
	}
	return true
}
// performReassignments reassigns all topic partitions that need reassignment
// until the assignment is balanced or a full pass over reassignablePartitions
// moves nothing.
//
// currentAssignment and currentPartitionConsumer are modified in place. For
// each partition the consumer recorded in prevAssignment is preferred as the
// new owner when the current owner has at least two more partitions than it;
// otherwise the partition moves when any potential consumer has at least two
// fewer partitions than the current owner.
//
// It reports whether at least one reassignment was attempted.
func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
	reassignmentPerformed := false

	// repeat reassignment until no partition can be moved to improve the balance
	for {
		modified := false

		// reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
		// until the full list is processed or a balance is achieved
		for _, partition := range reassignablePartitions {
			if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
				break
			}

			// the partition must have at least two consumers
			if len(partition2AllPotentialConsumers[partition]) <= 1 {
				Logger.Printf("Expected more than one potential consumer for topic %s partition %d", partition.Topic, partition.Partition)
			}

			// the partition must have a consumer
			consumer := currentPartitionConsumer[partition]
			if consumer == "" {
				Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
			}

			// prefer handing the partition back to its previous owner
			if prev, exists := prevAssignment[partition]; exists {
				if len(currentAssignment[consumer]) > len(currentAssignment[prev.MemberID])+1 {
					sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prev.MemberID)
					reassignmentPerformed = true
					modified = true
					continue
				}
			}

			// check if a better-suited consumer exists for the partition; if so, reassign it
			for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
				if len(currentAssignment[consumer]) > len(currentAssignment[otherConsumer])+1 {
					sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
					reassignmentPerformed = true
					modified = true
					break
				}
			}
		}

		if !modified {
			return reassignmentPerformed
		}
	}
}
// reassignPartitionToNewConsumer hands partition to the first consumer in
// sortedCurrentSubscriptions whose potential partitions include it and returns
// the consumer ordering produced by that reassignment. When no such consumer
// exists nothing changes and sortedCurrentSubscriptions is returned as is.
func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
	for _, candidate := range sortedCurrentSubscriptions {
		if !memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[candidate], partition) {
			continue
		}
		return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, candidate)
	}
	return sortedCurrentSubscriptions
}
// reassignPartition moves a partition from its current consumer to newConsumer.
// The partition that is actually moved is chosen by
// s.movements.getTheActualPartitionToBeMoved, which takes the stickiness
// requirement into account. The updated consumer ordering is returned.
func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
	currentOwner := currentPartitionConsumer[partition]
	return s.processPartitionMovement(
		s.movements.getTheActualPartitionToBeMoved(partition, currentOwner, newConsumer),
		newConsumer,
		currentAssignment,
		sortedCurrentSubscriptions,
		currentPartitionConsumer,
	)
}
// processPartitionMovement records the move of partition to newConsumer in
// s.movements and applies it: the partition is removed from its previous
// owner's assignment, appended to newConsumer's assignment and re-mapped in
// currentPartitionConsumer. It returns the consumers re-sorted by
// sortMemberIDsByPartitionAssignments.
func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
	previousOwner := currentPartitionConsumer[partition]
	s.movements.movePartition(partition, previousOwner, newConsumer)

	remaining := removeTopicPartitionFromMemberAssignments(currentAssignment[previousOwner], partition)
	currentAssignment[previousOwner] = remaining
	currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
	currentPartitionConsumer[partition] = newConsumer

	return sortMemberIDsByPartitionAssignments(currentAssignment)
}
// canConsumerParticipateInReassignment determines whether a specific consumer
// should be considered for topic partition reassignment.
// A consumer takes part when it owns fewer partitions than it potentially could,
// or when at least one partition it owns can itself be reassigned. Owning more
// partitions than the potential maximum is only logged.
func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
	owned := currentAssignment[memberID]
	potentialCount := len(consumer2AllPotentialPartitions[memberID])

	switch {
	case len(owned) > potentialCount:
		Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
	case len(owned) < potentialCount:
		// if a consumer is not assigned all its potential partitions it is subject to reassignment
		return true
	}

	for _, partition := range owned {
		if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
			return true
		}
	}
	return false
}
// canTopicPartitionParticipateInReassignment reports whether partition has two
// or more potential consumers; only such partitions are considered for
// reassignment.
func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
	candidates := partition2AllPotentialConsumers[partition]
	return len(candidates) > 1
}
// assignPartition gives partition to the first consumer in
// sortedCurrentSubscriptions whose potential partitions include it, recording
// the new owner in both currentAssignment and currentPartitionConsumer.
// With the consumers ordered from least to most loaded this improves the
// overall balance. The consumers are re-sorted and returned in every case,
// even when no consumer could take the partition.
func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
	for _, memberID := range sortedCurrentSubscriptions {
		if !memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
			continue
		}
		currentAssignment[memberID] = append(currentAssignment[memberID], partition)
		currentPartitionConsumer[partition] = memberID
		break
	}
	return sortMemberIDsByPartitionAssignments(currentAssignment)
}
// deserializeTopicPartitionAssignment decodes sticky assignor user data to aid
// with the creation of a sticky assignment.
// The bytes are first decoded as StickyAssignorUserDataV1; if that fails they
// are decoded as StickyAssignorUserDataV0. When both attempts fail the error of
// the V0 attempt is returned.
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
	v1 := new(StickyAssignorUserDataV1)
	if decode(userDataBytes, v1, nil) == nil {
		return v1, nil
	}

	v0 := new(StickyAssignorUserDataV0)
	if err := decode(userDataBytes, v0, nil); err != nil {
		return nil, err
	}
	return v0, nil
}
// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
// to those topic partitions that are keys of partition2AllPotentialConsumers.
// The filtering is done on a copy obtained from deepCopyAssignment; entries of
// currentAssignment are not reassigned here.
func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
	filtered := deepCopyAssignment(currentAssignment)
	for memberID, partitions := range filtered {
		// filter in place, reusing the copied slice's backing array
		kept := partitions[:0]
		for _, partition := range partitions {
			if _, known := partition2AllPotentialConsumers[partition]; known {
				kept = append(kept, partition)
			}
		}
		filtered[memberID] = kept
	}
	return filtered
}
// removeTopicPartitionFromMemberAssignments removes the first occurrence of
// topic from assignments and returns the shortened slice. The removal shifts
// the following elements within the slice's backing array. When topic is not
// present, assignments is returned unchanged.
func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
	for i := range assignments {
		if assignments[i] != topic {
			continue
		}
		return append(assignments[:i], assignments[i+1:]...)
	}
	return assignments
}
// memberAssignmentsIncludeTopicPartition reports whether topic is one of the
// entries of assignments.
func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
	for i := range assignments {
		if assignments[i] == topic {
			return true
		}
	}
	return false
}
// sortPartitions returns the topic partitions in the order in which they are to
// be considered for reassignment.
//
// For a fresh assignment, or when areSubscriptionsIdentical reports false, the
// order is whatever sortPartitionsByPotentialConsumerAssignments produces.
// Otherwise the partitions are listed by repeatedly taking one partition from
// the member at the head of an assignmentPriorityQueue (per the queue's intent:
// consumers with the most assignments first). Within a member, the first
// partition found in partitionsWithADifferentPreviousAssignment is taken, or the
// member's first partition when there is none. Partitions that no member owns
// are appended at the end.
func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
	if isFreshAssignment || !areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
		// an ascending sorted set of topic partitions based on how many consumers can potentially use them
		return sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
	}

	// Reassignment with identical subscriptions (all consumers can consume from
	// all topics): list the partitions in a round robin fashion.
	notYetListed := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
	for partition := range partition2AllPotentialConsumers {
		notYetListed[partition] = true
	}

	// use a priority queue to evaluate consumer group members based on the
	// number of topic partition assignments they still hold
	filtered := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
	pq := make(assignmentPriorityQueue, 0, len(filtered))
	for consumerID, consumerAssignments := range filtered {
		pq = append(pq, &consumerGroupMember{
			id:          consumerID,
			assignments: consumerAssignments,
		})
	}
	heap.Init(&pq)

	ordered := make([]topicPartitionAssignment, 0)

	// loop until no consumer-group members remain
	for pq.Len() != 0 {
		member := pq[0]
		if len(member.assignments) == 0 {
			heap.Pop(&pq)
			continue
		}

		// prefer a partition that was assigned to a different consumer last time
		pick := 0
		for i, partition := range member.assignments {
			if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
				pick = i
				break
			}
		}

		partition := member.assignments[pick]
		ordered = append(ordered, partition)
		delete(notYetListed, partition)

		if pick == 0 {
			member.assignments = member.assignments[1:]
		} else {
			member.assignments = append(member.assignments[:pick], member.assignments[pick+1:]...)
		}
		// the head member now holds one partition less; restore the queue order
		heap.Fix(&pq, 0)
	}

	for partition := range notYetListed {
		ordered = append(ordered, partition)
	}
	return ordered
}
// sortMemberIDsByPartitionAssignments returns the keys of assignments ordered by the number
// of partitions each member holds, fewest first. Members holding the same number of
// partitions are ordered by member ID.
func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
	memberIDs := make([]string, len(assignments))
	next := 0
	for id := range assignments {
		memberIDs[next] = id
		next++
	}
	sort.SliceStable(memberIDs, func(a, b int) bool {
		left, right := memberIDs[a], memberIDs[b]
		if nl, nr := len(assignments[left]), len(assignments[right]); nl != nr {
			return nl < nr
		}
		// equal load: fall back to the member ID
		return left < right
	})
	return memberIDs
}
// sortPartitionsByPotentialConsumerAssignments returns the keys of
// partition2AllPotentialConsumers ordered by the number of consumers listed for each
// partition, fewest first. Partitions with the same number of consumers are ordered by
// topic name and then by partition number.
func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
	ordered := make([]topicPartitionAssignment, 0, len(partition2AllPotentialConsumers))
	for tp := range partition2AllPotentialConsumers {
		ordered = append(ordered, tp)
	}
	sort.Slice(ordered, func(a, b int) bool {
		left, right := ordered[a], ordered[b]
		nl := len(partition2AllPotentialConsumers[left])
		nr := len(partition2AllPotentialConsumers[right])
		if nl != nr {
			return nl < nr
		}
		// same number of candidates: break the tie on topic, then on partition
		if byTopic := strings.Compare(left.Topic, right.Topic); byTopic != 0 {
			return byTopic < 0
		}
		return left.Partition < right.Partition
	})
	return ordered
}
// deepCopyAssignment returns a new map that holds, for every member of assignment, a copy of
// that member's partition slice, so the result can be modified without touching the input.
func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
	clone := make(map[string][]topicPartitionAssignment, len(assignment))
	for memberID := range assignment {
		original := assignment[memberID]
		// appending to a zero-length, zero-capacity prefix makes append allocate fresh
		// storage whenever there is at least one element to copy
		clone[memberID] = append(original[:0:0], original...)
	}
	return clone
}
// areSubscriptionsIdentical reports whether every partition lists the same consumers and
// every consumer lists the same partitions.
//
// Within each map, the first non-empty list encountered during iteration becomes the
// reference. Every later list must have as many entries as the reference has distinct
// entries, and must contain each reference entry the same number of times.
func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
	tallyMembers := func(ids []string) map[string]int {
		counts := make(map[string]int)
		for _, id := range ids {
			counts[id]++
		}
		return counts
	}
	referenceMembers := make(map[string]int)
	for _, consumers := range partition2AllPotentialConsumers {
		switch {
		case len(referenceMembers) == 0:
			// no reference yet: adopt this list
			referenceMembers = tallyMembers(consumers)
		case len(referenceMembers) != len(consumers):
			return false
		default:
			seen := tallyMembers(consumers)
			for id, want := range referenceMembers {
				if seen[id] != want {
					return false
				}
			}
		}
	}

	tallyPartitions := func(tps []topicPartitionAssignment) map[topicPartitionAssignment]int {
		counts := make(map[topicPartitionAssignment]int)
		for _, tp := range tps {
			counts[tp]++
		}
		return counts
	}
	referencePartitions := make(map[topicPartitionAssignment]int)
	for _, partitions := range consumer2AllPotentialPartitions {
		switch {
		case len(referencePartitions) == 0:
			// no reference yet: adopt this list
			referencePartitions = tallyPartitions(partitions)
		case len(referencePartitions) != len(partitions):
			return false
		default:
			seen := tallyPartitions(partitions)
			for tp, want := range referencePartitions {
				if seen[tp] != want {
					return false
				}
			}
		}
	}
	return true
}
// prepopulateCurrentAssignments rebuilds the group's assignment state from the user data
// reported by each member.
//
// Each member's user data is processed with its reported generation in mind: for every
// partition the member with the highest generation is treated as the current owner, and the
// member with the second-highest generation (if any) is recorded as the previous owner.
// Members that report no generation are filed under defaultGeneration.
//
// It returns the current assignment per member, the previous owner per partition, and the
// first error encountered while deserializing a member's user data.
func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
	// for each partition: generation -> member that reported the partition in that generation
	claimsByPartition := make(map[topicPartitionAssignment]map[int]string)
	for memberID, meta := range members {
		userData, err := deserializeTopicPartitionAssignment(meta.UserData)
		if err != nil {
			return nil, nil, err
		}
		for _, partition := range userData.partitions() {
			hasGeneration := userData.hasGeneration()
			generation := defaultGeneration
			if hasGeneration {
				generation = userData.generation()
			}

			claims, seen := claimsByPartition[partition]
			if !seen {
				claimsByPartition[partition] = map[int]string{generation: memberID}
				continue
			}
			if hasGeneration {
				if _, taken := claims[generation]; taken {
					// same partition is assigned to two consumers during the same rebalance.
					// log a warning and skip this record
					Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, generation)
					continue
				}
			}
			claims[generation] = memberID
		}
	}

	currentAssignment := make(map[string][]topicPartitionAssignment)
	prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
	for partition, claims := range claimsByPartition {
		// newest generation first
		generations := make([]int, 0, len(claims))
		for generation := range claims {
			generations = append(generations, generation)
		}
		sort.Sort(sort.Reverse(sort.IntSlice(generations)))

		owner := claims[generations[0]]
		currentAssignment[owner] = append(currentAssignment[owner], partition)

		if len(generations) < 2 {
			// only one generation ever reported this partition: no previous owner
			continue
		}
		prevAssignment[partition] = consumerGenerationPair{
			MemberID:   claims[generations[1]],
			Generation: generations[1],
		}
	}
	return currentAssignment, prevAssignment, nil
}
// consumerGenerationPair couples a consumer group member ID with a generation number.
// prepopulateCurrentAssignments uses it to record, per partition, the member and generation
// that rank second when that partition's reported generations are sorted in decreasing order.
type consumerGenerationPair struct {
// MemberID is the ID of the consumer group member.
MemberID string
// Generation is the generation number associated with MemberID.
Generation int
}
// consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
// The type is comparable and is used as a map key by partitionMovements.
type consumerPair struct {
// SrcMemberID is the member the partition moves away from.
SrcMemberID string
// DstMemberID is the member the partition moves to.
DstMemberID string
}
// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
// Both maps must be non-nil before a movement is recorded, because addPartitionMovementRecord
// writes to them without allocating them.
type partitionMovements struct {
// PartitionMovementsByTopic indexes the recorded movements as
// topic name -> (source, destination) pair -> set of partitions moved between that pair.
PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
// Movements maps each moved partition to its recorded (source, destination) pair.
Movements map[topicPartitionAssignment]consumerPair
}
// removeMovementRecordOfPartition forgets the movement recorded for partition and returns
// the consumer pair that was stored for it (the zero consumerPair when nothing was stored).
// Per-pair and per-topic entries of PartitionMovementsByTopic that become empty are removed
// as well.
func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
	removed := p.Movements[partition]
	delete(p.Movements, partition)

	topic := partition.Topic
	byPair := p.PartitionMovementsByTopic[topic]
	movedBetweenPair := byPair[removed]

	// delete on a nil map is a no-op, so a missing topic or pair needs no special handling
	delete(movedBetweenPair, partition)
	if len(movedBetweenPair) == 0 {
		delete(byPair, removed)
	}
	if len(byPair) == 0 {
		delete(p.PartitionMovementsByTopic, topic)
	}
	return removed
}
// addPartitionMovementRecord records that partition moved between the members named by pair.
// The record is stored in Movements and in PartitionMovementsByTopic; the nested per-topic
// and per-pair maps are created on first use.
func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
	p.Movements[partition] = pair

	topic := partition.Topic
	byPair, haveTopic := p.PartitionMovementsByTopic[topic]
	if !haveTopic {
		byPair = make(map[consumerPair]map[topicPartitionAssignment]bool)
		p.PartitionMovementsByTopic[topic] = byPair
	}

	moved, havePair := byPair[pair]
	if !havePair {
		moved = make(map[topicPartitionAssignment]bool)
		byPair[pair] = moved
	}
	moved[partition] = true
}
// movePartition records that partition is moving from oldConsumer to newConsumer.
//
// If the partition has no recorded movement, the pair (oldConsumer, newConsumer) is stored.
// If it has one, that record is replaced by a movement from the original source to
// newConsumer, unless newConsumer is the original source, in which case no record is kept.
// A mismatch between the recorded destination and oldConsumer is only logged.
func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
	if _, alreadyMoved := p.Movements[partition]; !alreadyMoved {
		p.addPartitionMovementRecord(partition, consumerPair{
			SrcMemberID: oldConsumer,
			DstMemberID: newConsumer,
		})
		return
	}

	// this partition has previously moved
	previous := p.removeMovementRecordOfPartition(partition)
	if previous.DstMemberID != oldConsumer {
		Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", previous.DstMemberID, oldConsumer)
	}
	if previous.SrcMemberID == newConsumer {
		// the partition is moving back to its previous consumer: nothing left to record
		return
	}
	p.addPartitionMovementRecord(partition, consumerPair{
		SrcMemberID: previous.SrcMemberID,
		DstMemberID: newConsumer,
	})
}
// getTheActualPartitionToBeMoved decides which partition should really be moved when
// partition is about to go from oldConsumer to newConsumer.
//
// If partition already has a recorded movement, the source of that movement is used in place
// of oldConsumer (a mismatch with the recorded destination is only logged). If a partition of
// the same topic has a recorded movement in the opposite direction, from newConsumer to that
// source, one such partition is returned. In every other case partition itself is returned.
func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
	byPair, topicTracked := p.PartitionMovementsByTopic[partition.Topic]
	if !topicTracked {
		return partition
	}

	if prior, alreadyMoved := p.Movements[partition]; alreadyMoved {
		// this partition has previously moved
		if prior.DstMemberID != oldConsumer {
			Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", prior.DstMemberID, oldConsumer)
		}
		oldConsumer = prior.SrcMemberID
	}

	candidates, found := byPair[consumerPair{
		SrcMemberID: newConsumer,
		DstMemberID: oldConsumer,
	}]
	if !found {
		return partition
	}

	// keep the last partition visited; map iteration order decides which one that is
	var actual topicPartitionAssignment
	for candidate := range candidates {
		actual = candidate
	}
	return actual
}
func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
if src == dst {
return currentPath, false
}
if len(pairs) == 0 {
return currentPath, false
}
for _, pair := range pairs {
if src == pair.SrcMemberID && dst == pair.DstMemberID {