Skip to content

Commit

Permalink
[read_commitlog] Add summary (#4060)
Browse files Browse the repository at this point in the history
  • Loading branch information
Antanukas authored Jan 18, 2022
1 parent af06fb6 commit 68eea9a
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 29 deletions.
28 changes: 22 additions & 6 deletions src/cmd/tools/read_commitlog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,28 @@
$ git clone [email protected]:m3db/m3.git
$ make read_commitlog
$ ./bin/read_commitlog
Usage: read_commitlog [-p value] [-f value]
-p, --path=value
Commitlog file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db]
Usage: read_commitlog [-a value] [-f value] [-p value] [-s value] [-t value] [parameters ...]
-a, --action=value
Action [print,summary]. Defaults to 'print'
-f, --id-filter=value
ID Contains Filter [e.g. xyz]
ID Contains Filter (optional)
-p, --path=value file path [e.g.
/var/lib/m3db/commitlogs/commitlog-0-161023.db]
-s, --id-size-filter=value
ID Size (bytes) Filter (optional)
-t, --top=value Print out only top N IDs
# example usage
# read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -f 'metric-name' > /tmp/sample-data.out
# Examples.
# get all datapoints for a given metric
$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -f 'metric-name'
# get summary about commit log file
$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -a summary
# get summary about commit log file including top 100 largest and most frequent IDs
$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -a summary -t 100
# get summary about commit log file including only IDs of at least 1000 bytes
$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -a summary -s 1000
```
74 changes: 74 additions & 0 deletions src/cmd/tools/read_commitlog/main/filtering_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package main

import (
"errors"
"io"
"log"
"strings"

"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
)

// filteringReader wraps a commitlog.Reader and hands back only the entries
// whose series ID passes the configured filters.
type filteringReader struct {
// reader is the underlying commit log reader that entries are pulled from.
reader commitlog.Reader
// idFilter points at a substring the series ID must contain; an empty
// string disables the check.
idFilter *string
// idSizeFilter points at the minimum series ID length in bytes; entries with
// shorter IDs are skipped. Zero disables the check.
idSizeFilter *int
}

// newFilteringReader builds a commit log reader with default options, opens
// the file at path, and wraps it together with the two filter pointers.
//
// The filter pointers are stored as-is (not copied), so the values they point
// at are consulted on every Read. If the file cannot be opened the error from
// Open is returned unchanged and no filteringReader is created.
func newFilteringReader(path string, idFilter *string, idSizeFilter *int) (*filteringReader, error) {
	reader := commitlog.NewReader(
		commitlog.NewReaderOptions(commitlog.NewOptions(), false),
	)

	_, err := reader.Open(path)
	if err != nil {
		return nil, err
	}

	wrapped := &filteringReader{
		reader:       reader,
		idFilter:     idFilter,
		idSizeFilter: idSizeFilter,
	}
	return wrapped, nil
}

// Read returns the next commit log entry that passes the configured filters.
//
// Results:
//   - (entry, true, nil) for the next matching entry;
//   - (zero entry, false, nil) once the underlying reader reports io.EOF;
//   - (zero entry, false, err) for any other read error.
//
// A nil filter pointer is treated the same as an unset filter (empty string
// or zero size), so a filteringReader built without filters does not panic.
func (c *filteringReader) Read() (commitlog.LogEntry, bool, error) {
	// Resolve the optional filters once per call rather than once per entry.
	var (
		idContains string
		minIDSize  int
	)
	if c.idFilter != nil {
		idContains = *c.idFilter
	}
	if c.idSizeFilter != nil {
		minIDSize = *c.idSizeFilter
	}

	for {
		entry, err := c.reader.Read()
		if errors.Is(err, io.EOF) {
			// End of the commit log is not an error for callers; it is
			// signalled through found == false.
			return commitlog.LogEntry{}, false, nil
		}
		if err != nil {
			return commitlog.LogEntry{}, false, err
		}

		id := entry.Series.ID
		if idContains != "" && !strings.Contains(id.String(), idContains) {
			continue
		}
		// The size filter is a lower bound: IDs shorter than it are skipped.
		if minIDSize != 0 && len(id.Bytes()) < minIDSize {
			continue
		}
		return entry, true, nil
	}
}

// Close closes the underlying commit log reader.
//
// Calling Close on a nil receiver, or on a filteringReader without a reader,
// does nothing. A failure to close is reported through log.Fatalf.
func (c *filteringReader) Close() {
	if c == nil || c.reader == nil {
		return
	}

	err := c.reader.Close()
	if err != nil {
		log.Fatalf("unable to close reader: %v", err)
	}
}
134 changes: 111 additions & 23 deletions src/cmd/tools/read_commitlog/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,26 @@ package main

import (
"encoding/base64"
"errors"
"fmt"
"io"
"log"
"os"
"strings"
"sort"
"time"

"github.com/pborman/getopt"
"go.uber.org/zap"

"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)

func main() {
var (
path = getopt.StringLong("path", 'p', "", "file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db]")
idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)")
path = getopt.StringLong("path", 'p', "", "file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db]")
idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)")
idSizeFilter = getopt.IntLong("id-size-filter", 's', 0, "ID Size (bytes) Filter (optional)")
mode = getopt.StringLong("mode", 'm', "", "Action [print,summary]. Defaults to 'print'")
top = getopt.IntLong("top", 't', 0, "Print out only top N IDs")
)
getopt.Parse()

Expand All @@ -54,34 +56,40 @@ func main() {
os.Exit(1)
}

opts := commitlog.NewReaderOptions(commitlog.NewOptions(), false)
reader := commitlog.NewReader(opts)

_, err = reader.Open(*path)
reader, err := newFilteringReader(*path, idFilter, idSizeFilter)
if err != nil {
logger.Fatalf("unable to open reader: %v", err)
}
defer reader.Close()

switch *mode {
case "summary":
err = printSummary(reader, top)
default:
err = printMetrics(reader)
}
if err != nil {
logger.Fatalf("error while reading commitlog: %v", err)
}
}

func printMetrics(reader *filteringReader) error {
var (
entryCount uint32
annotationSizeTotal uint64
start = time.Now()
)

for {
entry, err := reader.Read()
if errors.Is(err, io.EOF) {
break
}
entry, found, err := reader.Read()
if err != nil {
logger.Fatalf("err reading commitlog: %v", err)
return err
}

series := entry.Series
if *idFilter != "" && !strings.Contains(series.ID.String(), *idFilter) {
continue
if !found {
break
}

series := entry.Series
fmt.Printf("{id: %s, dp: %+v, ns: %s, shard: %d", // nolint: forbidigo
series.ID, entry.Datapoint, entry.Series.Namespace, entry.Series.Shard)
if len(entry.Annotation) > 0 {
Expand All @@ -96,11 +104,91 @@ func main() {

runTime := time.Since(start)

if err := reader.Close(); err != nil {
log.Fatalf("unable to close reader: %v", err)
}

fmt.Printf("\nRunning time: %s\n", runTime) // nolint: forbidigo
fmt.Printf("%d entries read\n", entryCount) // nolint: forbidigo
fmt.Printf("Total annotation size: %d bytes\n", annotationSizeTotal) // nolint: forbidigo
return nil
}

// printSummary drains reader and prints aggregate statistics about the
// entries it yields: run time, number of entries, the time range covered by
// the datapoints, the summed size of all distinct IDs, the number of distinct
// IDs, and two per-ID listings (datapoint count and ID size in bytes), each
// sorted in descending order.
//
// top limits both listings to the first N rows when it points at a value in
// (0, number of distinct IDs); a nil pointer or a non-positive value prints
// every ID.
//
// It returns the first error produced by reader.Read, or nil once the reader
// is exhausted.
func printSummary(reader *filteringReader, top *int) error {
	var (
		entryCount uint32
		start      = time.Now()
		// NOTE(review): the map is keyed by the ident.ID value itself. If
		// ident.ID is an interface backed by pointers, two ID objects holding
		// the same bytes would be counted as separate IDs — confirm the
		// reader hands back one ID object per series.
		datapointCount = map[ident.ID]uint32{}
		totalIDSize    uint64
		earliest       xtime.UnixNano
		latest         xtime.UnixNano
		// sawEntry marks whether earliest/latest hold real values yet. An
		// explicit flag is used instead of comparing against 0 so that a
		// datapoint whose timestamp is 0 is not mistaken for "unset".
		sawEntry bool
	)

	for {
		entry, found, err := reader.Read()
		if err != nil {
			return err
		}
		if !found {
			break
		}

		ts := entry.Datapoint.TimestampNanos
		if !sawEntry || ts < earliest {
			earliest = ts
		}
		if !sawEntry || ts > latest {
			latest = ts
		}
		sawEntry = true

		datapointCount[entry.Series.ID]++
		entryCount++
	}

	runTime := time.Since(start)

	fmt.Printf("\nRunning time: %s\n", runTime)                             // nolint: forbidigo
	fmt.Printf("%d entries read\n", entryCount)                             // nolint: forbidigo
	fmt.Printf("time range [%s:%s]\n", earliest.String(), latest.String()) // nolint: forbidigo

	// Both listings hold exactly one row per distinct ID, so size them up front.
	countPairs := make(idPairs, 0, len(datapointCount))
	sizePairs := make(idPairs, 0, len(datapointCount))
	for id, count := range datapointCount {
		idSize := len(id.Bytes())
		totalIDSize += uint64(idSize)
		countPairs = append(countPairs, idPair{ID: id, Value: count})
		sizePairs = append(sizePairs, idPair{ID: id, Value: uint32(idSize)})
	}

	// Largest values first.
	sort.Sort(sort.Reverse(countPairs))
	sort.Sort(sort.Reverse(sizePairs))

	fmt.Printf("total ID size: %d bytes\n", totalIDSize)                  // nolint: forbidigo
	fmt.Printf("total distinct number of IDs %d \n", len(datapointCount)) // nolint: forbidigo

	// countPairs and sizePairs have the same length, so one limit serves both.
	limit := len(countPairs)
	if top != nil && *top > 0 && *top < limit {
		limit = *top
	}

	fmt.Printf("ID datapoint counts: \n") // nolint: forbidigo
	for _, pair := range countPairs[:limit] {
		fmt.Printf("%-10d %s\n", pair.Value, pair.ID.String()) // nolint: forbidigo
	}

	fmt.Printf("ID sizes(bytes): \n") // nolint: forbidigo
	for _, pair := range sizePairs[:limit] {
		fmt.Printf("%-10d %s\n", pair.Value, pair.ID.String()) // nolint: forbidigo
	}

	return nil
}

// idPair associates a series ID with a single numeric value. printSummary
// stores either the ID's datapoint count or its size in bytes in Value.
type idPair struct {
// ID is the series identifier the value belongs to.
ID ident.ID
// Value is the number attached to ID; it is the sort key for idPairs.
Value uint32
}

// idPairs is a list of idPair that implements sort.Interface. Elements are
// ordered ascending by Value; elements with equal Value are ordered by the
// raw bytes of their ID so that the resulting order is total and therefore
// the same on every run, regardless of the order the slice was filled in.
type idPairs []idPair

// Len returns the number of pairs.
func (p idPairs) Len() int { return len(p) }

// Swap exchanges the pairs at positions i and j.
func (p idPairs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

// Less reports whether the pair at i sorts before the pair at j.
func (p idPairs) Less(i, j int) bool {
	if p[i].Value != p[j].Value {
		return p[i].Value < p[j].Value
	}
	// Tie-break on the ID bytes. The string conversions appear directly in a
	// comparison, which the compiler evaluates without copying the bytes.
	return string(p[i].ID.Bytes()) < string(p[j].ID.Bytes())
}

0 comments on commit 68eea9a

Please sign in to comment.