Skip to content

Commit

Permalink
Merge pull request #95 from joekiller/queue_stats_output
Browse files Browse the repository at this point in the history
Queue stats output
  • Loading branch information
buger committed Apr 24, 2014
2 parents ce5ee74 + 7bf6ca4 commit a6af811
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 0 deletions.
67 changes: 67 additions & 0 deletions gor_stat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"time"
"log"
"strconv"
)

const (
rate = 5
)

type GorStat struct {
statName string
latest int
mean int
max int
count int
}

func NewGorStat(statName string) (s *GorStat) {
s = new(GorStat)
s.statName = statName
s.latest = 0
s.mean = 0
s.max = 0
s.count = 0

if Settings.stats {
log.Println(s.statName + ":latest,mean,max,count,count/second")
go s.reportStats()
}
return
}

func (s *GorStat) Write(latest int) {
if Settings.stats {
if latest > s.max {
s.max = latest
}
if latest != 0 {
s.mean = (s.mean + latest) / 2
}
s.latest = latest
s.count = s.count + 1
}
}

func (s *GorStat) Reset() {
s.latest = 0
s.max = 0
s.mean = 0
s.count = 0
}

func (s *GorStat) String() string {
return s.statName + ":" + strconv.Itoa(s.latest) + "," + strconv.Itoa(s.mean) + "," + strconv.Itoa(s.max) + "," + strconv.Itoa(s.count) + "," + strconv.Itoa(s.count / rate)
}

func (s *GorStat) reportStats() {
for {
log.Println(s)
s.Reset()
time.Sleep(rate * time.Second)
}
}

4 changes: 4 additions & 0 deletions output_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type HTTPOutput struct {
methods HTTPMethods

elasticSearch *es.ESPlugin

bufStats *GorStat
}

func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, elasticSearchAddr string) io.Writer {
Expand All @@ -63,6 +65,7 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela
o.methods = methods

o.buf = make(chan []byte, 100)
o.bufStats = NewGorStat("output_http")

if elasticSearchAddr != "" {
o.elasticSearch = new(es.ESPlugin)
Expand Down Expand Up @@ -100,6 +103,7 @@ func (o *HTTPOutput) Write(data []byte) (n int, err error) {
copy(buf, data)

o.buf <- buf
o.bufStats.Write(len(o.buf))

return len(data), nil
}
Expand Down
3 changes: 3 additions & 0 deletions output_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type TCPOutput struct {
address string
limit int
buf chan []byte
bufStats *GorStat
}

func NewTCPOutput(options string) io.Writer {
Expand All @@ -22,6 +23,7 @@ func NewTCPOutput(options string) io.Writer {
o.address = optionsArr[0]

o.buf = make(chan []byte, 100)
o.bufStats = NewGorStat("output_tcp")

if len(optionsArr) > 1 {
o.limit, _ = strconv.Atoi(optionsArr[1])
Expand Down Expand Up @@ -52,6 +54,7 @@ func (o *TCPOutput) Write(data []byte) (n int, err error) {
data = append(data,[]byte("¶")...)
copy(new_buf, data)
o.buf <- new_buf
o.bufStats.Write(len(o.buf))

return len(data), nil
}
Expand Down
2 changes: 2 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (

type AppSettings struct {
verbose bool
stats bool

splitOutput bool

Expand Down Expand Up @@ -45,6 +46,7 @@ func init() {
flag.Usage = usage

flag.BoolVar(&Settings.verbose, "verbose", false, "Turn on verbose/debug output")
flag.BoolVar(&Settings.stats, "stats", false, "Turn on queue stats output")

flag.BoolVar(&Settings.splitOutput, "split-output", false, "By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.")

Expand Down

0 comments on commit a6af811

Please sign in to comment.