-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathsangrenel.go
450 lines (387 loc) · 13.1 KB
/
sangrenel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/Shopify/sarama"
"github.com/jamiealquiza/tachymeter"
)
type config struct {
brokers []string
topic string
msgSize int
msgRate uint64
batchSize int
compression sarama.CompressionCodec
compressionName string
requiredAcks sarama.RequiredAcks
requiredAcksName string
workers int
writersPerWorker int
noop bool
interval int
kafkaVersion sarama.KafkaVersion
kafkaVersionString string
tls bool
tlsCaCertificate string
tlsCertificate string
tlsPrivateKey string
tlsInsecureSkipVerify bool
}
var (
Config = &config{}
// Character selection for random messages.
chars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$^&*(){}][:<>.")
// Counters / misc.
sentCnt uint64
errCnt uint64
validVersions = []string{
"0.8.2.0",
"0.8.2.1",
"0.8.2.2",
"0.9.0.0",
"0.9.0.1",
"0.10.0.0",
"0.10.0.1",
"0.10.1.0",
"0.10.1.1",
"0.10.2.0",
"0.10.2.1",
"0.10.2.2",
"0.11.0.0",
"0.11.0.1",
"0.11.0.2",
"1.0.0",
"1.0.1",
"1.0.2",
"1.1.0",
"1.1.1",
"2.0.0",
"2.0.1",
"2.1.0",
"2.1.1",
"2.2.0",
"2.2.1",
"2.2.2",
"2.3.0",
"2.3.1",
"2.4.0",
"2.4.1",
"2.5.0",
"2.5.1",
"2.6.0",
"2.6.1",
"2.6.2",
"2.7.0",
"2.7.1",
"2.8.0",
"2.8.1",
"3.0.0",
"3.1.0"}
)
func init() {
flag.StringVar(&Config.topic, "topic", "sangrenel", "Kafka topic to produce to")
flag.IntVar(&Config.msgSize, "message-size", 300, "Message size (bytes)")
flag.Uint64Var(&Config.msgRate, "produce-rate", 100000000, "Global write rate limit (messages/sec)")
flag.IntVar(&Config.batchSize, "message-batch-size", 500, "Messages per batch")
flag.StringVar(&Config.compressionName, "compression", "none", "Message compression: none, gzip, snappy")
flag.StringVar(&Config.requiredAcksName, "required-acks", "local", "RequiredAcks config: none, local, all")
flag.BoolVar(&Config.noop, "noop", false, "Test message generation performance (does not connect to Kafka)")
flag.IntVar(&Config.workers, "workers", 1, "Number of workers")
flag.IntVar(&Config.writersPerWorker, "writers-per-worker", 5, "Number of writer (Kafka producer) goroutines per worker")
brokerString := flag.String("brokers", "localhost:9092", "Comma delimited list of Kafka brokers")
flag.IntVar(&Config.interval, "interval", 5, "Statistics output interval (seconds)")
flag.StringVar(&Config.kafkaVersionString, "api-version", "", "Explicit sarama.Version string")
flag.BoolVar(&Config.tls, "tls", false, "Whether to enable TLS communcation")
flag.StringVar(&Config.tlsCaCertificate, "tls-ca-cert", "", "Path to the CA SSL certificate")
flag.StringVar(&Config.tlsCertificate, "tls-cert-file", "", "Path to the certificate file")
flag.StringVar(&Config.tlsPrivateKey, "tls-key-file", "", "Path to the private key file")
flag.BoolVar(&Config.tlsInsecureSkipVerify, "tls-insecure-skip-verify", false, "TLS insecure skip verify")
flag.Parse()
Config.brokers = strings.Split(*brokerString, ",")
switch Config.compressionName {
case "gzip":
Config.compression = sarama.CompressionGZIP
case "snappy":
Config.compression = sarama.CompressionSnappy
case "none":
Config.compression = sarama.CompressionNone
default:
fmt.Printf("Invalid compression option: %s\n", Config.compressionName)
os.Exit(1)
}
switch Config.requiredAcksName {
case "none":
Config.requiredAcks = sarama.NoResponse
case "local":
Config.requiredAcks = sarama.WaitForLocal
case "all":
Config.requiredAcks = sarama.WaitForAll
default:
fmt.Printf("Invalid required-acks option: %s\n", Config.requiredAcksName)
os.Exit(1)
}
}
func parseKafkaVersion(kafkaVersion string) sarama.KafkaVersion {
version, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
fmt.Printf("Invalid API version option: %s\n", Config.kafkaVersionString)
fmt.Printf("Options: %+q\n", validVersions)
os.Exit(1)
}
return version
}
func main() {
if graphiteIp != "" {
go graphiteWriter()
}
version := Config.kafkaVersionString
// Print Sangrenel startup info.
fmt.Printf("\nStarting %d client workers, %d writers per worker\n", Config.workers, Config.writersPerWorker)
fmt.Printf("Message size %d bytes, %d message limit per batch\n", Config.msgSize, Config.batchSize)
fmt.Printf("API Version: %s, Compression: %s, RequiredAcks: %s\n",
version, Config.compressionName, Config.requiredAcksName)
t := tachymeter.New(&tachymeter.Config{Size: 300000, Safe: true})
// Start client workers.
for i := 0; i < Config.workers; i++ {
go worker(i+1, t)
}
var currSentCnt, lastSentCnt uint64
var currErrCnt, lastErrCnt uint64
interval := time.Duration(Config.interval) * time.Second
ticker := time.Tick(interval)
start := time.Now()
for {
<-ticker
intervalTime := time.Since(start).Seconds()
// Set tachymeter wall time.
t.SetWallTime(time.Since(start))
// Get the sent count from the last interval, then the delta
// (sentSinceLastInterval) between the current and last interval.
lastSentCnt = currSentCnt
currSentCnt = atomic.LoadUint64(&sentCnt)
sentSinceLastInterval := currSentCnt - lastSentCnt
outputBytes, outputString := calcOutput(intervalTime, sentSinceLastInterval)
// Update error counters.
lastErrCnt = currErrCnt
currErrCnt = atomic.LoadUint64(&errCnt)
errSinceLastInterval := currErrCnt - lastErrCnt
errRate := (float64(errSinceLastInterval) / float64(sentSinceLastInterval)) * 100
// Summarize tachymeter data.
stats := t.Calc()
// Update the metrics map for the Graphite writer.
metrics["rate"] = float64(sentSinceLastInterval) / intervalTime
metrics["error_rate"] = errRate
metrics["output"] = outputBytes
metrics["p99"] = (float64(stats.Time.P99.Nanoseconds()) / 1000) / 1000
metrics["timestamp"] = float64(time.Now().Unix())
// Ship metrics if configured.
if graphiteIp != "" {
metricsOutgoing <- metrics
}
// Write output stats.
fmt.Println()
log.Printf("[ topic: %s ]\n", Config.topic)
fmt.Printf("> Messages: %s @ %.0f msgs/sec. | error rate %.2f%%\n",
outputString,
metrics["rate"],
metrics["error_rate"])
if !Config.noop {
fmt.Printf("> Batches: %.2f batches/sec. | %s p99 | %s HMean | %s Min | %s Max\n",
stats.Rate.Second, round(stats.Time.P99), round(stats.Time.HMean), round(stats.Time.Min), round(stats.Time.Max))
fmt.Println(stats.Histogram.String(50))
// Check if the tacymeter size needs to be increased
// to avoid sampling. Otherwise, just reset it.
if int(sentSinceLastInterval) > len(t.Times) {
newTachy := tachymeter.New(&tachymeter.Config{Size: int(2 * sentSinceLastInterval)})
// This is actually dangerous;
// this could swap in a tachy with unlocked
// mutexes while the current one has locks held.
*t = *newTachy
} else {
t.Reset()
}
}
// Reset interval time.
start = time.Now()
}
}
// worker is a high level producer unit and holds a single
// Kafka client. The worker's Kafka client is shared by n (Config.writersPerWorker)
// writer instances that perform the message generation and writing.
func worker(n int, t *tachymeter.Tachymeter) {
switch Config.noop {
case false:
cId := "worker_" + strconv.Itoa(n)
conf := sarama.NewConfig()
conf.Producer.Compression = Config.compression
conf.Producer.Return.Successes = true
conf.Producer.RequiredAcks = Config.requiredAcks
conf.Producer.Flush.MaxMessages = Config.batchSize
conf.Producer.MaxMessageBytes = Config.msgSize + 50
conf.Version = parseKafkaVersion(Config.kafkaVersionString)
if Config.tls {
tlsConfig := &tls.Config{
InsecureSkipVerify: Config.tlsInsecureSkipVerify,
}
if len(Config.tlsCaCertificate) > 0 {
caCert, err := ioutil.ReadFile(Config.tlsCaCertificate)
if err != nil {
log.Println(err)
os.Exit(1)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
}
if len(Config.tlsCertificate) > 0 && len(Config.tlsPrivateKey) > 0 {
cert, err := tls.LoadX509KeyPair(Config.tlsCertificate, Config.tlsPrivateKey)
if err != nil {
log.Fatal(err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
conf.Net.TLS.Enable = true
conf.Net.TLS.Config = tlsConfig
}
client, err := sarama.NewClient(Config.brokers, conf)
if err != nil {
log.Println(err)
os.Exit(1)
} else {
log.Printf("%s connected\n", cId)
}
for i := 0; i < Config.writersPerWorker; i++ {
go writer(client, t)
}
// If noop, we're not creating connections at all.
// Just generate messages and burn CPU.
default:
for i := 0; i < Config.writersPerWorker; i++ {
go dummyWriter(t)
}
}
wait := make(chan bool)
<-wait
}
// writer generates random messages and write to Kafka.
// Each wrtier belongs to a parent worker. Writers
// throttle writes according to a global rate limiter
// and report write throughput statistics up through
// a shared tachymeter.
func writer(c sarama.Client, t *tachymeter.Tachymeter) {
// Init the producer.
producer, err := sarama.NewSyncProducerFromClient(c)
if err != nil {
log.Println(err.Error())
}
defer producer.Close()
source := rand.NewSource(time.Now().UnixNano())
generator := rand.New(source)
msgBatch := make([]*sarama.ProducerMessage, 0, Config.batchSize)
for {
// Message rate limiting works by having all writer loops incrementing
// a global counter and tracking the aggregate per-second progress.
// If the configured rate is met, the worker will sleep
// for the remainder of the 1 second window.
intervalEnd := time.Now().Add(time.Second)
countStart := atomic.LoadUint64(&sentCnt)
var sendTime time.Time
var intervalSent uint64
for {
// Estimate the batch size. This should shrink
// if we're near the rate limit. Estimated batch size =
// amount left to send for this interval / number of writers
// we have available to send this amount. If the estimate
// is lower than the configured batch size, send that amount
// instead.
toSend := (Config.msgRate - intervalSent) / uint64((Config.workers * Config.writersPerWorker))
n := int(math.Min(float64(toSend), float64(Config.batchSize)))
for i := 0; i < n; i++ {
// Gen message.
msgData := make([]byte, Config.msgSize)
randMsg(msgData, *generator)
msg := &sarama.ProducerMessage{Topic: Config.topic, Value: sarama.ByteEncoder(msgData)}
// Append to batch.
msgBatch = append(msgBatch, msg)
}
sendTime = time.Now()
err = producer.SendMessages(msgBatch)
if err != nil {
// Sarama returns a ProducerErrors, which is a slice
// of errors per message errored. Use this count
// to establish an error rate.
atomic.AddUint64(&errCnt, uint64(len(err.(sarama.ProducerErrors))))
}
t.AddTime(time.Since(sendTime))
atomic.AddUint64(&sentCnt, uint64(len(msgBatch)))
msgBatch = msgBatch[:0]
intervalSent = atomic.LoadUint64(&sentCnt) - countStart
// Break if the global rate limit was met, or, if
// we'd exceed it assuming all writers wrote a max batch size
// for this interval.
sendEstimate := intervalSent + uint64((Config.batchSize*Config.workers*Config.writersPerWorker)-Config.writersPerWorker)
if sendEstimate >= Config.msgRate {
break
}
}
// If the global per-second rate limit was met,
// the inner loop breaks and the outer loop sleeps for the interval remainder.
time.Sleep(intervalEnd.Sub(time.Now()))
}
}
// dummyWriter is initialized by the worker(s) if Config.noop is True.
// dummyWriter performs the message generation step of the normal writer,
// but doesn't connect to / attempt to send anything to Kafka. This is used
// purely for testing message generation performance.
func dummyWriter(t *tachymeter.Tachymeter) {
source := rand.NewSource(time.Now().UnixNano())
generator := rand.New(source)
msgBatch := make([]*sarama.ProducerMessage, 0, Config.batchSize)
for {
for i := 0; i < Config.batchSize; i++ {
// Gen message.
msgData := make([]byte, Config.msgSize)
randMsg(msgData, *generator)
msg := &sarama.ProducerMessage{Topic: Config.topic, Value: sarama.ByteEncoder(msgData)}
// Append to batch.
msgBatch = append(msgBatch, msg)
}
atomic.AddUint64(&sentCnt, uint64(len(msgBatch)))
t.AddTime(time.Duration(0))
msgBatch = msgBatch[:0]
}
}
// randMsg returns a random message generated from the chars byte slice.
// Message length of m bytes as defined by Config.msgSize.
func randMsg(m []byte, generator rand.Rand) {
for i := range m {
m[i] = chars[generator.Intn(len(chars))]
}
}
// calcOutput takes a duration t and messages sent
// and returns message rates in human readable network speeds.
func calcOutput(t float64, n uint64) (float64, string) {
m := (float64(n) / t) * float64(Config.msgSize)
var o string
switch {
case m >= 131072:
o = strconv.FormatFloat(m/131072, 'f', 0, 64) + "Mb/sec"
case m < 131072:
o = strconv.FormatFloat(m/1024, 'f', 0, 64) + "KB/sec"
}
return m, o
}
func round(t time.Duration) time.Duration {
return t / 1000 * 1000
}