Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speeding up file walk #589

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math"
"net"
"net/http"
Expand All @@ -49,6 +50,7 @@ import (
"go.uber.org/zap"

"github.com/NYTimes/gziphandler"
"github.com/charlievieth/fastwalk"
"github.com/dgryski/go-expirecache"
"github.com/dgryski/go-trigram"
"github.com/dgryski/httputil"
Expand Down Expand Up @@ -900,10 +902,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
var t0 = time.Now()
var fidx = listener.CurrentFileIndex()
var files []string
var filesLen int
var filesMutex sync.Mutex
var filesLen atomic.Uint64
var details = make(map[string]*protov3.MetricDetails)
var trieIdx *trieIndex
var metricsKnown uint64
var metricsKnown atomic.Uint64
var infos []zap.Field
if listener.trieIndex {
if fidx == nil || !listener.concurrentIndex {
Expand All @@ -927,7 +930,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
files = append(files, fileName)
}
if strings.HasSuffix(fileName, ".wsp") {
metricsKnown++
metricsKnown.Add(1)
}
}
cacheIndexRuntime := time.Since(tcache)
Expand Down Expand Up @@ -972,9 +975,9 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
break
}

filesLen++
filesLen.Add(1)
if strings.HasSuffix(entry.Path, ".wsp") {
metricsKnown++
metricsKnown.Add(1)
}
}
if err := flc.Close(); err != nil {
Expand Down Expand Up @@ -1012,12 +1015,25 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
logger.Error("can't index symlink data dir", zap.String("path", dir))
}

err := filepath.Walk(dir, func(p string, info os.FileInfo, err error) error {
fastwalkConf := fastwalk.Config{
Follow: false, // do not follow symlinks
// numWorkers default is sane here (>=4, but <=32)
}
// please note that fastwalk.Walk function should be concurrently safe
// TODO: refactor this
// e.g. we can construct filesList concurrently and update index from fileList cache single threaded later
err := fastwalk.Walk(&fastwalkConf, dir, func(p string, d fs.DirEntry, err error) error {
if err != nil {
logger.Info("error processing", zap.String("path", p), zap.Error(err))
return nil
}

// getting file info from direntry
info, ierr := d.Info()
if ierr != nil {
logger.Info("error processing", zap.String("path", p), zap.Error(err))
return nil
}
// WHY: as filepath.walk could potentially taking a long
// time to complete (>= 5 minutes or more), depending
// on how many files are there on disk. It's nice to
Expand All @@ -1039,7 +1055,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
//
// TODO: only trigger enter the loop when it's half full?
// len(listener.newMetricsChan) >= cap(listener.newMetricsChan)/2
if listener.trieIndex && listener.concurrentIndex && listener.newMetricsChan != nil {
if listener.trieIndex && listener.concurrentIndex && listener.newMetricsChan != nil && len(listener.newMetricsChan) >= cap(listener.newMetricsChan)/2 {
logger.Info("trying to flush incoming metrics into index")
newMetricsLoop:
for {
select {
Expand All @@ -1054,10 +1071,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
}
}

logger.Info("flush of incoming metrics done, proceeding with file list populating")
isFullMetric := strings.HasSuffix(info.Name(), ".wsp")
if info.IsDir() || isFullMetric { // both dir and metric file is needed for supporting trigram index.
trimmedName := strings.TrimPrefix(p, listener.whisperData)
filesLen++
filesLen.Add(1)

var dataPoints, logicalSize, physicalSize int64
if isFullMetric {
Expand Down Expand Up @@ -1093,11 +1111,14 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
}
}
} else {
// we're in fastwalk goroutine
filesMutex.Lock()
defer filesMutex.Unlock()
files = append(files, trimmedName)
}

if isFullMetric {
metricsKnown++
metricsKnown.Add(1)
}
}

Expand Down Expand Up @@ -1138,6 +1159,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
)
}
}
logger.Info("filewalk is done, proceeding with index update")

if listener.concurrentIndex && trieIdx != nil {
trieIdx.prune()
Expand All @@ -1160,7 +1182,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
totalSpace := stat.Blocks * uint64(stat.Bsize)

fileScanRuntime := time.Since(t0)
atomic.StoreUint64(&listener.metrics.MetricsKnown, metricsKnown)
atomic.StoreUint64(&listener.metrics.MetricsKnown, metricsKnown.Load())
atomic.AddUint64(&listener.metrics.FileScanTimeNS, uint64(fileScanRuntime.Nanoseconds()))

nfidx := &fileIndex{
Expand Down Expand Up @@ -1226,12 +1248,12 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
zap.Duration("rdtime_update_runtime", rdTimeUpdateRuntime),
zap.Duration("cache_index_runtime", cacheIndexRuntime),
zap.Duration("total_runtime", time.Since(t0)),
zap.Int("Files", filesLen),
zap.Uint64("Files", filesLen.Load()),
zap.Int("index_size", indexSize),
zap.Int("pruned_trigrams", pruned),
zap.Int("cache_metric_len_before", cacheMetricLen),
zap.Int("cache_metric_len_after", len(cacheMetricNames)),
zap.Uint64("metrics_known", metricsKnown),
zap.Uint64("metrics_known", metricsKnown.Load()),
zap.String("index_type", indexType),
zap.Bool("read_from_cache", readFromCache),
)
Expand Down
6 changes: 6 additions & 0 deletions carbonserver/flc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"os"
"strings"
"sync"
)

// file list cache
Expand Down Expand Up @@ -82,6 +83,7 @@ type fileListCacheCommon struct {
path string
mode byte
file *os.File
mutex sync.Mutex
reader *gzip.Reader
writer *gzip.Writer
}
Expand Down Expand Up @@ -237,6 +239,8 @@ func newFileListCacheV1ReadOnly(flcc *fileListCacheCommon) *fileListCacheV1 {
}

func (flc *fileListCacheV1) Write(entry *FLCEntry) error {
flc.mutex.Lock()
defer flc.mutex.Unlock()
_, err := flc.writer.Write([]byte(entry.Path + "\n"))
return err
}
Expand Down Expand Up @@ -300,6 +304,8 @@ func (flc *fileListCacheV2) Write(entry *FLCEntry) error {

buf[offset] = '\n'

flc.mutex.Lock()
defer flc.mutex.Unlock()
_, err := flc.writer.Write(buf)

return err
Expand Down
3 changes: 3 additions & 0 deletions carbonserver/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ type trieIndex struct {
fileCount int
depth uint64
longestMetric string
mutex sync.Mutex

// qau: Quota And Usage
qauMetrics []points.Points
Expand Down Expand Up @@ -492,6 +493,8 @@ func (t *trieInsertError) Error() string { return t.typ }
//
// insert returns either a file node or dir node, after inserted.
func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints, firstSeenAt int64) (*trieNode, error) {
ti.mutex.Lock()
defer ti.mutex.Unlock()
path = filepath.Clean(path)
if len(path) > 0 && path[0] == '/' { // skipcq: GO-S1005
path = path[1:]
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
)

require (
github.com/charlievieth/fastwalk v1.0.3
golang.org/x/net v0.25.0
google.golang.org/protobuf v1.34.1
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/charlievieth/fastwalk v1.0.3 h1:eNWFaNPe5srPqQ5yyDbhAf11paeZaHWcihRhpuYFfSg=
github.com/charlievieth/fastwalk v1.0.3/go.mod h1:JSfglY/gmL/rqsUS1NCsJTocB5n6sSl9ApAqif4CUbs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down
2 changes: 2 additions & 0 deletions vendor/github.com/charlievieth/fastwalk/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/charlievieth/fastwalk/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions vendor/github.com/charlievieth/fastwalk/Makefile

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading