diff --git a/src/cmd/tools/read_commitlog/README.md b/src/cmd/tools/read_commitlog/README.md index f590e7dba9..80d1a58961 100644 --- a/src/cmd/tools/read_commitlog/README.md +++ b/src/cmd/tools/read_commitlog/README.md @@ -7,12 +7,28 @@ $ git clone git@github.com:m3db/m3.git $ make read_commitlog $ ./bin/read_commitlog -Usage: read_commitlog [-p value] [-f value] - -p, --path=value - Commitlog file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db] +Usage: read_commitlog [-a value] [-f value] [-p value] [-s value] [-t value] [parameters ...] + -a, --action=value + Action [print,summary]. Defaults to 'print' -f, --id-filter=value - ID Contains Filter [e.g. xyz] + ID Contains Filter (optional) + -p, --path=value file path [e.g. + /var/lib/m3db/commitlogs/commitlog-0-161023.db] + -s, --id-size-filter=value + ID Size (bytes) Filter (optional) + -t, --top=value Print out only top N IDs -# example usage -# read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -f 'metric-name' > /tmp/sample-data.out +# Examples. + +# get all datapoints for a given metric +$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -f 'metric-name' + +# get summary about commit log file +$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -a summary + +# get summary about commit log file including top 100 largest and most frequent IDs +$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -a summary -t 100 + +# get summary about commit log file including only IDs above 1000 bytes +$ read_commitlog -p /var/lib/m3db/commitlogs/commitlog-0-161023.db -a summary -s 1000 ``` diff --git a/src/cmd/tools/read_commitlog/main/filtering_reader.go b/src/cmd/tools/read_commitlog/main/filtering_reader.go new file mode 100644 index 0000000000..1be5ba0d04 --- /dev/null +++ b/src/cmd/tools/read_commitlog/main/filtering_reader.go @@ -0,0 +1,74 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + "errors" + "io" + "log" + "strings" + + "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" +) + +type filteringReader struct { + reader commitlog.Reader + idFilter *string + idSizeFilter *int +} + +func newFilteringReader(path string, idFilter *string, idSizeFilter *int) (*filteringReader, error) { + opts := commitlog.NewReaderOptions(commitlog.NewOptions(), false) + reader := commitlog.NewReader(opts) + if _, err := reader.Open(path); err != nil { + return nil, err + } + return &filteringReader{reader: reader, idFilter: idFilter, idSizeFilter: idSizeFilter}, nil +} + +func (c *filteringReader) Read() (commitlog.LogEntry, bool, error) { + for { + entry, err := c.reader.Read() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return commitlog.LogEntry{}, false, err + } + series := entry.Series + if *c.idFilter != "" && !strings.Contains(series.ID.String(), *c.idFilter) { + continue + } + if *c.idSizeFilter != 0 && len(series.ID.Bytes()) < *c.idSizeFilter { + continue + } + return entry, true, nil + } + return commitlog.LogEntry{}, false, nil +} + +func (c *filteringReader) Close() { + if c != nil && c.reader != nil { + if err := c.reader.Close(); err != nil { + log.Fatalf("unable to close reader: %v", err) + } + } +} diff --git a/src/cmd/tools/read_commitlog/main/main.go b/src/cmd/tools/read_commitlog/main/main.go index 18c46469bf..3a0ec59355 100644 --- a/src/cmd/tools/read_commitlog/main/main.go +++ b/src/cmd/tools/read_commitlog/main/main.go @@ -22,24 +22,26 @@ package main import ( "encoding/base64" - "errors" "fmt" - "io" "log" "os" - "strings" + "sort" "time" "github.com/pborman/getopt" "go.uber.org/zap" - "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" ) func main() { var ( - path = getopt.StringLong("path", 'p', "", "file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db]") - idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)") + path = getopt.StringLong("path", 'p', "", "file path [e.g. /var/lib/m3db/commitlogs/commitlog-0-161023.db]") + idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)") + idSizeFilter = getopt.IntLong("id-size-filter", 's', 0, "ID Size (bytes) Filter (optional)") + mode = getopt.StringLong("mode", 'm', "", "Action [print,summary]. Defaults to 'print'") + top = getopt.IntLong("top", 't', 0, "Print out only top N IDs") ) getopt.Parse() @@ -54,14 +56,24 @@ func main() { os.Exit(1) } - opts := commitlog.NewReaderOptions(commitlog.NewOptions(), false) - reader := commitlog.NewReader(opts) - - _, err = reader.Open(*path) + reader, err := newFilteringReader(*path, idFilter, idSizeFilter) if err != nil { logger.Fatalf("unable to open reader: %v", err) } + defer reader.Close() + + switch *mode { + case "summary": + err = printSummary(reader, top) + default: + err = printMetrics(reader) + } + if err != nil { + logger.Fatalf("error while reading commitlog: %v", err) + } +} +func printMetrics(reader *filteringReader) error { var ( entryCount uint32 annotationSizeTotal uint64 @@ -69,19 +81,15 @@ func main() { ) for { - entry, err := reader.Read() - if errors.Is(err, io.EOF) { - break - } + entry, found, err := reader.Read() if err != nil { - logger.Fatalf("err reading commitlog: %v", err) + return err } - - series := entry.Series - if *idFilter != "" && !strings.Contains(series.ID.String(), *idFilter) { - continue + if !found { + break } + series := entry.Series fmt.Printf("{id: %s, dp: %+v, ns: %s, shard: %d", // nolint: forbidigo series.ID, entry.Datapoint, entry.Series.Namespace, entry.Series.Shard) if len(entry.Annotation) > 0 { @@ -96,11 +104,91 @@ func main() { runTime := time.Since(start) - if err := reader.Close(); err != nil { - log.Fatalf("unable to close reader: %v", err) - } - fmt.Printf("\nRunning time: %s\n", runTime) // nolint: forbidigo fmt.Printf("%d entries read\n", entryCount) // nolint: forbidigo fmt.Printf("Total annotation size: %d bytes\n", annotationSizeTotal) // nolint: forbidigo + return nil } + +func printSummary(reader *filteringReader, top *int) error { + var ( + entryCount uint32 + start = time.Now() + datapointCount = map[ident.ID]uint32{} + totalIDSize uint64 + earliestDatapoint xtime.UnixNano + oldestDatapoint xtime.UnixNano + ) + + for { + entry, found, err := reader.Read() + if err != nil { + return err + } + if !found { + break + } + dp := entry.Datapoint + + if earliestDatapoint == 0 || earliestDatapoint > dp.TimestampNanos { + earliestDatapoint = dp.TimestampNanos + } + if oldestDatapoint == 0 || oldestDatapoint < dp.TimestampNanos { + oldestDatapoint = dp.TimestampNanos + } + + datapointCount[entry.Series.ID]++ + + entryCount++ + } + + runTime := time.Since(start) + + fmt.Printf("\nRunning time: %s\n", runTime) // nolint: forbidigo + fmt.Printf("%d entries read\n", entryCount) // nolint: forbidigo + fmt.Printf("time range [%s:%s]\n", earliestDatapoint.String(), oldestDatapoint.String()) // nolint: forbidigo + + datapointCountArr := idPairs{} + sizeArr := idPairs{} + for ID, count := range datapointCount { + IDSize := len(ID.Bytes()) + totalIDSize += uint64(IDSize) + datapointCountArr = append(datapointCountArr, idPair{ID: ID, Value: count}) + sizeArr = append(sizeArr, idPair{ID: ID, Value: uint32(IDSize)}) + } + + sort.Sort(sort.Reverse(datapointCountArr)) + sort.Sort(sort.Reverse(sizeArr)) + + fmt.Printf("total ID size: %d bytes\n", totalIDSize) // nolint: forbidigo + fmt.Printf("total distinct number of IDs %d \n", len(datapointCount)) // nolint: forbidigo + + limit := len(datapointCountArr) + if *top > 0 && *top < limit { + limit = *top + } + fmt.Printf("ID datapoint counts: \n") // nolint: forbidigo + for i := 0; i < limit; i++ { + pair := datapointCountArr[i] + fmt.Printf("%-10d %s\n", pair.Value, pair.ID.String()) // nolint: forbidigo + } + + fmt.Printf("ID sizes(bytes): \n") // nolint: forbidigo + for i := 0; i < limit; i++ { + pair := sizeArr[i] + fmt.Printf("%-10d %s\n", pair.Value, pair.ID.String()) // nolint: forbidigo + } + + return nil +} + +type idPair struct { + ID ident.ID + Value uint32 +} + +type idPairs []idPair + +func (p idPairs) Len() int { return len(p) } +func (p idPairs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p idPairs) Less(i, j int) bool { return p[i].Value < p[j].Value }