-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmetadata.go
65 lines (55 loc) · 1.71 KB
/
metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main
import (
"fmt"
"strings"
"github.com/Shopify/sarama"
"github.com/samuel/go-zookeeper/zk"
)
// PartitionInfo is a snapshot of the metadata collected for a single
// partition of a topic: who leads it, which brokers replicate it and which
// of them are currently in sync.
type PartitionInfo struct {
	// Topic is the name of the topic the partition belongs to.
	Topic string
	// Partition is the partition number within Topic.
	Partition int32
	// Leader is the ID of the broker reported as leader of the partition.
	Leader int32
	// LeaderAddress is the address of the leader broker.
	LeaderAddress string
	// Replicas holds the replica IDs reported by the Kafka client; it is nil
	// when the replica lookup failed.
	Replicas []int32
	// Isr lists the in-sync replicas separated by single spaces, as read
	// from the partition state stored in ZooKeeper; it is empty when the
	// state could not be read.
	Isr string
}
// GetPartitionInfo collects leader, replica and ISR details for each of the
// given partitions of topic.
//
// client is used to look up the replicas and the leader broker of every
// partition. c and basePath are handed to getIsr to read the in-sync replica
// list from ZooKeeper; c may be nil, in which case Isr is left empty.
//
// The result contains exactly one entry per element of partitions, in the
// same order. Lookup failures are reported on stdout and never abort the
// loop:
//   - when the leader cannot be resolved, Leader is set to -1 and
//     LeaderAddress is left empty;
//   - when the replicas cannot be resolved, Replicas is nil and no ISR
//     lookup is attempted for that partition.
func GetPartitionInfo(client sarama.Client, topic string, partitions []int32, c *zk.Conn, basePath string) []*PartitionInfo {
	var infos []*PartitionInfo
	for _, partition := range partitions {
		// Start from "leader unknown" so a failed lookup is distinguishable
		// from a real broker with ID 0.
		info := &PartitionInfo{Topic: topic, Partition: partition, Leader: -1}

		replicas, replicasErr := client.Replicas(topic, partition)
		if replicasErr != nil {
			fmt.Printf("failed to get replicas for topic=%s, partition=%d because of %v\n", topic, partition, replicasErr)
		}

		// The leader may be unavailable (e.g. during an election); never
		// dereference it without checking both the error and the pointer.
		leader, leaderErr := client.Leader(topic, partition)
		switch {
		case leaderErr != nil:
			fmt.Printf("failed to get leader for topic=%s, partition=%d because of %v\n", topic, partition, leaderErr)
		case leader != nil:
			info.Leader = leader.ID()
			info.LeaderAddress = leader.Addr()
		}

		// Only partitions with a known replica set get the ISR lookup.
		if replicasErr == nil {
			info.Replicas = replicas
			info.Isr = getIsr(c, basePath, topic, partition)
		}
		infos = append(infos, info)
	}
	return infos
}
// getIsr reads the state node of the given topic partition from ZooKeeper
// (at <basePath>/brokers/topics/<topic>/partitions/<partition>/state) and
// returns the contents of its "isr" array with the commas replaced by
// spaces, e.g. "1 4 32".
//
// It returns "" when conn is nil, when the node cannot be read, or when the
// node content does not contain a well-formed "isr" array; the latter two
// cases are also reported on stdout.
func getIsr(conn *zk.Conn, basePath string, topic string, partition int32) string {
	if conn == nil {
		return ""
	}

	path := fmt.Sprintf("%s/brokers/topics/%s/partitions/%d/state", basePath, topic, partition)
	data, _, err := conn.Get(path)
	if err != nil {
		fmt.Printf("failed to get ISR for topic=%s, partition=%d because of %v\n", topic, partition, err)
		return ""
	}

	// Expected node content:
	// {"controller_epoch":6,"leader":1,"version":1,"leader_epoch":23,"isr":[1,4,32]}
	const marker = `isr":[`
	state := string(data)

	start := strings.Index(state, marker)
	if start < 0 {
		// Without this guard the slice below would start at a bogus offset
		// or panic on short data.
		fmt.Printf("failed to get ISR for topic=%s, partition=%d because of unexpected state %q\n", topic, partition, state)
		return ""
	}
	rest := state[start+len(marker):]

	end := strings.Index(rest, "]")
	if end < 0 {
		// An unterminated array would otherwise panic on rest[:-1].
		fmt.Printf("failed to get ISR for topic=%s, partition=%d because of unexpected state %q\n", topic, partition, state)
		return ""
	}

	return strings.Replace(rest[:end], ",", " ", -1)
}