From 5e84431a214ead086060f9f92b615e8502e1f9d6 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 19 Oct 2020 07:32:46 -0400 Subject: [PATCH] [dbnode] Add ability to configure concurrent building of index segments at bootstrap (#2620) --- src/cmd/services/m3dbnode/config/bootstrap.go | 26 +-- .../services/m3dbnode/config/config_test.go | 1 + .../peers_bootstrap_high_concurrency_test.go | 26 +-- .../bootstrap/bootstrapper/fs/options.go | 63 ++++--- .../bootstrap/bootstrapper/fs/source.go | 94 ++++++++-- .../bootstrapper/fs/source_data_test.go | 6 +- .../bootstrap/bootstrapper/fs/types.go | 20 +-- .../bootstrap/bootstrapper/peers/options.go | 18 ++ .../bootstrap/bootstrapper/peers/source.go | 164 +++++++++++------- .../bootstrap/bootstrapper/peers/types.go | 8 + src/x/config/config.go | 3 +- src/x/config/config_test.go | 68 ++++++-- 12 files changed, 332 insertions(+), 165 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/bootstrap.go b/src/cmd/services/m3dbnode/config/bootstrap.go index c0036d5f21..f1a1545616 100644 --- a/src/cmd/services/m3dbnode/config/bootstrap.go +++ b/src/cmd/services/m3dbnode/config/bootstrap.go @@ -22,8 +22,6 @@ package config import ( "fmt" - "math" - "runtime" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/persist/fs" @@ -65,22 +63,24 @@ type BootstrapConfiguration struct { // CacheSeriesMetadata determines whether individual bootstrappers cache // series metadata across all calls (namespaces / shards / blocks). CacheSeriesMetadata *bool `yaml:"cacheSeriesMetadata"` + + // IndexSegmentConcurrency determines the concurrency for building index + // segments. + IndexSegmentConcurrency *int `yaml:"indexSegmentConcurrency"` } // BootstrapFilesystemConfiguration specifies config for the fs bootstrapper. type BootstrapFilesystemConfiguration struct { - // NumProcessorsPerCPU is the number of processors per CPU. - NumProcessorsPerCPU float64 `yaml:"numProcessorsPerCPU" validate:"min=0.0"` + // DeprecatedNumProcessorsPerCPU is the number of processors per CPU. + // TODO: Remove, this is deprecated since BootstrapDataNumProcessors() is + // no longer actually used anywhere. + DeprecatedNumProcessorsPerCPU float64 `yaml:"numProcessorsPerCPU" validate:"min=0.0"` // Migration configuration specifies what version, if any, existing data filesets should be migrated to // if necessary. Migration *BootstrapMigrationConfiguration `yaml:"migration"` } -func (c BootstrapFilesystemConfiguration) numCPUs() int { - return int(math.Ceil(float64(c.NumProcessorsPerCPU * float64(runtime.NumCPU())))) -} - func (c BootstrapFilesystemConfiguration) migration() BootstrapMigrationConfiguration { if cfg := c.Migration; cfg != nil { return *cfg @@ -90,8 +90,7 @@ func (c BootstrapFilesystemConfiguration) migration() BootstrapMigrationConfigur func newDefaultBootstrapFilesystemConfiguration() BootstrapFilesystemConfiguration { return BootstrapFilesystemConfiguration{ - NumProcessorsPerCPU: defaultNumProcessorsPerCPU, - Migration: &BootstrapMigrationConfiguration{}, + Migration: &BootstrapMigrationConfiguration{}, } } @@ -219,11 +218,13 @@ func (bsc BootstrapConfiguration) New( SetIndexOptions(opts.IndexOptions()). SetPersistManager(opts.PersistManager()). SetCompactor(compactor). - SetBoostrapDataNumProcessors(fsCfg.numCPUs()). SetRuntimeOptionsManager(opts.RuntimeOptionsManager()). SetIdentifierPool(opts.IdentifierPool()). SetMigrationOptions(fsCfg.migration().NewOptions()). 
SetStorageOptions(opts) + if v := bsc.IndexSegmentConcurrency; v != nil { + fsbOpts = fsbOpts.SetIndexSegmentConcurrency(*v) + } if err := validator.ValidateFilesystemBootstrapperOptions(fsbOpts); err != nil { return nil, err } @@ -263,6 +264,9 @@ func (bsc BootstrapConfiguration) New( SetDefaultShardConcurrency(pCfg.StreamShardConcurrency). SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency). SetShardPersistenceFlushConcurrency(pCfg.StreamPersistShardFlushConcurrency) + if v := bsc.IndexSegmentConcurrency; v != nil { + pOpts = pOpts.SetIndexSegmentConcurrency(*v) + } if err := validator.ValidatePeersBootstrapperOptions(pOpts); err != nil { return nil, err } diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 9248a878f4..bfc1351b54 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -428,6 +428,7 @@ func TestConfiguration(t *testing.T) { returnUnfulfilledForCorruptCommitLogFiles: false peers: null cacheSeriesMetadata: null + indexSegmentConcurrency: null blockRetrieve: null cache: series: null diff --git a/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go b/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go index 30017818af..0394153c95 100644 --- a/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go +++ b/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go @@ -23,7 +23,6 @@ package integration import ( - "encoding/json" "fmt" "testing" "time" @@ -40,17 +39,22 @@ import ( "go.uber.org/zap" ) -func TestPeersBootstrapHighConcurrency(t *testing.T) { - for _, test := range []testPeersBootstrapHighConcurrencyOptions{ - {BatchSize: 16, Concurrency: 64, BatchesPerWorker: 8}, - {BatchSize: 64, Concurrency: 16, BatchesPerWorker: 8}, - } { - name, err := json.Marshal(test) - require.NoError(t, err) - t.Run(string(name), func(t *testing.T) { - testPeersBootstrapHighConcurrency(t, test) +func TestPeersBootstrapHighConcurrencyBatch16Workers64(t *testing.T) { + testPeersBootstrapHighConcurrency(t, + testPeersBootstrapHighConcurrencyOptions{ + BatchSize: 16, + Concurrency: 64, + BatchesPerWorker: 8, + }) +} + +func TestPeersBootstrapHighConcurrencyBatch64Workers16(t *testing.T) { + testPeersBootstrapHighConcurrency(t, + testPeersBootstrapHighConcurrencyOptions{ + BatchSize: 64, + Concurrency: 16, + BatchesPerWorker: 8, }) - } } type testPeersBootstrapHighConcurrencyOptions struct { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go index f5b67f8473..89e4787aa2 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go @@ -22,6 +22,7 @@ package fs import ( "errors" + "fmt" "math" goruntime "runtime" @@ -56,21 +57,23 @@ var ( // us splitting an index block into smaller pieces is moot because we'll // pull a lot more data into memory if we create more than one at a time. defaultBootstrapIndexNumProcessors = 1 + + // defaultIndexSegmentConcurrency defines the default index segment building concurrency. 
+ defaultIndexSegmentConcurrency = 1 ) type options struct { - instrumentOpts instrument.Options - resultOpts result.Options - fsOpts fs.Options - indexOpts index.Options - persistManager persist.Manager - compactor *compaction.Compactor - bootstrapDataNumProcessors int - bootstrapIndexNumProcessors int - runtimeOptsMgr runtime.OptionsManager - identifierPool ident.Pool - migrationOpts migration.Options - storageOpts storage.Options + instrumentOpts instrument.Options + resultOpts result.Options + fsOpts fs.Options + indexOpts index.Options + persistManager persist.Manager + compactor *compaction.Compactor + indexSegmentConcurrency int + runtimeOptsMgr runtime.OptionsManager + identifierPool ident.Pool + migrationOpts migration.Options + storageOpts storage.Options } // NewOptions creates new bootstrap options @@ -82,14 +85,13 @@ func NewOptions() Options { idPool := ident.NewPool(bytesPool, ident.PoolOptions{}) return &options{ - instrumentOpts: instrument.NewOptions(), - resultOpts: result.NewOptions(), - bootstrapDataNumProcessors: defaultBootstrapDataNumProcessors, - bootstrapIndexNumProcessors: defaultBootstrapIndexNumProcessors, - runtimeOptsMgr: runtime.NewOptionsManager(), - identifierPool: idPool, - migrationOpts: migration.NewOptions(), - storageOpts: storage.NewOptions(), + instrumentOpts: instrument.NewOptions(), + resultOpts: result.NewOptions(), + indexSegmentConcurrency: defaultIndexSegmentConcurrency, + runtimeOptsMgr: runtime.NewOptionsManager(), + identifierPool: idPool, + migrationOpts: migration.NewOptions(), + storageOpts: storage.NewOptions(), } } @@ -112,6 +114,9 @@ func (o *options) Validate() error { if err := o.migrationOpts.Validate(); err != nil { return err } + if n := o.indexSegmentConcurrency; n <= 0 { + return fmt.Errorf("index segment concurrency not >= 1: actual=%d", n) + } return nil } @@ -175,24 +180,14 @@ func (o *options) Compactor() *compaction.Compactor { return o.compactor } -func (o *options) SetBoostrapDataNumProcessors(value int) Options { - opts := *o - opts.bootstrapDataNumProcessors = value - return &opts -} - -func (o *options) BoostrapDataNumProcessors() int { - return o.bootstrapDataNumProcessors -} - -func (o *options) SetBoostrapIndexNumProcessors(value int) Options { +func (o *options) SetIndexSegmentConcurrency(value int) Options { opts := *o - opts.bootstrapIndexNumProcessors = value + opts.indexSegmentConcurrency = value return &opts } -func (o *options) BoostrapIndexNumProcessors() int { - return o.bootstrapIndexNumProcessors +func (o *options) IndexSegmentConcurrency() int { + return o.indexSegmentConcurrency } func (o *options) SetRuntimeOptionsManager(value runtime.OptionsManager) Options { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 6bd65b8bcf..07943e89d2 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -37,11 +37,13 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst" idxpersist 
"github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" @@ -77,8 +79,6 @@ type fileSystemSource struct { idPool ident.Pool newReaderFn newDataFileSetReaderFn newReaderPoolOpts bootstrapper.NewReaderPoolOptions - persistManager *bootstrapper.SharedPersistManager - compactor *bootstrapper.SharedCompactor metrics fileSystemSourceMetrics } @@ -104,12 +104,6 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) { nowFn: opts.ResultOptions().ClockOptions().NowFn(), idPool: opts.IdentifierPool(), newReaderFn: fs.NewReader, - persistManager: &bootstrapper.SharedPersistManager{ - Mgr: opts.PersistManager(), - }, - compactor: &bootstrapper.SharedCompactor{ - Compactor: opts.Compactor(), - }, metrics: fileSystemSourceMetrics{ persistedIndexBlocksRead: scope.Counter("persist-index-blocks-read"), persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"), @@ -328,12 +322,14 @@ func (s *fileSystemSource) bootstrapFromReaders( ns namespace.Metadata, accumulator bootstrap.NamespaceDataAccumulator, runOpts bootstrap.RunOptions, + runResult *runResult, readerPool *bootstrapper.ReaderPool, readersCh <-chan bootstrapper.TimeWindowReaders, builder *result.IndexBuilder, -) *runResult { + persistManager *bootstrapper.SharedPersistManager, + compactor *bootstrapper.SharedCompactor, +) { var ( - runResult = newRunResult() resultOpts = s.opts.ResultOptions() ) @@ -343,10 +339,9 @@ func (s *fileSystemSource) bootstrapFromReaders( builder.Builder().Reset() s.loadShardReadersDataIntoShardResult(run, ns, accumulator, - runOpts, runResult, resultOpts, timeWindowReaders, readerPool, builder) + runOpts, runResult, resultOpts, timeWindowReaders, readerPool, + builder, persistManager, compactor) } - - return runResult } // markRunResultErrorsAndUnfulfilled checks the list of times that had errors and makes @@ -397,6 +392,8 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( timeWindowReaders bootstrapper.TimeWindowReaders, readerPool *bootstrapper.ReaderPool, builder *result.IndexBuilder, + persistManager *bootstrapper.SharedPersistManager, + compactor *bootstrapper.SharedCompactor, ) { var ( blockPool = ropts.DatabaseBlockOptions().DatabaseBlockPool() @@ -486,9 +483,11 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( if err == nil && run == bootstrapIndexRunType { // Mark index block as fulfilled. fulfilled := result.NewShardTimeRanges().Set(shard, xtime.NewRanges(timeRange)) + runResult.Lock() err = runResult.index.IndexResults().MarkFulfilled(start, fulfilled, // NB(bodu): By default, we always load bootstrapped data into the default index volume. idxpersist.DefaultIndexVolumeType, ns.Options().IndexOptions()) + runResult.Unlock() if err != nil { s.log.Error("indexResults MarkFulfilled failed", zap.Error(err), zap.Time("timeRangeStart", timeRange.Start)) @@ -542,7 +541,9 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( fulfilledMin, fulfilledMax := totalFulfilledRanges.MinMax() // NB(bodu): Assume if we're bootstrapping data from disk that it is the "default" index volume type. 
+ runResult.Lock() existingIndexBlock, ok := bootstrapper.GetDefaultIndexBlockForBlockStart(runResult.index.IndexResults(), blockStart) + runResult.Unlock() if !ok { err := fmt.Errorf("could not find index block in results: time=%s, ts=%d", blockStart.String(), blockStart.UnixNano()) @@ -584,7 +585,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( ns, requestedRanges, builder.Builder(), - s.persistManager, + persistManager, s.opts.ResultOptions(), existingIndexBlock.Fulfilled(), blockStart, @@ -606,7 +607,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( ns, requestedRanges, builder.Builder(), - s.compactor, + compactor, s.opts.ResultOptions(), s.opts.FilesystemOptions().MmapReporter(), blockStart, @@ -632,7 +633,9 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( newFulfilled.AddRanges(indexBlock.Fulfilled()) // Replace index block for default index volume type. + runResult.Lock() runResult.index.IndexResults()[xtime.ToUnixNano(blockStart)].SetBlock(idxpersist.DefaultIndexVolumeType, result.NewIndexBlock(segments, newFulfilled)) + runResult.Unlock() } // Return readers to pool. @@ -800,7 +803,8 @@ func (s *fileSystemSource) read( // allocate and keep around readers outside of the bootstrapping process, // hence why its created on demand each time. readerPool := bootstrapper.NewReaderPool(s.newReaderPoolOpts) - readersCh := make(chan bootstrapper.TimeWindowReaders) + indexSegmentConcurrency := s.opts.IndexSegmentConcurrency() + readersCh := make(chan bootstrapper.TimeWindowReaders, indexSegmentConcurrency) var blockSize time.Duration switch run { case bootstrapDataRunType: @@ -828,11 +832,56 @@ func (s *fileSystemSource) read( NowFn: s.nowFn, Cache: cache, }) - bootstrapFromDataReadersResult := s.bootstrapFromReaders(run, md, - accumulator, runOpts, readerPool, readersCh, builder) + + bootstrapFromReadersRunResult := newRunResult() + + var buildWg sync.WaitGroup + for i := 0; i < indexSegmentConcurrency; i++ { + alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator() + segBuilder, err := alloc() + if err != nil { + return nil, err + } + + builder := result.NewIndexBuilder(segBuilder) + + indexOpts := s.opts.IndexOptions() + compactor, err := compaction.NewCompactor(indexOpts.DocumentArrayPool(), + index.DocumentArrayPoolCapacity, + indexOpts.SegmentBuilderOptions(), + indexOpts.FSTSegmentOptions(), + compaction.CompactorOptions{ + FSTWriterOptions: &fst.WriterOptions{ + // DisableRegistry is set to true to trade a larger FST size + // for a faster FST compaction since we want to reduce the end + // to end latency for time to first index a metric. + DisableRegistry: true, + }, + }) + if err != nil { + return nil, err + } + + persistManager, err := fs.NewPersistManager(s.opts.FilesystemOptions()) + if err != nil { + return nil, err + } + + buildWg.Add(1) + go func() { + s.bootstrapFromReaders(run, md, + accumulator, runOpts, bootstrapFromReadersRunResult, + readerPool, readersCh, builder, + &bootstrapper.SharedPersistManager{Mgr: persistManager}, + &bootstrapper.SharedCompactor{Compactor: compactor}) + buildWg.Done() + }() + } + + buildWg.Wait() // Merge any existing results if necessary. 
- setOrMergeResult(bootstrapFromDataReadersResult) + setOrMergeResult(bootstrapFromReadersRunResult) return res, nil } @@ -847,6 +896,7 @@ func (s *fileSystemSource) bootstrapDataRunResultFromAvailability( shardTimeRanges result.ShardTimeRanges, cache bootstrap.Cache, ) (*runResult, error) { + // No locking required, all local to this fn until returned. runResult := newRunResult() unfulfilled := runResult.data.Unfulfilled() for shard, ranges := range shardTimeRanges.Iter() { @@ -1004,6 +1054,12 @@ func (r *runResult) addIndexBlockIfNotExists( } func (r *runResult) mergedResult(other *runResult) *runResult { + r.Lock() + defer r.Unlock() + + other.Lock() + defer other.Unlock() + return &runResult{ data: result.MergedDataBootstrapResult(r.data, other.data), index: result.MergedIndexBootstrapResult(r.index, other.index), diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 363624b88a..f67c75c82c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -73,10 +73,8 @@ var ( testDefaultRunOpts = bootstrap.NewRunOptions(). SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) - testDefaultOpts = NewOptions().SetResultOptions(testDefaultResultOpts). - SetBoostrapDataNumProcessors(1). - SetBoostrapIndexNumProcessors(1) - testShardRanges = testShardTimeRanges() + testDefaultOpts = NewOptions().SetResultOptions(testDefaultResultOpts) + testShardRanges = testShardTimeRanges() ) func newTestOptions(t require.TestingT, filePathPrefix string) Options { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go index 2720684851..778f133b52 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go @@ -70,21 +70,13 @@ type Options interface { // Compactor returns the compactor used to compact segment builders into segments. Compactor() *compaction.Compactor - // SetBoostrapDataNumProcessors sets the number of processors for CPU-bound - // work for bootstrapping data file sets. - SetBoostrapDataNumProcessors(value int) Options + // SetIndexSegmentConcurrency sets the concurrency for + // building index segments. + SetIndexSegmentConcurrency(value int) Options - // BoostrapDataNumProcessors returns the number of processors for CPU-bound - // work for bootstrapping data file sets. - BoostrapDataNumProcessors() int - - // SetBoostrapIndexNumProcessors sets the number of processors for CPU-bound - // work for bootstrapping data file sets. - SetBoostrapIndexNumProcessors(value int) Options - - // BoostrapIndexNumProcessors returns the number of processors for CPU-bound - // work for bootstrapping data file sets. - BoostrapIndexNumProcessors() int + // IndexSegmentConcurrency returns the concurrency for + // building index segments. + IndexSegmentConcurrency() int // SetRuntimeOptionsManager sets the runtime options manager. 
SetRuntimeOptionsManager(value runtime.OptionsManager) Options diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go index c3e3a595ef..cf7856d7d2 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go @@ -22,6 +22,7 @@ package peers import ( "errors" + "fmt" "math" "runtime" @@ -53,6 +54,8 @@ var ( // Update BootstrapPeersConfiguration comment in // src/cmd/services/m3dbnode/config package if this is changed. DefaultShardPersistenceFlushConcurrency = 1 + // defaultIndexSegmentConcurrency defines the default index segment building concurrency. + defaultIndexSegmentConcurrency = 1 ) var ( @@ -70,6 +73,7 @@ type options struct { defaultShardConcurrency int shardPersistenceConcurrency int shardPersistenceFlushConcurrency int + indexSegmentConcurrency int persistenceMaxQueueSize int persistManager persist.Manager runtimeOptionsManager m3dbruntime.OptionsManager @@ -86,6 +90,7 @@ func NewOptions() Options { defaultShardConcurrency: DefaultShardConcurrency, shardPersistenceConcurrency: DefaultShardPersistenceConcurrency, shardPersistenceFlushConcurrency: DefaultShardPersistenceFlushConcurrency, + indexSegmentConcurrency: defaultIndexSegmentConcurrency, persistenceMaxQueueSize: defaultPersistenceMaxQueueSize, // Use a zero pool, this should be overriden at config time. contextPool: context.NewPool(context.NewOptions(). @@ -113,6 +118,9 @@ func (o *options) Validate() error { if o.fsOpts == nil { return errFilesystemOptionsNotSet } + if n := o.indexSegmentConcurrency; n <= 0 { + return fmt.Errorf("index segment concurrency not >= 1: actual=%d", n) + } return nil } @@ -166,6 +174,16 @@ func (o *options) ShardPersistenceFlushConcurrency() int { return o.shardPersistenceFlushConcurrency } +func (o *options) SetIndexSegmentConcurrency(value int) Options { + opts := *o + opts.indexSegmentConcurrency = value + return &opts +} + +func (o *options) IndexSegmentConcurrency() int { + return o.indexSegmentConcurrency +} + func (o *options) SetPersistenceMaxQueueSize(value int) Options { opts := *o opts.persistenceMaxQueueSize = value diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index a737a9c6c0..fd6fd7482c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -37,11 +37,13 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst" idxpersist "github.com/m3db/m3/src/m3ninx/persist" xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" @@ -60,8 +62,6 @@ type peersSource struct { log *zap.Logger newPersistManager func() (persist.Manager, error) nowFn clock.NowFn - persistManager *bootstrapper.SharedPersistManager - compactor *bootstrapper.SharedCompactor } type persistenceFlush struct { @@ -84,12 +84,6 @@ func newPeersSource(opts Options) (bootstrap.Source, error) { return fs.NewPersistManager(opts.FilesystemOptions()) }, 
nowFn: opts.ResultOptions().ClockOptions().NowFn(), - persistManager: &bootstrapper.SharedPersistManager{ - Mgr: opts.PersistManager(), - }, - compactor: &bootstrapper.SharedCompactor{ - Compactor: opts.Compactor(), - }, }, nil } @@ -177,13 +171,6 @@ func (s *peersSource) Read( zap.Duration("took", s.nowFn().Sub(start))) span.LogEvent("bootstrap_data_done") - alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator() - segBuilder, err := alloc() - if err != nil { - return bootstrap.NamespaceResults{}, err - } - builder := result.NewIndexBuilder(segBuilder) - start = s.nowFn() s.log.Info("bootstrapping index metadata start") span.LogEvent("bootstrap_index_start") @@ -200,7 +187,6 @@ func (s *peersSource) Read( r, err := s.readIndex(md, namespace.IndexRunOptions.ShardTimeRanges, - builder, span, cache, namespace.IndexRunOptions.RunOptions, @@ -374,7 +360,6 @@ func (s *peersSource) runPersistenceQueueWorkerLoop( // If performing a bootstrap with persistence enabled then flush one // at a time as shard results are gathered. - fmt.Printf("!! starting persist worker\n") for flush := range persistenceQueue { err := s.flush(opts, persistFlush, flush.nsMetadata, flush.shard, flush.shardResult, flush.timeRange, asyncTasks) @@ -396,7 +381,6 @@ func (s *peersSource) runPersistenceQueueWorkerLoop( bootstrapResult.SetUnfulfilled(unfulfilled) lock.Unlock() } - fmt.Printf("!! finishing persist worker\n") } // fetchBootstrapBlocksFromPeers loops through all the provided ranges for a given shard and @@ -565,7 +549,6 @@ func (s *peersSource) flush( ropts = nsMetadata.Options().RetentionOptions() blockSize = ropts.BlockSize() ) - fmt.Printf("!! flushing shard=%d, tr=%s\n", shard, tr.String()) for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) { prepareOpts := persist.DataPrepareOptions{ NamespaceMetadata: nsMetadata, @@ -597,7 +580,6 @@ func (s *peersSource) flush( // so it is safe to delete on disk data. DeleteIfExists: true, } - fmt.Printf("!! prepare data shard=%d, start=%s\n", shard, start.String()) prepared, err := flush.PrepareData(prepareOpts) if err != nil { return err @@ -606,17 +588,14 @@ func (s *peersSource) flush( var blockErr error for _, entry := range shardResult.AllSeries().Iter() { s := entry.Value() - fmt.Printf("!! trying persist id=%s\n", s.ID.String()) bl, ok := s.Blocks.BlockAt(start) if !ok { - fmt.Printf("!! block at missing start=%s\n", start) continue } checksum, err := bl.Checksum() if err != nil { blockErr = err // Need to call prepared.Close, avoid return - fmt.Printf("!! block err err=%v\n", err) break } @@ -626,7 +605,6 @@ func (s *peersSource) flush( // Remove from map. s.Blocks.RemoveBlockAt(start) - fmt.Printf("!! 
REALLY trying persist id=%s, start=%s\n", s.ID.String(), start.String()) metadata := persist.NewMetadataFromIDAndTags(s.ID, s.Tags, persist.MetadataOptions{}) err = prepared.Persist(metadata, segment, checksum) @@ -695,7 +673,6 @@ func (s *peersSource) flush( func (s *peersSource) readIndex( ns namespace.Metadata, shardTimeRanges result.ShardTimeRanges, - builder *result.IndexBuilder, span opentracing.Span, cache bootstrap.Cache, opts bootstrap.RunOptions, @@ -723,8 +700,9 @@ func (s *peersSource) readIndex( return fs.NewReader(bytesPool, fsOpts) }, }) - resultLock = &sync.Mutex{} - readersCh = make(chan bootstrapper.TimeWindowReaders) + resultLock = &sync.Mutex{} + indexSegmentConcurrency = s.opts.IndexSegmentConcurrency() + readersCh = make(chan bootstrapper.TimeWindowReaders, indexSegmentConcurrency) ) s.log.Info("peers bootstrapper bootstrapping index for ranges", zap.Int("shards", count), @@ -748,6 +726,64 @@ func (s *peersSource) readIndex( Cache: cache, }) + var buildWg sync.WaitGroup + for i := 0; i < indexSegmentConcurrency; i++ { + alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator() + segBuilder, err := alloc() + if err != nil { + return nil, err + } + + builder := result.NewIndexBuilder(segBuilder) + + indexOpts := s.opts.IndexOptions() + compactor, err := compaction.NewCompactor(indexOpts.DocumentArrayPool(), + index.DocumentArrayPoolCapacity, + indexOpts.SegmentBuilderOptions(), + indexOpts.FSTSegmentOptions(), + compaction.CompactorOptions{ + FSTWriterOptions: &fst.WriterOptions{ + // DisableRegistry is set to true to trade a larger FST size + // for a faster FST compaction since we want to reduce the end + // to end latency for time to first index a metric. + DisableRegistry: true, + }, + }) + if err != nil { + return nil, err + } + + persistManager, err := s.newPersistManager() + if err != nil { + return nil, err + } + + buildWg.Add(1) + go func() { + s.processReadersWorker(ns, r, readersCh, builder, readerPool, idxOpts, + &bootstrapper.SharedPersistManager{Mgr: persistManager}, + &bootstrapper.SharedCompactor{Compactor: compactor}, + resultLock) + buildWg.Done() + }() + } + + buildWg.Wait() + + return r, nil +} + +func (s *peersSource) processReadersWorker( + ns namespace.Metadata, + r result.IndexBootstrapResult, + readersCh <-chan bootstrapper.TimeWindowReaders, + builder *result.IndexBuilder, + readerPool *bootstrapper.ReaderPool, + idxOpts namespace.IndexOptions, + persistManager *bootstrapper.SharedPersistManager, + compactor *bootstrapper.SharedCompactor, + resultLock *sync.Mutex, +) { for timeWindowReaders := range readersCh { // NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks, // it is not thread safe and requires reset after every processed index block. @@ -761,40 +797,13 @@ func (s *peersSource) readIndex( timeWindowReaders, readerPool, idxOpts, + persistManager, + compactor, + resultLock, ) s.markRunResultErrorsAndUnfulfilled(resultLock, r, timeWindowReaders.Ranges, remainingRanges, timesWithErrors) } - - return r, nil -} - -func (s *peersSource) readNextEntryAndMaybeIndex( - r fs.DataFileSetReader, - batch []doc.Document, - builder *result.IndexBuilder, -) ([]doc.Document, error) { - // If performing index run, then simply read the metadata and add to segment. - id, tagsIter, _, _, err := r.ReadMetadata() - if err != nil { - return batch, err - } - - d, err := convert.FromSeriesIDAndTagIter(id, tagsIter) - // Finalize the ID and tags. 
- id.Finalize() - tagsIter.Close() - if err != nil { - return batch, err - } - - batch = append(batch, d) - - if len(batch) >= index.DocumentArrayPoolCapacity { - return builder.FlushBatch(batch) - } - - return batch, nil } func (s *peersSource) processReaders( @@ -804,6 +813,9 @@ func (s *peersSource) processReaders( timeWindowReaders bootstrapper.TimeWindowReaders, readerPool *bootstrapper.ReaderPool, idxOpts namespace.IndexOptions, + persistManager *bootstrapper.SharedPersistManager, + compactor *bootstrapper.SharedCompactor, + resultLock *sync.Mutex, ) (result.ShardTimeRanges, []time.Time) { var ( docsPool = s.opts.IndexOptions().DocumentArrayPool() @@ -826,7 +838,9 @@ func (s *peersSource) processReaders( err error ) + resultLock.Lock() r.IndexResults().AddBlockIfNotExists(start, idxOpts) + resultLock.Unlock() numEntries := reader.Entries() for i := 0; err == nil && i < numEntries; i++ { batch, err = s.readNextEntryAndMaybeIndex(reader, batch, builder) @@ -849,9 +863,11 @@ func (s *peersSource) processReaders( shard, xtime.NewRanges(timeRange), ) + resultLock.Lock() err = r.IndexResults().MarkFulfilled(start, fulfilled, // NB(bodu): By default, we always load bootstrapped data into the default index volume. idxpersist.DefaultIndexVolumeType, idxOpts) + resultLock.Unlock() } if err == nil { @@ -910,7 +926,7 @@ func (s *peersSource) processReaders( ns, requestedRanges, builder.Builder(), - s.persistManager, + persistManager, s.opts.ResultOptions(), existingIndexBlock.Fulfilled(), blockStart, @@ -930,7 +946,7 @@ func (s *peersSource) processReaders( ns, requestedRanges, builder.Builder(), - s.compactor, + compactor, s.opts.ResultOptions(), s.opts.IndexOptions().MmapReporter(), blockStart, @@ -956,7 +972,9 @@ func (s *peersSource) processReaders( newFulfilled.AddRanges(indexBlock.Fulfilled()) // Replace index block for default index volume type. + resultLock.Lock() r.IndexResults()[xtime.ToUnixNano(blockStart)].SetBlock(idxpersist.DefaultIndexVolumeType, result.NewIndexBlock(segments, newFulfilled)) + resultLock.Unlock() // Return readers to pool. for _, shardReaders := range timeWindowReaders.Readers { @@ -970,6 +988,34 @@ func (s *peersSource) processReaders( return remainingRanges, timesWithErrors } +func (s *peersSource) readNextEntryAndMaybeIndex( + r fs.DataFileSetReader, + batch []doc.Document, + builder *result.IndexBuilder, +) ([]doc.Document, error) { + // If performing index run, then simply read the metadata and add to segment. + id, tagsIter, _, _, err := r.ReadMetadata() + if err != nil { + return batch, err + } + + d, err := convert.FromSeriesIDAndTagIter(id, tagsIter) + // Finalize the ID and tags. + id.Finalize() + tagsIter.Close() + if err != nil { + return batch, err + } + + batch = append(batch, d) + + if len(batch) >= index.DocumentArrayPoolCapacity { + return builder.FlushBatch(batch) + } + + return batch, nil +} + // markRunResultErrorsAndUnfulfilled checks the list of times that had errors and makes // sure that we don't return any blocks or bloom filters for them. In addition, // it looks at any remaining (unfulfilled) ranges and makes sure they're marked diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go index 33ebd5dcfe..7cf05611db 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go @@ -78,6 +78,14 @@ type Options interface { // persistence enabled. 
ShardPersistenceFlushConcurrency() int + // SetIndexSegmentConcurrency sets the concurrency for + // building index segments. + SetIndexSegmentConcurrency(value int) Options + + // IndexSegmentConcurrency returns the concurrency for + // building index segments. + IndexSegmentConcurrency() int + // SetPersistenceMaxQueueSize sets the max queue for // bootstrapping shards waiting in line to persist without blocking // the concurrent shard fetchers. diff --git a/src/x/config/config.go b/src/x/config/config.go index 6cbfe2ee77..bbd2f54b6b 100644 --- a/src/x/config/config.go +++ b/src/x/config/config.go @@ -117,7 +117,8 @@ func deprecationCheck(cfg interface{}, df []string) []string { df = deprecationCheck(v.Interface(), df) } name := reflect.TypeOf(cfg).Field(i).Name - if strings.HasPrefix(name, deprecatedPrefix) { + if strings.HasPrefix(name, deprecatedPrefix) && !v.IsZero() { + // Only log as deprecated if actually using (not zero value). df = append(df, name) } } diff --git a/src/x/config/config_test.go b/src/x/config/config_test.go index ed5f7e4c47..195ea505de 100644 --- a/src/x/config/config_test.go +++ b/src/x/config/config_test.go @@ -75,6 +75,7 @@ type configurationDeprecated struct { Servers []string `validate:"nonzero"` DeprecatedFoo string `yaml:"foo"` DeprecatedBar int `yaml:"bar"` + DeprecatedBaz *int `yaml:"baz"` } type nestedConfigurationDeprecated struct { @@ -441,9 +442,54 @@ bar: 42 require.NoError(t, err) actual := deprecationCheck(cfg2, df) - require.Len(t, actual, 2) expect := []string{"DeprecatedFoo", "DeprecatedBar"} - require.Equal(t, expect, actual) + require.Equal(t, len(expect), len(actual), + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) + require.Equal(t, expect, actual, + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) + }) + + t.Run("DeprecatedZeroValue", func(t *testing.T) { + // OK + var cfg configuration + fname := writeFile(t, goodConfig) + defer func() { + require.NoError(t, os.Remove(fname)) + }() + + err := LoadFile(&cfg, fname, Options{}) + require.NoError(t, err) + + df := []string{} + ss := deprecationCheck(cfg, df) + require.Equal(t, 0, len(ss)) + + // Deprecated zero value should be ok and not printed + badConfig := ` +listen_address: localhost:4385 +buffer_space: 1024 +servers: + - server1:8090 + - server2:8010 +foo: ok +bar: 42 +baz: null +` + var cfg2 configurationDeprecated + fname2 := writeFile(t, badConfig) + defer func() { + require.NoError(t, os.Remove(fname2)) + }() + + err = LoadFile(&cfg2, fname2, Options{}) + require.NoError(t, err) + + actual := deprecationCheck(cfg2, df) + expect := []string{"DeprecatedFoo", "DeprecatedBar"} + require.Equal(t, len(expect), len(actual), + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) + require.Equal(t, expect, actual, + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) }) t.Run("NestedConfig", func(t *testing.T) { @@ -470,9 +516,11 @@ commitlog: df := []string{} actual := deprecationCheck(cfg, df) - require.Len(t, actual, 1) expect := []string{"DeprecatedBlockSize"} - require.Equal(t, expect, actual) + require.Equal(t, len(expect), len(actual), + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) + require.Equal(t, expect, actual, + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) // Multiple deprecation var cfg2 nestedConfigurationMultipleDeprecated @@ -485,7 +533,6 @@ servers: commitlog: flushMaxBytes: 42 flushEvery: second - blockSize: 23 multiple: listen_address: 
localhost:4385 buffer_space: 1024 @@ -506,18 +553,15 @@ multiple: df = []string{} actual = deprecationCheck(cfg2, df) - require.Len(t, actual, 4) expect = []string{ - "DeprecatedBlockSize", "DeprecatedMultiple", "DeprecatedFoo", "DeprecatedBar", } - require.True( - t, - slicesContainSameStrings(expect, actual), - fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual), - ) + require.Equal(t, len(expect), len(actual), + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) + require.True(t, slicesContainSameStrings(expect, actual), + fmt.Sprintf("expect %#v should be equal actual %#v", expect, actual)) }) }
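
For illustration, a minimal sketch of the new setting in an m3dbnode bootstrap configuration (the value 4 is only a placeholder; surrounding keys are omitted):

    bootstrap:
      # Optional; applies to index segment building in both the filesystem
      # and peers bootstrappers. When omitted or null, each bootstrapper
      # keeps its default concurrency of 1.
      indexSegmentConcurrency: 4

Note that the existing filesystem numProcessorsPerCPU setting is deprecated by this change and no longer affects bootstrapping; with the accompanying config change, the deprecation warning is only emitted when the field is explicitly set to a non-zero value.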