-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
94 lines (77 loc) · 2.27 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"sync"
"github.com/Shopify/sarama"
"github.com/pinpoint-apm/pinpoint-go-agent"
"github.com/pinpoint-apm/pinpoint-go-agent/plugin/http"
"github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"
)
// outGoingRequest sends a GET request to http://localhost:9001/query using the
// client returned by pphttp.WrapClient(nil) and returns the response body as a
// string.
//
// ctx is attached to the outgoing request. Failures are reported in-band: if
// building the request, sending it, or reading the response body fails, the
// error's text is returned in place of the body.
func outGoingRequest(ctx context.Context) string {
	client := pphttp.WrapClient(nil)

	request, err := http.NewRequest("GET", "http://localhost:9001/query", nil)
	if err != nil {
		return err.Error()
	}
	request = request.WithContext(ctx)

	resp, err := client.Do(request)
	if err != nil {
		return err.Error()
	}
	defer resp.Body.Close()

	// A failed read must not be passed off as a (possibly truncated) body.
	ret, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err.Error()
	}
	return string(ret)
}
// processMessage is the per-message handler handed to
// ppsarama.ConsumeMessageContext. It records a "processMessage" span event on
// the tracer found in ctx, prints the message's topic, partition, offset and
// value, performs one outgoing HTTP call via outGoingRequest and prints the
// result. It always returns nil.
func processMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {
	tr := pinpoint.FromContext(ctx)

	// Open the span event now; close it when the handler returns.
	spanEvent := tr.NewSpanEvent("processMessage")
	defer spanEvent.EndSpanEvent()

	fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)

	payload := string(msg.Value)
	fmt.Println("retrieving message: ", payload)

	// The outgoing call gets a fresh background context that carries the tracer.
	outCtx := pinpoint.NewContext(context.Background(), tr)
	reply := outGoingRequest(outCtx)
	fmt.Println("outGoingRequest: ", reply)

	return nil
}
// subscribe connects to the Kafka broker at localhost:9092, starts one
// goroutine per partition of the "go-sarama-test" topic (reading from the
// newest offset) and passes every received message to processMessage through
// ppsarama.ConsumeMessageContext. It blocks until every partition goroutine
// has finished, i.e. until each partition's Messages channel is closed.
//
// The process exits via log.Fatalf if the consumer cannot be created. If the
// partition list cannot be retrieved the error is printed and subscribe
// returns. A partition that cannot be consumed is logged and skipped.
func subscribe() {
	broker := []string{"localhost:9092"}
	config := sarama.NewConfig()
	config.Version = sarama.V2_3_0_0

	consumer, err := sarama.NewConsumer(broker, config)
	if err != nil {
		log.Fatalf("Could not create consumer: %v", err)
	}
	// Release the consumer once all partition goroutines are done (or on the
	// early-return paths below, where none were started).
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Printf("Error closing consumer: %v", err)
		}
	}()

	topic := "go-sarama-test"
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Println("Error retrieving partitionList ", err)
		return
	}

	initialOffset := sarama.OffsetNewest
	ctx := ppsarama.NewContext(context.Background(), broker)

	var wg sync.WaitGroup
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(topic, partition, initialOffset)
		if err != nil {
			// Skip this partition; a failed call gives no usable consumer to range over.
			log.Printf("Could not consume partition %d of topic %q: %v", partition, topic, err)
			continue
		}

		// Add must happen before the goroutine starts so Done can never run
		// ahead of it and drive the counter negative.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
			}
		}(pc)
	}
	wg.Wait()
}
// main configures and starts the pinpoint agent for the "GoKafkaConsumer"
// application, reading additional settings from
// $HOME/tmp/pinpoint-config.yaml, then runs the Kafka subscriber. The agent is
// shut down when main returns. The process exits via log.Fatalf if the
// configuration cannot be built or the agent cannot be started.
func main() {
	opts := []pinpoint.ConfigOption{
		pinpoint.WithAppName("GoKafkaConsumer"),
		pinpoint.WithAgentId("GoKafkaConsumerAgent"),
		pinpoint.WithConfigFile(os.Getenv("HOME") + "/tmp/pinpoint-config.yaml"),
	}

	// Do not hand an unusable config to NewAgent; stop with a clear message.
	cfg, err := pinpoint.NewConfig(opts...)
	if err != nil {
		log.Fatalf("pinpoint configuration fail: %v", err)
	}

	agent, err := pinpoint.NewAgent(cfg)
	if err != nil {
		log.Fatalf("pinpoint agent start fail: %v", err)
	}
	defer agent.Shutdown()

	subscribe()
}