From c250c3e30c829dc6de81816b2139705b86a6f121 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 11:34:51 -0400 Subject: [PATCH 01/18] add gor_stat --- gor_stat.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++ output_http.go | 4 ++++ settings.go | 1 + 3 files changed, 62 insertions(+) create mode 100644 gor_stat.go diff --git a/gor_stat.go b/gor_stat.go new file mode 100644 index 00000000..5291090e --- /dev/null +++ b/gor_stat.go @@ -0,0 +1,57 @@ +package main + +import ( + "strconv" + "time" + "log" +) + +const ( + rate = int64(5) +) + +type GorStat struct { + statName string + latest int + min int + max int + + currentTime int64 +} + +func NewGorStat(statName string) (s *GorStat) { + s = new(GorStat) + s.statName = statName + s.latest = 0 + s.min = 0 + s.max = 0 + + if Settings.stats { + go s.reportStats() + } + return +} + +func (s *GorStat) Write(latest int) { + if Settings.stats { + if latest > s.max { + s.max = latest + } + if latest < s.min { + s.min = latest + } + s.latest = latest + } +} + +func (s *GorStat) String() string { + return s.statName + ":" + Itoa(s.latest) + "," + Itoa(s.min) + "," + Itoa(s.max) +} + +func (s *GorStat) reportStats() { + if (time.Now().UnixNano() - s.currentTime) > (rate * time.Second.Nanoseconds()) { + s.currentTime = time.Now().UnixNano() + log.Println(s) + } +} + diff --git a/output_http.go b/output_http.go index 49c3efd7..7e813222 100644 --- a/output_http.go +++ b/output_http.go @@ -46,6 +46,8 @@ type HTTPOutput struct { methods HTTPMethods elasticSearch *es.ESPlugin + + bufStats GorStat } func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, elasticSearchAddr string) io.Writer { @@ -63,6 +65,7 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela o.methods = methods o.buf = make(chan []byte, 100) + o.bufStats = NewGorStat("OUTPUT_HTTP") if elasticSearchAddr != "" { o.elasticSearch = new(es.ESPlugin) @@ -100,6 +103,7 @@ func (o *HTTPOutput) Write(data []byte) (n int, err error) { copy(buf, data) o.buf <- buf + o.bufStats.Write(len(o.buf)) return len(data), nil } diff --git a/settings.go b/settings.go index 2e7d5c4f..83051ca7 100644 --- a/settings.go +++ b/settings.go @@ -45,6 +45,7 @@ func init() { flag.Usage = usage flag.BoolVar(&Settings.verbose, "verbose", false, "Turn on verbose/debug output") + flag.BoolVar(&Settings.stats, "stats", false, "Turn on queue stats output") flag.BoolVar(&Settings.splitOutput, "split-output", false, "By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.") From 610552be883667d34c8486cec7c9916eca08a4bc Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 11:45:39 -0400 Subject: [PATCH 02/18] fixing stat errors --- gor_stat.go | 10 +++++----- output_http.go | 2 +- settings.go | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/gor_stat.go b/gor_stat.go index 5291090e..1324740f 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -1,9 +1,9 @@ package main import ( - "strconv" "time" "log" + "strconv" ) const ( @@ -19,8 +19,8 @@ type GorStat struct { currentTime int64 } -func NewGorStat(statName string) (s *GorStat) { - s = new(GorStat) +func NewGorStat(statName string) GorStat { + s := new(GorStat) s.statName = statName s.latest = 0 s.min = 0 @@ -29,7 +29,7 @@ func NewGorStat(statName string) (s *GorStat) { if Settings.stats { go s.reportStats() } - return + return s } func (s *GorStat) Write(latest int) { @@ -45,7 +45,7 @@ func (s *GorStat) Write(latest int) { } func (s *GorStat) String() string { - return s.statName + ":" + Itoa(s.latest) + "," + Itoa(s.min) + "," + Itoa(s.max) + return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.min) + "," + strconv.Itoa(s.max) } func (s *GorStat) reportStats() { diff --git a/output_http.go b/output_http.go index 7e813222..b05af828 100644 --- a/output_http.go +++ b/output_http.go @@ -47,7 +47,7 @@ type HTTPOutput struct { elasticSearch *es.ESPlugin - bufStats GorStat + bufStats *GorStat } func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, elasticSearchAddr string) io.Writer { diff --git a/settings.go b/settings.go index 83051ca7..ea888f0f 100644 --- a/settings.go +++ b/settings.go @@ -13,6 +13,7 @@ const ( type AppSettings struct { verbose bool + stats bool splitOutput bool From b5c2d7747cc78007dca0ab3b0b973a897e681734 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 11:47:06 -0400 Subject: [PATCH 03/18] fixing pointers --- output_http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/output_http.go b/output_http.go index b05af828..7e813222 100644 --- a/output_http.go +++ b/output_http.go @@ -47,7 +47,7 @@ type HTTPOutput struct { elasticSearch *es.ESPlugin - bufStats *GorStat + bufStats GorStat } func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, elasticSearchAddr string) io.Writer { From 959ac29661e996b78a9a541449fc9e97c92243e4 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 11:48:49 -0400 Subject: [PATCH 04/18] fixing pointers --- gor_stat.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gor_stat.go b/gor_stat.go index 1324740f..bc6eb655 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -19,7 +19,7 @@ type GorStat struct { currentTime int64 } -func NewGorStat(statName string) GorStat { +func NewGorStat(statName string) (s *GorStat) { s := new(GorStat) s.statName = statName s.latest = 0 @@ -29,7 +29,7 @@ func NewGorStat(statName string) GorStat { if Settings.stats { go s.reportStats() } - return s + return } func (s *GorStat) Write(latest int) { From d5535ee7f3b0b9c1030a4cac67c0081def974aa2 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 11:50:23 -0400 Subject: [PATCH 05/18] fixing pointers --- gor_stat.go | 2 +- output_http.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gor_stat.go b/gor_stat.go index bc6eb655..db35b42f 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -20,7 +20,7 @@ type GorStat struct { } func NewGorStat(statName string) (s *GorStat) { - s := new(GorStat) + s = new(GorStat) s.statName = statName s.latest = 0 s.min = 0 diff --git a/output_http.go b/output_http.go index 7e813222..b05af828 100644 --- a/output_http.go +++ b/output_http.go @@ -47,7 +47,7 @@ type HTTPOutput struct { elasticSearch *es.ESPlugin - bufStats GorStat + bufStats *GorStat } func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, elasticSearchAddr string) io.Writer { From 62254b62653e492bcdbda8cf29a1e0e5cca78041 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:20:43 -0400 Subject: [PATCH 06/18] updated stat printer --- gor_stat.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/gor_stat.go b/gor_stat.go index db35b42f..c50b620f 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -13,17 +13,15 @@ const ( type GorStat struct { statName string latest int - min int + mean int max int - - currentTime int64 } func NewGorStat(statName string) (s *GorStat) { s = new(GorStat) s.statName = statName s.latest = 0 - s.min = 0 + s.mean = 0 s.max = 0 if Settings.stats { @@ -37,21 +35,21 @@ func (s *GorStat) Write(latest int) { if latest > s.max { s.max = latest } - if latest < s.min { - s.min = latest + if latest != 0 { + s.mean = (s.mean + latest) / 2 } s.latest = latest } } func (s *GorStat) String() string { - return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.min) + "," + strconv.Itoa(s.max) + return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.mean) + "," + strconv.Itoa(s.max) } func (s *GorStat) reportStats() { - if (time.Now().UnixNano() - s.currentTime) > (rate * time.Second.Nanoseconds()) { - s.currentTime = time.Now().UnixNano() - log.Println(s) + for { + log.Println(s) + time.Sleep(rate * time.Second.Nanoseconds()) } } From c97f02ff4c077096e0ca4bcfd214f3c2aa6c4066 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:22:47 -0400 Subject: [PATCH 07/18] updated stat printer --- gor_stat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gor_stat.go b/gor_stat.go index c50b620f..92b5d2d4 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -49,7 +49,7 @@ func (s *GorStat) String() string { func (s *GorStat) reportStats() { for { log.Println(s) - time.Sleep(rate * time.Second.Nanoseconds()) + time.Sleep(rate * time.Second) } } From 40d835e143682d0f024b95e06a9848eb704d1a1a Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:23:15 -0400 Subject: [PATCH 08/18] update rate types --- gor_stat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gor_stat.go b/gor_stat.go index 92b5d2d4..2db6abb7 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -7,7 +7,7 @@ import ( ) const ( - rate = int64(5) + rate = 5 ) type GorStat struct { From 9461217a0e7dd834f91383659f83eb110a9afd7e Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:43:15 -0400 Subject: [PATCH 09/18] write stat help when init --- gor_stat.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gor_stat.go b/gor_stat.go index 2db6abb7..5990b98b 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -25,6 +25,7 @@ func NewGorStat(statName string) (s *GorStat) { s.max = 0 if Settings.stats { + return s.statName + ":latest,mean,max" go s.reportStats() } return @@ -32,6 +33,7 @@ func NewGorStat(statName string) (s *GorStat) { func (s *GorStat) Write(latest int) { if Settings.stats { + log.Println(s) if latest > s.max { s.max = latest } From b52d562dad08ae8a777003eef4b092abcb05f7c5 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:47:16 -0400 Subject: [PATCH 10/18] tweaking stat output --- gor_stat.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/gor_stat.go b/gor_stat.go index 5990b98b..21b4f5c1 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -44,6 +44,12 @@ func (s *GorStat) Write(latest int) { } } +func (s *GorStat) Reset() { + s.latest = 0 + s.max = 0 + s.mean = 0 +} + func (s *GorStat) String() string { return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.mean) + "," + strconv.Itoa(s.max) } @@ -51,6 +57,7 @@ func (s *GorStat) String() string { func (s *GorStat) reportStats() { for { log.Println(s) + s.Reset() time.Sleep(rate * time.Second) } } From db14154eb9bdc8d9da4f9734c08b987367529c61 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:48:18 -0400 Subject: [PATCH 11/18] fix typo --- gor_stat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gor_stat.go b/gor_stat.go index 21b4f5c1..ef3697c7 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -25,7 +25,7 @@ func NewGorStat(statName string) (s *GorStat) { s.max = 0 if Settings.stats { - return s.statName + ":latest,mean,max" + log.Println(s.statName + ":latest,mean,max") go s.reportStats() } return From 7bf2015e90a33c5b362a1330c41e6ea7be50f554 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:52:01 -0400 Subject: [PATCH 12/18] fixing stat output --- gor_stat.go | 1 - 1 file changed, 1 deletion(-) diff --git a/gor_stat.go b/gor_stat.go index ef3697c7..b18eeb3d 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -33,7 +33,6 @@ func NewGorStat(statName string) (s *GorStat) { func (s *GorStat) Write(latest int) { if Settings.stats { - log.Println(s) if latest > s.max { s.max = latest } From c49c19428d184be245ed10c31436df8c4198f65c Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 12:58:41 -0400 Subject: [PATCH 13/18] as many workers are their are channel --- output_http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/output_http.go b/output_http.go index b05af828..e338053c 100644 --- a/output_http.go +++ b/output_http.go @@ -76,7 +76,7 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela o.limit, _ = strconv.Atoi(optionsArr[1]) } - for i := 0; i < 10; i++ { + for i := 0; i < 100; i++ { go o.worker(i) } From dbfea7ceaceebcb9dd9f987769ec0cdc01675a51 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 14:02:33 -0400 Subject: [PATCH 14/18] as many workers are their are channel --- output_http.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/output_http.go b/output_http.go index e338053c..b6eb9b58 100644 --- a/output_http.go +++ b/output_http.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "time" + "runtime" ) type RedirectNotAllowed struct{} @@ -76,7 +77,7 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela o.limit, _ = strconv.Atoi(optionsArr[1]) } - for i := 0; i < 100; i++ { + for i := 0; i < runtime.NumCPU(); i++ { go o.worker(i) } From 47200709962ff308c74b9706c23d634631eb0f68 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 14:05:42 -0400 Subject: [PATCH 15/18] finished stat for output_http --- output_http.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/output_http.go b/output_http.go index b6eb9b58..b05af828 100644 --- a/output_http.go +++ b/output_http.go @@ -11,7 +11,6 @@ import ( "strconv" "strings" "time" - "runtime" ) type RedirectNotAllowed struct{} @@ -77,7 +76,7 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela o.limit, _ = strconv.Atoi(optionsArr[1]) } - for i := 0; i < runtime.NumCPU(); i++ { + for i := 0; i < 10; i++ { go o.worker(i) } From 41fe11c12f14a015d934db015edafde7b99d2e07 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 16:06:29 -0400 Subject: [PATCH 16/18] added output_tcp queue stats --- output_http.go | 2 +- output_tcp.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/output_http.go b/output_http.go index b05af828..bd729b92 100644 --- a/output_http.go +++ b/output_http.go @@ -65,7 +65,7 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela o.methods = methods o.buf = make(chan []byte, 100) - o.bufStats = NewGorStat("OUTPUT_HTTP") + o.bufStats = NewGorStat("output_http") if elasticSearchAddr != "" { o.elasticSearch = new(es.ESPlugin) diff --git a/output_tcp.go b/output_tcp.go index 17e5c90a..48e2a870 100644 --- a/output_tcp.go +++ b/output_tcp.go @@ -13,6 +13,7 @@ type TCPOutput struct { address string limit int buf chan []byte + bufStats *GorStat } func NewTCPOutput(options string) io.Writer { @@ -22,6 +23,7 @@ func NewTCPOutput(options string) io.Writer { o.address = optionsArr[0] o.buf = make(chan []byte, 100) + o.bufStats = NewGorStat("output_tcp") if len(optionsArr) > 1 { o.limit, _ = strconv.Atoi(optionsArr[1]) @@ -52,6 +54,7 @@ func (o *TCPOutput) Write(data []byte) (n int, err error) { data = append(data,[]byte("ΒΆ")...) copy(new_buf, data) o.buf <- new_buf + o.bufStats.Write(len(o.buf)) return len(data), nil } From 31aef9ee1d2d14918069a921585d5ee7694f8e64 Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 16:22:27 -0400 Subject: [PATCH 17/18] adding count stat --- gor_stat.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/gor_stat.go b/gor_stat.go index b18eeb3d..54d5b96f 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -15,6 +15,7 @@ type GorStat struct { latest int mean int max int + count int } func NewGorStat(statName string) (s *GorStat) { @@ -23,9 +24,10 @@ func NewGorStat(statName string) (s *GorStat) { s.latest = 0 s.mean = 0 s.max = 0 + s.count = 0 if Settings.stats { - log.Println(s.statName + ":latest,mean,max") + log.Println(s.statName + ":latest,mean,max,count") go s.reportStats() } return @@ -40,6 +42,7 @@ func (s *GorStat) Write(latest int) { s.mean = (s.mean + latest) / 2 } s.latest = latest + s.count = s.count + 1 } } @@ -47,10 +50,11 @@ func (s *GorStat) Reset() { s.latest = 0 s.max = 0 s.mean = 0 + s.count = 0 } func (s *GorStat) String() string { - return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.mean) + "," + strconv.Itoa(s.max) + return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.mean) + "," + strconv.Itoa(s.max) + "," + strconv.Itoa(s.count) } func (s *GorStat) reportStats() { From 7bf6ca4c8773f015193d12241e5ab2343e67287b Mon Sep 17 00:00:00 2001 From: Joseph Lawson Date: Wed, 23 Apr 2014 16:25:22 -0400 Subject: [PATCH 18/18] adding count stat --- gor_stat.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gor_stat.go b/gor_stat.go index 54d5b96f..732e87f8 100644 --- a/gor_stat.go +++ b/gor_stat.go @@ -27,7 +27,7 @@ func NewGorStat(statName string) (s *GorStat) { s.count = 0 if Settings.stats { - log.Println(s.statName + ":latest,mean,max,count") + log.Println(s.statName + ":latest,mean,max,count,count/second") go s.reportStats() } return @@ -54,7 +54,7 @@ func (s *GorStat) Reset() { } func (s *GorStat) String() string { - return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.mean) + "," + strconv.Itoa(s.max) + "," + strconv.Itoa(s.count) + return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.mean) + "," + strconv.Itoa(s.max) + "," + strconv.Itoa(s.count) + "," + strconv.Itoa(s.count / rate) } func (s *GorStat) reportStats() {