Misc output improvements & stdin fix
Fixed stdin ingestion
Added more stats to the output
leucos committed Apr 26, 2019
1 parent 9331785 commit 703e239
Showing 6 changed files with 63 additions and 36 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -59,7 +59,7 @@ $(BIN):
@mkdir -p $@
$(BIN)/%: | $(BIN) ; $(info $(M) building $(REPOSITORY)…)
$Q tmp=$$(mktemp -d); \
- env GO111MODULE=off GOCACHE=off GOPATH=$$tmp GOBIN=$(BIN) $(GO) get $(REPOSITORY) \
+ env GO111MODULE=off GOCACHE=on GOPATH=$$tmp GOBIN=$(BIN) $(GO) get $(REPOSITORY) \
|| ret=$$?; \
rm -rf $$tmp ; exit $$ret

@@ -139,4 +139,4 @@ help:

.PHONY: version
version:
- @echo $(VERSION)
\ No newline at end of file
+ @echo $(VERSION)
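Side note (not stated in the commit, so this reading is an assumption): Go 1.12, current at the time, removed support for disabling the build cache — with `GOCACHE=off`, commands that write to the cache fail — which is the likely reason the flag flips here.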
1 change: 1 addition & 0 deletions README.md
@@ -5,6 +5,7 @@ Alternative to `pt-query-digest`.
[![Build Status](https://travis-ci.org/devops-works/dw-query-digest.svg?branch=master)](https://travis-ci.org/devops-works/dw-query-digest)
[![Go Report Card](https://goreportcard.com/badge/github.com/devops-works/dw-query-digest)](https://goreportcard.com/report/github.com/devops-works/dw-query-digest)
[![SonarQube](https://sonarcloud.io/api/project_badges/measure?project=dw-query-digest&metric=alert_status)](https://sonarcloud.io/dashboard?id=dw-query-digest)
+ [![Coverage](https://sonarcloud.io/api/project_badges/measure?project=dw-query-digest&metric=coverage)](https://sonarcloud.io/dashboard?id=dw-query-digest)
[![MIT Licence](https://badges.frapsoft.com/os/mit/mit.svg?v=103)](https://opensource.org/licenses/mit-license.php)

## Purpose
37 changes: 24 additions & 13 deletions main.go
@@ -126,11 +126,11 @@ func init() {
regexeps = []replacements{
// 1· Group all SELECT queries from mysqldump together
// ... not implemented ...
- // 2· Shorten multi-value INSERT statements to a single VALUES() list.
- {regexp.MustCompile(`(insert .*) values.*`), "$1 values (?)"},
// 3· Strip comments.
{regexp.MustCompile(`(.*)/\*.*\*/(.*)`), "$1$2"},
{regexp.MustCompile(`(.*) --.*`), "$1"},
+ // 2· Shorten multi-value INSERT statements to a single VALUES() list.
+ {regexp.MustCompile(`^(insert .*) values.*`), "$1 values (?)"},
// 4· Abstract the databases in USE statements
// ... not implemented ... since I don't really get it
// 5· Sort of...
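The `^` anchor added to rule 2 matters: without it, the pattern could rewrite any statement that merely contains `insert ... values` somewhere in its body. A minimal standalone sketch of how the anchored rule collapses multi-row inserts (assumes queries are already lowercased, as the lowercase patterns imply; the real code applies the whole regexeps list in order):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Rule 2 above: any INSERT batch collapses to a single "values (?)",
	// so 1-row and 1000-row batches share one fingerprint.
	insertRe := regexp.MustCompile(`^(insert .*) values.*`)

	q := "insert into metrics (host, val) values (1, 2), (3, 4), (5, 6)"
	fmt.Println(insertRe.ReplaceAllString(q, "$1 values (?)"))
	// Output: insert into metrics (host, val) values (?)
}
```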
@@ -215,12 +215,14 @@ func main() {
if Config.FileName == "" || Config.FileName == "-" {
log.Info(`reading from STDIN`)
Config.FileName = ""
+ Config.DisableCache = true
Config.Follow = true
Config.ShowProgress = false
file = os.Stdin
} else if Config.Follow {
log.Info(`follow enabled`)
Config.ShowProgress = false
+ Config.DisableCache = true

piper, pipew = io.Pipe()
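This is the stdin fix from the commit message: stdin is read directly with an unknown line count, while follow mode feeds the tailer through an `io.Pipe`; in both cases the cache and the progress bar go off because the stream's length is unknown. A self-contained sketch of the pipe pattern (the writer goroutine stands in for the tailer, and `fileReader`'s role is played by a plain scanner):

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

func main() {
	piper, pipew := io.Pipe()

	// Writer side: stands in for the log tailer (or an os.Stdin copy).
	go func() {
		defer pipew.Close() // unblocks the reader with EOF
		io.Copy(pipew, strings.NewReader("# Time: ...\nselect 1;\n"))
	}()

	// Reader side: a stream has no known line count, hence a count of 0
	// and no progress display.
	sc := bufio.NewScanner(piper)
	for sc.Scan() {
		fmt.Println(sc.Text())
	}
}
```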

@@ -255,8 +257,8 @@
// If cache is not disabled and we aren't tailing input
// Try to display from cache
// If it succeeds, we've done our job
- if !Config.DisableCache && !Config.Follow && runFromCache(flag.Arg(0)) {
- log.Info(`results rerieved from cache`)
+ if !Config.DisableCache && runFromCache(flag.Arg(0)) {
+ log.Info(`results retrieved from cache`)
os.Exit(0)
}

@@ -280,7 +282,9 @@ func main() {

wg.Add(1)

- if Config.Follow {
+ if Config.FileName == "" {
+ go fileReader(&wg, file, logentries, 0)
+ } else if Config.Follow {
go fileReader(&wg, piper, logentries, 0)
} else {
count, err := lineCounter(file)
@@ -293,11 +297,14 @@
panic(err)
}

+ servermeta.CumLines = count
log.Infof("file has %d lines\n", count)

go fileReader(&wg, file, logentries, count)
}

+ servermeta.AnalysisStart = time.Now()
+
// We do not Add this one
// We do not wait for it in the wg
// but using <-done
@@ -478,12 +485,11 @@ func worker(wg *sync.WaitGroup, lines <-chan logentry, entries chan<- query) {
// var err error
for lineblock := range lines {
qry := query{}
- // fmt.Printf("HERE: %v", lineblock.lines)
for _, line := range lineblock.lines {
if line == "" {
break
}
- // fmt.Printf("LINE: %s\n", line)
+
switch strings.ToUpper(line[0:4]) {

case "# TI":
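The `line[0:4]` dispatch above routes slow-log meta lines by their first four bytes. A standalone sketch of the idea (header strings assumed; the real worker parses the matched lines further):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	lines := []string{
		"# Time: 2019-04-26T10:00:00.000000Z",
		"# User@Host: app[app] @ localhost []",
		"select * from t;",
	}
	for _, line := range lines {
		if len(line) < 4 {
			break // the real loop breaks on the empty line ending a block
		}
		switch strings.ToUpper(line[0:4]) {
		case "# TI":
			fmt.Println("timestamp header:", line)
		case "# US":
			fmt.Println("user header:", line)
		default:
			fmt.Println("query text:", line)
		}
	}
}
```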
@@ -562,8 +568,8 @@ func aggregator(queries <-chan query, done chan<- bool, tickerdelay time.Duration) {

servermeta.CumBytes = 0
servermeta.QueryCount = 0
- servermeta.StartTime = time.Now()
- servermeta.EndTime = time.Unix(0, 0)
+ servermeta.Start = time.Now()
+ servermeta.End = time.Unix(0, 0)

var ticker *time.Ticker

@@ -602,11 +608,11 @@

servermeta.QueryCount++
servermeta.CumBytes += qry.BytesSent
- if servermeta.StartTime.After(qry.Time) {
- servermeta.StartTime = qry.Time
+ if servermeta.Start.After(qry.Time) {
+ servermeta.Start = qry.Time
}
- if servermeta.EndTime.Before(qry.Time) {
- servermeta.EndTime = qry.Time
+ if servermeta.End.Before(qry.Time) {
+ servermeta.End = qry.Time
}

if _, ok := querylist[qry.Hash]; !ok {
@@ -641,6 +647,11 @@ func aggregator(queries <-chan query, done chan<- bool, tickerdelay time.Duration) {
// displayReport show a report given the select output
func displayReport(querylist map[[32]byte]*outputs.QueryStats, final bool) {
servermeta.UniqueQueries = len(querylist)
+ servermeta.AnalysisEnd = time.Now()
+ servermeta.AnalysisDuration = servermeta.AnalysisEnd.Sub(servermeta.AnalysisStart).Seconds()
+ servermeta.AnalysedLinesPerSecond = float64(servermeta.CumLines) / servermeta.AnalysisDuration
+ servermeta.AnalysedBytesPerSecond = float64(servermeta.CumBytes) / servermeta.AnalysisDuration
+ servermeta.AnalysedQueriesPerSecond = float64(servermeta.QueryCount) / servermeta.AnalysisDuration

s := make(outputs.QueryStatsSlice, 0, len(querylist))
for _, d := range querylist {
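These are the extra stats from the commit message: the analyzer's own throughput — wall-clock duration of the run, then lines, bytes, and queries divided by it. A worked sketch with hypothetical figures:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical run: 300k log lines parsed in 1.5 s of wall time.
	analysisStart := time.Now()
	analysisEnd := analysisStart.Add(1500 * time.Millisecond)

	cumLines := 300000
	duration := analysisEnd.Sub(analysisStart).Seconds() // 1.5
	fmt.Printf("lines/s: %.0f\n", float64(cumLines)/duration)
	// lines/s: 200000
}
```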
10 changes: 5 additions & 5 deletions outputs/greppable/greppable.go
@@ -22,17 +22,17 @@ func Display(servermeta outputs.ServerInfo, s outputs.QueryStatsSlice, w io.Writer) {
fmt.Fprintf(w, "Total queries:%.3fM (%d);", float64(servermeta.QueryCount)/1000000.0, servermeta.QueryCount)
fmt.Fprintf(w, "Total bytes:%.3fM (%d);", float64(servermeta.CumBytes)/1000000.0, servermeta.CumBytes)
fmt.Fprintf(w, "Total fingerprints:%d;", servermeta.UniqueQueries)
- fmt.Fprintf(w, "Capture start:%s;", servermeta.StartTime)
- fmt.Fprintf(w, "Capture end:%s;", servermeta.EndTime)
- fmt.Fprintf(w, "Duration:%s (%d s);", servermeta.EndTime.Sub(servermeta.StartTime), servermeta.EndTime.Sub(servermeta.StartTime)/time.Second)
- fmt.Fprintf(w, "QPS:%.0f\n", float64(time.Second)*(float64(servermeta.QueryCount)/float64(servermeta.EndTime.Sub(servermeta.StartTime))))
+ fmt.Fprintf(w, "Capture start:%s;", servermeta.Start)
+ fmt.Fprintf(w, "Capture end:%s;", servermeta.End)
+ fmt.Fprintf(w, "Duration:%s (%d s);", servermeta.End.Sub(servermeta.Start), servermeta.End.Sub(servermeta.Start)/time.Second)
+ fmt.Fprintf(w, "QPS:%.0f\n", float64(time.Second)*(float64(servermeta.QueryCount)/float64(servermeta.End.Sub(servermeta.Start))))

fmt.Fprintf(w, "# 1_Pos;2_QueryID;3_Fingerprint;4_Schema;5_Calls;")
fmt.Fprintf(w, "6_CumErrored;7_CumKilled;8_CumQueryTime(s);9_CumLockTime(s);10_CumRowsSent;")
fmt.Fprintf(w, "11_CumRowsExamined;12_CumRowsAffected;13_CumBytesSent;14_Concurency(%%);15_Min(s);16_Max(s);")
fmt.Fprintf(w, "17_Mean(s);18_P50(s);19_P95(s);20_StdDev(s)\n")

- ffactor := 100.0 * float64(time.Second) / float64(servermeta.EndTime.Sub(servermeta.StartTime))
+ ffactor := 100.0 * float64(time.Second) / float64(servermeta.End.Sub(servermeta.Start))
for idx, val := range s {
val.Concurrency = val.CumQueryTime * ffactor
sort.Float64s(val.QueryTime)
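The QPS expression reads oddly at first: `End.Sub(Start)` is a `time.Duration` counted in nanoseconds, so the inner ratio is queries per nanosecond, and multiplying by `float64(time.Second)` (1e9) rescales it to per-second. A worked check with made-up numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Date(2019, 4, 26, 10, 0, 0, 0, time.UTC)
	end := start.Add(2 * time.Minute) // 120 s capture window
	queryCount := 120000

	qps := float64(time.Second) * (float64(queryCount) / float64(end.Sub(start)))
	fmt.Printf("QPS: %.0f\n", qps) // 120000 queries / 120 s = 1000
}
```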
29 changes: 18 additions & 11 deletions outputs/outputs.go
@@ -7,17 +7,24 @@ import (

// ServerInfo holds server information gathered from first 2 log lines
type ServerInfo struct {
- Binary string `json:"binary"`
- VersionShort string `json:"versionShort"`
- Version string `json:"version"`
- VersionDescription string `json:"versionDescription"`
- TCPPort int `json:"tcpPort"`
- UnixSocket string `json:"unixSocket"`
- CumBytes int `json:"cumBytes"`
- QueryCount int `json:"queryCount"`
- UniqueQueries int `json:"uniqueQueries"`
- StartTime time.Time `json:"startTime"`
- EndTime time.Time `json:"endTime"`
+ Binary string `json:"binary"`
+ VersionShort string `json:"versionShort"`
+ Version string `json:"version"`
+ VersionDescription string `json:"versionDescription"`
+ TCPPort int `json:"tcpPort"`
+ UnixSocket string `json:"unixSocket"`
+ CumBytes int `json:"cumBytes"`
+ CumLines int `json:"cumLines"`
+ QueryCount int `json:"queryCount"`
+ UniqueQueries int `json:"uniqueQueries"`
+ Start time.Time `json:"Start"`
+ End time.Time `json:"End"`
+ AnalysisStart time.Time `json:"analysisStart"`
+ AnalysisEnd time.Time `json:"analysisEnd"`
+ AnalysedLinesPerSecond float64 `json:"analysedLinesPerSecond"`
+ AnalysedQueriesPerSecond float64 `json:"analysedQueriesPerSecond"`
+ AnalysedBytesPerSecond float64 `json:"analysedBytesPerSecond"`
+ AnalysisDuration float64 `json:"analysisDuration"`
// May be merge querystats here with:
// Queries []QueryStats ?
}
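For reference, a trimmed sketch of how the reworked struct serializes (a few fields only; note that the `Start`/`End` tags keep capitalized names while the new analysis fields use camelCase):

```go
package main

import (
	"encoding/json"
	"os"
	"time"
)

// serverInfo is a trimmed stand-in for outputs.ServerInfo.
type serverInfo struct {
	QueryCount               int       `json:"queryCount"`
	Start                    time.Time `json:"Start"`
	End                      time.Time `json:"End"`
	AnalysisDuration         float64   `json:"analysisDuration"`
	AnalysedQueriesPerSecond float64   `json:"analysedQueriesPerSecond"`
}

func main() {
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", "  ")
	enc.Encode(serverInfo{
		QueryCount:               120000,
		Start:                    time.Date(2019, 4, 26, 10, 0, 0, 0, time.UTC),
		End:                      time.Date(2019, 4, 26, 10, 2, 0, 0, time.UTC),
		AnalysisDuration:         1.5,
		AnalysedQueriesPerSecond: 80000,
	})
}
```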
18 changes: 13 additions & 5 deletions outputs/terminal/terminal.go
@@ -21,18 +21,26 @@ func Display(servermeta outputs.ServerInfo, s outputs.QueryStatsSlice, w io.Writer) {
fmt.Fprintf(w, " TCPPort : %d\n", servermeta.TCPPort)
fmt.Fprintf(w, " UnixSocket : %s\n", servermeta.UnixSocket)

+ fmt.Fprintf(w, "\n# Internal Analyzer Statistics\n\n")
+ // fmt.Fprintf(w, " Start : %s\n", servermeta.AnalysisStart)
+ fmt.Fprintf(w, " Duration : %14.3fs\n", servermeta.AnalysisDuration)
+ fmt.Fprintf(w, " Log lines : %14.3fM (%d)\n", float64(servermeta.CumLines)/1000000.0, servermeta.CumLines)
+ fmt.Fprintf(w, " Lines/s : %14.3f\n", servermeta.AnalysedLinesPerSecond)
+ fmt.Fprintf(w, " Bytes/s : %14.3f\n", servermeta.AnalysedBytesPerSecond)
+ fmt.Fprintf(w, " Queries/s : %14.3f\n", servermeta.AnalysedQueriesPerSecond)
+
fmt.Fprintf(w, "\n# Global Statistics\n\n")
fmt.Fprintf(w, " Total queries : %.3fM (%d)\n", float64(servermeta.QueryCount)/1000000.0, servermeta.QueryCount)
fmt.Fprintf(w, " Total bytes : %.3fM (%d)\n", float64(servermeta.CumBytes)/1000000.0, servermeta.CumBytes)
fmt.Fprintf(w, " Total fingerprints : %d\n", servermeta.UniqueQueries)
- fmt.Fprintf(w, " Capture start : %s\n", servermeta.StartTime)
- fmt.Fprintf(w, " Capture end : %s\n", servermeta.EndTime)
- fmt.Fprintf(w, " Duration : %s (%d s)\n", servermeta.EndTime.Sub(servermeta.StartTime), servermeta.EndTime.Sub(servermeta.StartTime)/time.Second)
- fmt.Fprintf(w, " QPS : %.0f\n", float64(time.Second)*(float64(servermeta.QueryCount)/float64(servermeta.EndTime.Sub(servermeta.StartTime))))
+ fmt.Fprintf(w, " Capture start : %s\n", servermeta.Start)
+ fmt.Fprintf(w, " Capture end : %s\n", servermeta.End)
+ fmt.Fprintf(w, " Duration : %s (%d s)\n", servermeta.End.Sub(servermeta.Start), servermeta.End.Sub(servermeta.Start)/time.Second)
+ fmt.Fprintf(w, " QPS : %.0f\n", float64(time.Second)*(float64(servermeta.QueryCount)/float64(servermeta.End.Sub(servermeta.Start))))

fmt.Fprintf(w, "\n# Queries\n")

- ffactor := 100.0 * float64(time.Second) / float64(servermeta.EndTime.Sub(servermeta.StartTime))
+ ffactor := 100.0 * float64(time.Second) / float64(servermeta.End.Sub(servermeta.Start))
for idx, val := range s {
val.Concurrency = val.CumQueryTime * ffactor
sort.Float64s(val.QueryTime)
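`ffactor` turns cumulative query time into a percentage of the capture window, so the Concurrency column answers "what fraction of wall time was spent inside this fingerprint". A worked check with made-up numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	captureWindow := 60 * time.Second
	ffactor := 100.0 * float64(time.Second) / float64(captureWindow)

	cumQueryTime := 30.0 // seconds spent in one fingerprint, summed over calls
	fmt.Printf("Concurrency: %.1f%%\n", cumQueryTime*ffactor)
	// Concurrency: 50.0%
}
```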
