From 2e365ae77efbae91344a6c4b346f45c9f5bd6014 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 14 Oct 2020 18:40:07 -0400 Subject: [PATCH] [dbnode] Add configuration to flush data in parallel for peer streaming (#2594) --- src/cmd/services/m3dbnode/config/bootstrap.go | 12 +- src/dbnode/client/session.go | 23 +- src/dbnode/integration/integration.go | 2 +- .../peers_bootstrap_high_concurrency_test.go | 158 +++++++++++-- .../bootstrap/bootstrapper/peers/options.go | 47 ++-- .../bootstrap/bootstrapper/peers/source.go | 220 ++++++++++-------- .../bootstrapper/peers/source_data_test.go | 74 +++--- .../bootstrapper/peers/source_index_test.go | 134 +---------- .../bootstrap/bootstrapper/peers/types.go | 10 + .../storage/bootstrap/result/result_data.go | 3 +- .../bootstrap/result/result_data_test.go | 18 +- src/dbnode/storage/repair.go | 3 +- src/dbnode/storage/shard_test.go | 2 +- 13 files changed, 382 insertions(+), 324 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/bootstrap.go b/src/cmd/services/m3dbnode/config/bootstrap.go index f39ef73fdd..c0036d5f21 100644 --- a/src/cmd/services/m3dbnode/config/bootstrap.go +++ b/src/cmd/services/m3dbnode/config/bootstrap.go @@ -143,12 +143,17 @@ type BootstrapPeersConfiguration struct { // for historical data being streamed between peers (historical blocks). // Defaults to: numCPU / 2. StreamPersistShardConcurrency int `yaml:"streamPersistShardConcurrency"` + // StreamPersistShardFlushConcurrency controls how many shards in parallel to flush + // for historical data being streamed between peers (historical blocks). + // Defaults to: 1. + StreamPersistShardFlushConcurrency int `yaml:"streamPersistShardFlushConcurrency"` } func newDefaultBootstrapPeersConfiguration() BootstrapPeersConfiguration { return BootstrapPeersConfiguration{ - StreamShardConcurrency: peers.DefaultShardConcurrency, - StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency, + StreamShardConcurrency: peers.DefaultShardConcurrency, + StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency, + StreamPersistShardFlushConcurrency: peers.DefaultShardPersistenceFlushConcurrency, } } @@ -256,7 +261,8 @@ func (bsc BootstrapConfiguration) New( SetRuntimeOptionsManager(opts.RuntimeOptionsManager()). SetContextPool(opts.ContextPool()). SetDefaultShardConcurrency(pCfg.StreamShardConcurrency). - SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency) + SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency). 
+ SetShardPersistenceFlushConcurrency(pCfg.StreamPersistShardFlushConcurrency) if err := validator.ValidatePeersBootstrapperOptions(pOpts); err != nil { return nil, err } diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 091dab455d..c6cd514ea5 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -68,13 +68,11 @@ import ( ) const ( - clusterConnectWaitInterval = 10 * time.Millisecond - blocksMetadataChannelInitialCapacity = 4096 - gaugeReportInterval = 500 * time.Millisecond - blockMetadataChBufSize = 4096 - shardResultCapacity = 4096 - hostNotAvailableMinSleepInterval = 1 * time.Millisecond - hostNotAvailableMaxSleepInterval = 100 * time.Millisecond + clusterConnectWaitInterval = 10 * time.Millisecond + gaugeReportInterval = 500 * time.Millisecond + blockMetadataChBufSize = 65536 + hostNotAvailableMinSleepInterval = 1 * time.Millisecond + hostNotAvailableMaxSleepInterval = 100 * time.Millisecond ) type resultTypeEnum string @@ -2036,7 +2034,7 @@ func (s *session) fetchBlocksMetadataFromPeers( var ( metadataCh = make(chan receivedBlockMetadata, - blocksMetadataChannelInitialCapacity) + blockMetadataChBufSize) errCh = make(chan error, 1) meta = resultTypeMetadata m = s.newPeerMetadataStreamingProgressMetrics(shard, meta) @@ -3550,7 +3548,7 @@ func newBulkBlocksResult( return &bulkBlocksResult{ nsCtx: nsCtx, baseBlocksResult: newBaseBlocksResult(nsCtx, opts, resultOpts), - result: result.NewShardResult(shardResultCapacity, resultOpts), + result: result.NewShardResult(resultOpts), tagDecoderPool: tagDecoderPool, idPool: idPool, } @@ -3654,7 +3652,12 @@ type enqueueCh struct { metrics *streamFromPeersMetrics } -const enqueueChannelDefaultLen = 32768 +// enqueueChannelDefaultLen is the queue length for processing series ready to +// be fetched from other peers. +// It was reduced from 32k to 512 since each struct in the queue is quite large +// and with 32k capacity was using significant memory with high shard +// concurrency. +const enqueueChannelDefaultLen = 512 func newEnqueueChannel(m *streamFromPeersMetrics) enqueueChannel { c := &enqueueCh{ diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index f6091f7cf4..31883a58fb 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -228,7 +228,7 @@ func newDefaultBootstrappableTestSetups( SetTopologyInitializer(topologyInitializer).(client.AdminOptions). SetOrigin(origin) - // Prevent integration tests from timing out when a node is down + // Prevent integration tests from timing out when a node is down retryOpts = xretry.NewOptions(). SetInitialBackoff(1 * time.Millisecond). SetMaxRetries(1). 
diff --git a/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go b/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go index ee60e9d78d..30017818af 100644 --- a/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go +++ b/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go @@ -23,6 +23,7 @@ package integration import ( + "encoding/json" "fmt" "testing" "time" @@ -30,26 +31,64 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/m3ninx/idx" + "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestPeersBootstrapHighConcurrency(t *testing.T) { + for _, test := range []testPeersBootstrapHighConcurrencyOptions{ + {BatchSize: 16, Concurrency: 64, BatchesPerWorker: 8}, + {BatchSize: 64, Concurrency: 16, BatchesPerWorker: 8}, + } { + name, err := json.Marshal(test) + require.NoError(t, err) + t.Run(string(name), func(t *testing.T) { + testPeersBootstrapHighConcurrency(t, test) + }) + } +} + +type testPeersBootstrapHighConcurrencyOptions struct { + BatchSize int + Concurrency int + BatchesPerWorker int +} + +func testPeersBootstrapHighConcurrency( + t *testing.T, + testOpts testPeersBootstrapHighConcurrencyOptions, +) { if testing.Short() { t.SkipNow() } // Test setups log := xtest.NewLogger(t) - retentionOpts := retention.NewOptions(). + + blockSize := 2 * time.Hour + + idxOpts := namespace.NewIndexOptions(). + SetEnabled(true). + SetBlockSize(blockSize) + + rOpts := retention.NewOptions(). SetRetentionPeriod(6 * time.Hour). - SetBlockSize(2 * time.Hour). + SetBlockSize(blockSize). SetBufferPast(10 * time.Minute). SetBufferFuture(2 * time.Minute) - namesp, err := namespace.NewMetadata(testNamespaces[0], - namespace.NewOptions().SetRetentionOptions(retentionOpts)) + + nOpts := namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(idxOpts) + + namesp, err := namespace.NewMetadata(testNamespaces[0], nOpts) require.NoError(t, err) + opts := NewTestOptions(t). SetNamespaces([]namespace.Metadata{namesp}). 
// Use TChannel clients for writing / reading because we want to target individual nodes at a time @@ -73,22 +112,44 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) { defer closeFn() // Write test data for first node - total := 8 * batchSize * concurrency - log.Sugar().Debugf("testing a total of %d IDs with %d batch size %d concurrency", total, batchSize, concurrency) - shardIDs := make([]string, 0, total) - for i := 0; i < total; i++ { - id := fmt.Sprintf("id.%d", i) - shardIDs = append(shardIDs, id) - } + numSeries := testOpts.BatchesPerWorker * testOpts.Concurrency * testOpts.BatchSize + log.Sugar().Debugf("testing a total of %d IDs with %d batch size %d concurrency", + numSeries, testOpts.BatchSize, testOpts.Concurrency) now := setups[0].NowFn()() - blockSize := retentionOpts.BlockSize() - seriesMaps := generate.BlocksByStart([]generate.BlockConfig{ - {IDs: shardIDs, NumPoints: 3, Start: now.Add(-3 * blockSize)}, - {IDs: shardIDs, NumPoints: 3, Start: now.Add(-2 * blockSize)}, - {IDs: shardIDs, NumPoints: 3, Start: now.Add(-blockSize)}, - {IDs: shardIDs, NumPoints: 3, Start: now}, - }) + commonTags := []ident.Tag{ + { + Name: ident.StringID("fruit"), + Value: ident.StringID("apple"), + }, + } + numPoints := 10 + seriesMaps := generate.BlocksByStart(blockConfigs( + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now.Add(-3 * blockSize), + }), + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now.Add(-2 * blockSize), + }), + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now.Add(-1 * blockSize), + }), + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now, + }), + )) err = writeTestDataToDisk(namesp, setups[0], seriesMaps, 0) require.NoError(t, err) @@ -96,8 +157,9 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) { require.NoError(t, setups[0].StartServer()) // Start the last server with peers and filesystem bootstrappers + bootstrapStart := time.Now() require.NoError(t, setups[1].StartServer()) - log.Debug("servers are now up") + log.Debug("servers are now up", zap.Duration("took", time.Since(bootstrapStart))) // Stop the servers defer func() { @@ -111,4 +173,62 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) { for _, setup := range setups { verifySeriesMaps(t, setup, namesp.ID(), seriesMaps) } + + // Issue some index queries to the second node which bootstrapped the metadata + session, err := setups[1].M3DBClient().DefaultSession() + require.NoError(t, err) + + start := now.Add(-rOpts.RetentionPeriod()) + end := now.Add(blockSize) + queryOpts := index.QueryOptions{StartInclusive: start, EndExclusive: end} + + // Match on common tags + termQuery := idx.NewTermQuery(commonTags[0].Name.Bytes(), commonTags[0].Value.Bytes()) + iter, _, err := session.FetchTaggedIDs(namesp.ID(), + index.Query{Query: termQuery}, queryOpts) + require.NoError(t, err) + defer iter.Finalize() + + count := 0 + for iter.Next() { + count++ + } + require.Equal(t, numSeries, count) +} + +type generateTaggedBlockConfig struct { + series int + numPoints int + commonTags []ident.Tag + blockStart time.Time +} + +func generateTaggedBlockConfigs( + cfg generateTaggedBlockConfig, +) []generate.BlockConfig { + results := 
make([]generate.BlockConfig, 0, cfg.series) + for i := 0; i < cfg.series; i++ { + id := fmt.Sprintf("series_%d", i) + tags := make([]ident.Tag, 0, 1+len(cfg.commonTags)) + tags = append(tags, ident.Tag{ + Name: ident.StringID("series"), + Value: ident.StringID(fmt.Sprintf("%d", i)), + }) + tags = append(tags, cfg.commonTags...) + results = append(results, generate.BlockConfig{ + IDs: []string{id}, + Tags: ident.NewTags(tags...), + NumPoints: cfg.numPoints, + Start: cfg.blockStart, + }) + } + return results +} + +func blockConfigs(cfgs ...[]generate.BlockConfig) []generate.BlockConfig { + var results []generate.BlockConfig + for _, elem := range cfgs { + results = append(results, elem...) + } + return results } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go index 1d592dd03e..c3e3a595ef 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go @@ -48,6 +48,11 @@ var ( // src/cmd/services/m3dbnode/config package if this is changed. DefaultShardPersistenceConcurrency = int(math.Max(1, float64(runtime.NumCPU())/2)) defaultPersistenceMaxQueueSize = 0 + // DefaultShardPersistenceFlushConcurrency controls how many shards in parallel to flush + // for historical data being streamed between peers (historical blocks). + // Update BootstrapPeersConfiguration comment in + // src/cmd/services/m3dbnode/config package if this is changed. + DefaultShardPersistenceFlushConcurrency = 1 ) var ( @@ -60,26 +65,28 @@ var ( ) type options struct { - resultOpts result.Options - client client.AdminClient - defaultShardConcurrency int - shardPersistenceConcurrency int - persistenceMaxQueueSize int - persistManager persist.Manager - runtimeOptionsManager m3dbruntime.OptionsManager - contextPool context.Pool - fsOpts fs.Options - indexOpts index.Options - compactor *compaction.Compactor + resultOpts result.Options + client client.AdminClient + defaultShardConcurrency int + shardPersistenceConcurrency int + shardPersistenceFlushConcurrency int + persistenceMaxQueueSize int + persistManager persist.Manager + runtimeOptionsManager m3dbruntime.OptionsManager + contextPool context.Pool + fsOpts fs.Options + indexOpts index.Options + compactor *compaction.Compactor } // NewOptions creates new bootstrap options. func NewOptions() Options { return &options{ - resultOpts: result.NewOptions(), - defaultShardConcurrency: DefaultShardConcurrency, - shardPersistenceConcurrency: DefaultShardPersistenceConcurrency, - persistenceMaxQueueSize: defaultPersistenceMaxQueueSize, + resultOpts: result.NewOptions(), + defaultShardConcurrency: DefaultShardConcurrency, + shardPersistenceConcurrency: DefaultShardPersistenceConcurrency, + shardPersistenceFlushConcurrency: DefaultShardPersistenceFlushConcurrency, + persistenceMaxQueueSize: defaultPersistenceMaxQueueSize, // Use a zero pool, this should be overriden at config time. contextPool: context.NewPool(context.NewOptions(). SetContextPoolOptions(pool.NewObjectPoolOptions().SetSize(0)). 
@@ -149,6 +156,16 @@ func (o *options) ShardPersistenceConcurrency() int { return o.shardPersistenceConcurrency } +func (o *options) SetShardPersistenceFlushConcurrency(value int) Options { + opts := *o + opts.shardPersistenceFlushConcurrency = value + return &opts +} + +func (o *options) ShardPersistenceFlushConcurrency() int { + return o.shardPersistenceFlushConcurrency +} + func (o *options) SetPersistenceMaxQueueSize(value int) Options { opts := *o opts.persistenceMaxQueueSize = value diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 742edb8ddf..a737a9c6c0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -22,6 +22,7 @@ package peers import ( "fmt" + "io" "sync" "time" @@ -42,6 +43,7 @@ import ( "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -54,11 +56,12 @@ import ( ) type peersSource struct { - opts Options - log *zap.Logger - nowFn clock.NowFn - persistManager *bootstrapper.SharedPersistManager - compactor *bootstrapper.SharedCompactor + opts Options + log *zap.Logger + newPersistManager func() (persist.Manager, error) + nowFn clock.NowFn + persistManager *bootstrapper.SharedPersistManager + compactor *bootstrapper.SharedCompactor } type persistenceFlush struct { @@ -75,8 +78,11 @@ func newPeersSource(opts Options) (bootstrap.Source, error) { iopts := opts.ResultOptions().InstrumentOptions() return &peersSource{ - opts: opts, - log: iopts.Logger().With(zap.String("bootstrapper", "peers")), + opts: opts, + log: iopts.Logger().With(zap.String("bootstrapper", "peers")), + newPersistManager: func() (persist.Manager, error) { + return fs.NewPersistManager(opts.FilesystemOptions()) + }, nowFn: opts.ResultOptions().ClockOptions().NowFn(), persistManager: &bootstrapper.SharedPersistManager{ Mgr: opts.PersistManager(), @@ -236,8 +242,6 @@ func (s *peersSource) readData( } var ( - namespace = nsMetadata.ID() - persistFlush persist.FlushPreparer shouldPersist = false // TODO(bodu): We should migrate to series.CacheLRU only. 
seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() @@ -247,24 +251,7 @@ func (s *peersSource) readData( if persistConfig.Enabled && (seriesCachePolicy == series.CacheRecentlyRead || seriesCachePolicy == series.CacheLRU) && persistConfig.FileSetType == persist.FileSetFlushType { - persistManager := s.opts.PersistManager() - - // Neither of these should ever happen - if seriesCachePolicy != series.CacheAll && persistManager == nil { - s.log.Fatal("tried to perform a bootstrap with persistence without persist manager") - } - - s.log.Info("peers bootstrapper resolving block retriever", zap.Stringer("namespace", namespace)) - - persist, err := persistManager.StartFlushPersist() - if err != nil { - return nil, err - } - - defer persist.DoneFlush() - shouldPersist = true - persistFlush = persist } result := result.NewDataBootstrapResult() @@ -277,14 +264,14 @@ func (s *peersSource) readData( var ( resultLock sync.Mutex - wg sync.WaitGroup - persistenceWorkerDoneCh = make(chan struct{}) persistenceMaxQueueSize = s.opts.PersistenceMaxQueueSize() persistenceQueue = make(chan persistenceFlush, persistenceMaxQueueSize) resultOpts = s.opts.ResultOptions() count = shardTimeRanges.Len() concurrency = s.opts.DefaultShardConcurrency() blockSize = nsMetadata.Options().RetentionOptions().BlockSize() + persistWg = &sync.WaitGroup{} + persistClosers []io.Closer ) if shouldPersist { concurrency = s.opts.ShardPersistenceConcurrency() @@ -295,11 +282,22 @@ func (s *peersSource) readData( zap.Int("concurrency", concurrency), zap.Bool("shouldPersist", shouldPersist)) if shouldPersist { - go s.startPersistenceQueueWorkerLoop( - opts, persistenceWorkerDoneCh, persistenceQueue, persistFlush, result, &resultLock) + // Spin up persist workers. + for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ { + closer, err := s.startPersistenceQueueWorkerLoop(opts, + persistWg, persistenceQueue, result, &resultLock) + if err != nil { + return nil, err + } + + persistClosers = append(persistClosers, closer) + } } - workers := xsync.NewWorkerPool(concurrency) + var ( + wg sync.WaitGroup + workers = xsync.NewWorkerPool(concurrency) + ) workers.Init() for shard, ranges := range shardTimeRanges.Iter() { shard, ranges := shard, ranges @@ -315,31 +313,71 @@ func (s *peersSource) readData( wg.Wait() close(persistenceQueue) if shouldPersist { - // Wait for the persistenceQueueWorker to finish flushing everything - <-persistenceWorkerDoneCh + // Wait for the persistenceQueue workers to finish flushing everything. + persistWg.Wait() + + // Close any persist closers to finalize files written. 
+		for _, closer := range persistClosers {
+			if err := closer.Close(); err != nil {
+				return nil, err
+			}
+		}
 	}
 
 	return result, nil
 }
 
-// startPersistenceQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that
+func (s *peersSource) startPersistenceQueueWorkerLoop(
+	opts bootstrap.RunOptions,
+	persistWg *sync.WaitGroup,
+	persistenceQueue chan persistenceFlush,
+	bootstrapResult result.DataBootstrapResult,
+	lock *sync.Mutex,
+) (io.Closer, error) {
+	persistMgr, err := s.newPersistManager()
+	if err != nil {
+		return nil, err
+	}
+
+	persistFlush, err := persistMgr.StartFlushPersist()
+	if err != nil {
+		return nil, err
+	}
+
+	persistWg.Add(1)
+	go func() {
+		defer persistWg.Done()
+		s.runPersistenceQueueWorkerLoop(opts, persistenceQueue,
+			persistFlush, bootstrapResult, lock)
+	}()
+
+	return xclose.CloserFn(persistFlush.DoneFlush), nil
+}
+
+// runPersistenceQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that
 // loops through the persistenceQueue and performs a flush for each entry, ensuring that
 // no more than one flush is ever happening at once. Once the persistenceQueue channel
-// is closed, and the worker has completed flushing all the remaining entries, it will close the
-// provided doneCh so that callers can block until everything has been successfully flushed.
-func (s *peersSource) startPersistenceQueueWorkerLoop(
+// is closed, and the worker has completed flushing all the remaining entries, it marks its
+// wait group as done so that callers can block until everything has been successfully flushed.
+func (s *peersSource) runPersistenceQueueWorkerLoop(
 	opts bootstrap.RunOptions,
-	doneCh chan struct{},
 	persistenceQueue chan persistenceFlush,
 	persistFlush persist.FlushPreparer,
 	bootstrapResult result.DataBootstrapResult,
 	lock *sync.Mutex,
 ) {
+	// Track async cleanup tasks.
+	asyncTasks := &sync.WaitGroup{}
+
+	// Wait for cleanups to all occur before returning from worker.
+	defer asyncTasks.Wait()
+
 	// If performing a bootstrap with persistence enabled then flush one
 	// at a time as shard results are gathered.
 	for flush := range persistenceQueue {
 		err := s.flush(opts, persistFlush, flush.nsMetadata, flush.shard,
-			flush.shardResult, flush.timeRange)
+			flush.shardResult, flush.timeRange, asyncTasks)
 		if err == nil {
 			continue
 		}
@@ -358,7 +396,7 @@ func (s *peersSource) startPersistenceQueueWorkerLoop(
 		bootstrapResult.SetUnfulfilled(unfulfilled)
 		lock.Unlock()
 	}
-	close(doneCh)
 }
 
 // fetchBootstrapBlocksFromPeers loops through all the provided ranges for a given shard and
@@ -492,6 +530,7 @@ func (s *peersSource) flush(
 	shard uint32,
 	shardResult result.ShardResult,
 	tr xtime.Range,
+	asyncTasks *sync.WaitGroup,
 ) error {
 	persistConfig := opts.PersistConfig()
 	if persistConfig.FileSetType != persist.FileSetFlushType {
@@ -525,9 +564,8 @@ func (s *peersSource) flush(
 	var (
 		ropts     = nsMetadata.Options().RetentionOptions()
 		blockSize = ropts.BlockSize()
-		flushCtx  = s.opts.ContextPool().Get()
 	)
 
 	for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) {
 		prepareOpts := persist.DataPrepareOptions{
 			NamespaceMetadata: nsMetadata,
@@ -567,48 +606,34 @@ func (s *peersSource) flush(
 		var blockErr error
 		for _, entry := range shardResult.AllSeries().Iter() {
 			s := entry.Value()
 			bl, ok := s.Blocks.BlockAt(start)
 			if !ok {
 				continue
 			}
 
-			flushCtx.Reset()
-			stream, err := bl.Stream(flushCtx)
+			checksum, err := bl.Checksum()
 			if err != nil {
-				flushCtx.BlockingCloseReset()
 				blockErr = err // Need to call prepared.Close, avoid return
 				break
 			}
 
-			segment, err := stream.Segment()
-			if err != nil {
-				flushCtx.BlockingCloseReset()
-				blockErr = err // Need to call prepared.Close, avoid return
-				break
-			}
+			// Discard and finalize the block.
+			segment := bl.Discard()
 
-			checksum, err := bl.Checksum()
-			if err != nil {
-				flushCtx.BlockingCloseReset()
-				blockErr = err
-				break
-			}
+			// Remove from map.
+			s.Blocks.RemoveBlockAt(start)
 
 			metadata := persist.NewMetadataFromIDAndTags(s.ID, s.Tags,
 				persist.MetadataOptions{})
 			err = prepared.Persist(metadata, segment, checksum)
-			flushCtx.BlockingCloseReset()
 			if err != nil {
 				blockErr = err // Need to call prepared.Close, avoid return
 				break
 			}
-
-			// Now that we've persisted the data to disk, we can finalize the block,
-			// as there is no need to keep it in memory. We do this here because it
-			// is better to do this as we loop to make blocks return to the pool earlier
-			// than all at once the end of this flush cycle.
-			s.Blocks.RemoveBlockAt(start)
-			bl.Close()
 		}
 
 		// Always close before attempting to check if block error occurred,
@@ -624,38 +649,45 @@ func (s *peersSource) flush(
 		}
 	}
 
-	// Since we've persisted the data to disk, we don't want to keep all the series in the shard
-	// result. Otherwise if we leave them in, then they will all get loaded into the shard object,
-	// and then immediately evicted on the next tick which causes unnecessary memory pressure
-	// during peer bootstrapping.
-	numSeriesTriedToRemoveWithRemainingBlocks := 0
-	for _, entry := range shardResult.AllSeries().Iter() {
-		series := entry.Value()
-		numBlocksRemaining := len(series.Blocks.AllBlocks())
-		// Should never happen since we removed all the block in the previous loop and fetching
-		// bootstrap blocks should always be exclusive on the end side.
-		if numBlocksRemaining > 0 {
-			numSeriesTriedToRemoveWithRemainingBlocks++
-			continue
-		}
+	// Perform cleanup asynchronously, but allow the caller to wait on it.
+	// This allows progressing to the next flush faster.
+	asyncTasks.Add(1)
+	go func() {
+		defer asyncTasks.Done()
+
+		// Since we've persisted the data to disk, we don't want to keep all the series in the shard
+		// result. Otherwise if we leave them in, then they will all get loaded into the shard object,
+		// and then immediately evicted on the next tick which causes unnecessary memory pressure
+		// during peer bootstrapping.
+		numSeriesTriedToRemoveWithRemainingBlocks := 0
+		for _, entry := range shardResult.AllSeries().Iter() {
+			series := entry.Value()
+			numBlocksRemaining := len(series.Blocks.AllBlocks())
+			// Should never happen since we removed all the blocks in the previous loop and fetching
+			// bootstrap blocks should always be exclusive on the end side.
+ if numBlocksRemaining > 0 { + numSeriesTriedToRemoveWithRemainingBlocks++ + continue + } - shardResult.RemoveSeries(series.ID) - series.Blocks.Close() - // Safe to finalize these IDs and Tags because the prepared object was the only other thing - // using them, and it has been closed. - series.ID.Finalize() - series.Tags.Finalize() - } - if numSeriesTriedToRemoveWithRemainingBlocks > 0 { - iOpts := s.opts.ResultOptions().InstrumentOptions() - instrument.EmitAndLogInvariantViolation(iOpts, func(l *zap.Logger) { - l.With( - zap.Int64("start", tr.Start.Unix()), - zap.Int64("end", tr.End.Unix()), - zap.Int("numTimes", numSeriesTriedToRemoveWithRemainingBlocks), - ).Error("error tried to remove series that still has blocks") - }) - } + shardResult.RemoveSeries(series.ID) + series.Blocks.Close() + // Safe to finalize these IDs and Tags because the prepared object was the only other thing + // using them, and it has been closed. + series.ID.Finalize() + series.Tags.Finalize() + } + if numSeriesTriedToRemoveWithRemainingBlocks > 0 { + iOpts := s.opts.ResultOptions().InstrumentOptions() + instrument.EmitAndLogInvariantViolation(iOpts, func(l *zap.Logger) { + l.With( + zap.Int64("start", tr.Start.Unix()), + zap.Int64("end", tr.End.Unix()), + zap.Int("numTimes", numSeriesTriedToRemoveWithRemainingBlocks), + ).Error("error tried to remove series that still has blocks") + }) + } + }() return nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index 564fc4054c..b97e0533ff 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -39,7 +39,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" @@ -56,15 +55,23 @@ var ( testNamespace = ident.StringID("testnamespace") testNamespaceMetadata = func(t *testing.T, opts ...namespaceOption) namespace.Metadata { namespaceOpts := namespace.NewOptions() + idxOpts := namespaceOpts.IndexOptions() + namespaceOpts = namespaceOpts.SetIndexOptions(idxOpts.SetEnabled(true)) for _, opt := range opts { namespaceOpts = opt(namespaceOpts) } - idxOpts := namespaceOpts.IndexOptions() - namespaceOpts = namespaceOpts.SetIndexOptions(idxOpts.SetEnabled(true)) ns, err := namespace.NewMetadata(testNamespace, namespaceOpts) require.NoError(t, err) return ns } + testNamespaceMetadataNoIndex = func(t *testing.T, opts ...namespaceOption) namespace.Metadata { + newOpts := append([]namespaceOption(nil), opts...) + newOpts = append(newOpts, func(namespaceOpts namespace.Options) namespace.Options { + idxOpts := namespaceOpts.IndexOptions() + return namespaceOpts.SetIndexOptions(idxOpts.SetEnabled(false)) + }) + return testNamespaceMetadata(t, newOpts...) + } testDefaultRunOpts = bootstrap.NewRunOptions(). 
SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) @@ -74,6 +81,8 @@ var ( testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) ) +type namespaceOption func(namespace.Options) namespace.Options + func newTestDefaultOpts(t *testing.T, ctrl *gomock.Controller) Options { idxOpts := index.NewOptions() compactor, err := compaction.NewCompactor(idxOpts.DocumentArrayPool(), @@ -129,10 +138,8 @@ func newValidMockRuntimeOptionsManager(t *testing.T, ctrl *gomock.Controller) m3 return mockRuntimeOptsMgr } -type namespaceOption func(namespace.Options) namespace.Options - func TestPeersSourceEmptyShardTimeRanges(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() opts := newTestDefaultOpts(t, ctrl). @@ -142,7 +149,7 @@ func TestPeersSourceEmptyShardTimeRanges(t *testing.T) { require.NoError(t, err) var ( - nsMetadata = testNamespaceMetadata(t) + nsMetadata = testNamespaceMetadataNoIndex(t) target = result.NewShardTimeRanges() runOpts = testDefaultRunOpts.SetInitialTopologyState(&topology.StateSnapshot{}) ) @@ -163,10 +170,10 @@ func TestPeersSourceEmptyShardTimeRanges(t *testing.T) { } func TestPeersSourceReturnsErrorForAdminSession(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() - nsMetadata := testNamespaceMetadata(t) + nsMetadata := testNamespaceMetadataNoIndex(t) ropts := nsMetadata.Options().RetentionOptions() expectedErr := errors.New("an error") @@ -203,17 +210,17 @@ func TestPeersSourceReturnsErrorForAdminSession(t *testing.T) { } func TestPeersSourceReturnsUnfulfilled(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() opts := newTestDefaultOpts(t, ctrl) - nsMetadata := testNamespaceMetadata(t) + nsMetadata := testNamespaceMetadataNoIndex(t) ropts := nsMetadata.Options().RetentionOptions() start := time.Now().Add(-ropts.RetentionPeriod()).Truncate(ropts.BlockSize()) end := start.Add(ropts.BlockSize()) - goodResult := result.NewShardResult(0, opts.ResultOptions()) + goodResult := result.NewShardResult(opts.ResultOptions()) fooBlock := block.NewDatabaseBlock(start, ropts.BlockSize(), ts.Segment{}, testBlockOpts, namespace.Context{}) goodID := ident.StringID("foo") goodResult.AddBlock(goodID, ident.NewTags(ident.StringTag("foo", "oof")), fooBlock) @@ -266,10 +273,10 @@ func TestPeersSourceRunWithPersist(t *testing.T) { series.CacheRecentlyRead, series.CacheLRU, } { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() - testNsMd := testNamespaceMetadata(t) + testNsMd := testNamespaceMetadataNoIndex(t) resultOpts := testDefaultResultOpts.SetSeriesCachePolicy(cachePolicy) opts := newTestDefaultOpts(t, ctrl).SetResultOptions(resultOpts) ropts := testNsMd.Options().RetentionOptions() @@ -279,8 +286,8 @@ func TestPeersSourceRunWithPersist(t *testing.T) { start := time.Now().Add(-ropts.RetentionPeriod()).Truncate(ropts.BlockSize()) end := start.Add(2 * ropts.BlockSize()) - shard0ResultBlock1 := result.NewShardResult(0, opts.ResultOptions()) - shard0ResultBlock2 := result.NewShardResult(0, opts.ResultOptions()) + shard0ResultBlock1 := result.NewShardResult(opts.ResultOptions()) + shard0ResultBlock2 := result.NewShardResult(opts.ResultOptions()) fooBlock := block.NewDatabaseBlock(start, ropts.BlockSize(), ts.NewSegment(checked.NewBytes([]byte{1, 2, 3}, nil), nil, 1, ts.FinalizeNone), testBlockOpts, namespace.Context{}) @@ -290,8 +297,8 @@ func TestPeersSourceRunWithPersist(t 
*testing.T) { shard0ResultBlock1.AddBlock(ident.StringID("foo"), ident.NewTags(ident.StringTag("foo", "oof")), fooBlock) shard0ResultBlock2.AddBlock(ident.StringID("bar"), ident.NewTags(ident.StringTag("bar", "rab")), barBlock) - shard1ResultBlock1 := result.NewShardResult(0, opts.ResultOptions()) - shard1ResultBlock2 := result.NewShardResult(0, opts.ResultOptions()) + shard1ResultBlock1 := result.NewShardResult(opts.ResultOptions()) + shard1ResultBlock2 := result.NewShardResult(opts.ResultOptions()) bazBlock := block.NewDatabaseBlock(start, ropts.BlockSize(), ts.NewSegment(checked.NewBytes([]byte{7, 8, 9}, nil), nil, 3, ts.FinalizeNone), testBlockOpts, namespace.Context{}) @@ -417,11 +424,13 @@ func TestPeersSourceRunWithPersist(t *testing.T) { mockPersistManager := persist.NewMockManager(ctrl) mockPersistManager.EXPECT().StartFlushPersist().Return(flushPreparer, nil) - opts = opts.SetPersistManager(mockPersistManager) - src, err := newPeersSource(opts) require.NoError(t, err) + src.(*peersSource).newPersistManager = func() (persist.Manager, error) { + return mockPersistManager, nil + } + target := result.NewShardTimeRanges().Set( 0, xtime.NewRanges(xtime.Range{Start: start, End: end}), @@ -448,7 +457,7 @@ func TestPeersSourceRunWithPersist(t *testing.T) { } func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() opts := newTestDefaultOpts(t, ctrl). @@ -456,7 +465,7 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) { ResultOptions(). SetSeriesCachePolicy(series.CacheRecentlyRead), ) - testNsMd := testNamespaceMetadata(t) + testNsMd := testNamespaceMetadataNoIndex(t) ropts := testNsMd.Options().RetentionOptions() start := time.Now().Add(-ropts.RetentionPeriod()).Truncate(ropts.BlockSize()) @@ -472,7 +481,7 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) { results := make(map[resultsKey]result.ShardResult) addResult := func(shard uint32, id string, b block.DatabaseBlock, expectedErr bool) { - r := result.NewShardResult(0, opts.ResultOptions()) + r := result.NewShardResult(opts.ResultOptions()) r.AddBlock(ident.StringID(id), ident.NewTags(ident.StringTag(id, id)), b) start := b.StartTime() end := start.Add(ropts.BlockSize()) @@ -485,8 +494,7 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) { var fooBlocks [2]block.DatabaseBlock fooBlocks[0] = block.NewMockDatabaseBlock(ctrl) fooBlocks[0].(*block.MockDatabaseBlock).EXPECT().StartTime().Return(start).AnyTimes() - fooBlocks[0].(*block.MockDatabaseBlock).EXPECT(). 
-		Stream(gomock.Any()).Return(xio.EmptyBlockReader, errors.New("stream err"))
+	fooBlocks[0].(*block.MockDatabaseBlock).EXPECT().Checksum().Return(uint32(0), errors.New("stream err"))
 	addResult(0, "foo", fooBlocks[0], true)
 
 	fooBlocks[1] = block.NewDatabaseBlock(midway, ropts.BlockSize(),
@@ -495,17 +503,10 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) {
 	addResult(0, "foo", fooBlocks[1], false)
 
 	// bar results
-	mockStream := xio.NewMockSegmentReader(ctrl)
-	mockStream.EXPECT().Segment().Return(ts.Segment{}, segmentError)
-
-	b := xio.BlockReader{
-		SegmentReader: mockStream,
-	}
-
 	var barBlocks [2]block.DatabaseBlock
 	barBlocks[0] = block.NewMockDatabaseBlock(ctrl)
 	barBlocks[0].(*block.MockDatabaseBlock).EXPECT().StartTime().Return(start).AnyTimes()
-	barBlocks[0].(*block.MockDatabaseBlock).EXPECT().Stream(gomock.Any()).Return(b, nil)
+	barBlocks[0].(*block.MockDatabaseBlock).EXPECT().Checksum().Return(uint32(0), errors.New("stream err"))
 	addResult(1, "bar", barBlocks[0], false)
 
 	barBlocks[1] = block.NewDatabaseBlock(midway, ropts.BlockSize(),
@@ -727,11 +729,13 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) {
 	mockPersistManager := persist.NewMockManager(ctrl)
 	mockPersistManager.EXPECT().StartFlushPersist().Return(flushPreprarer, nil)
 
-	opts = opts.SetPersistManager(mockPersistManager)
-
 	src, err := newPeersSource(opts)
 	require.NoError(t, err)
 
+	src.(*peersSource).newPersistManager = func() (persist.Manager, error) {
+		return mockPersistManager, nil
+	}
+
 	target := result.NewShardTimeRanges().Set(
 		0,
 		xtime.NewRanges(
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go
index 58eb3dec8b..420b32655b 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go
@@ -257,7 +257,7 @@ func TestBootstrapIndex(t *testing.T) {
 			_ result.Options,
 		) (result.ShardResult, error) {
 			goodID := ident.StringID("foo")
-			goodResult := result.NewShardResult(0, opts.ResultOptions())
+			goodResult := result.NewShardResult(opts.ResultOptions())
 			for ; blockStart.Before(blockEnd); blockStart = blockStart.Add(blockSize) {
 				fooBlock := block.NewDatabaseBlock(blockStart, ropts.BlockSize(),
 					ts.Segment{}, testBlockOpts, namespace.Context{})
@@ -388,138 +388,6 @@ func TestBootstrapIndex(t *testing.T) {
 	tester.EnsureNoWrites()
 }
 
-// TODO(bodu): Add some error case testing.
-//func TestBootstrapIndexErr(t *testing.T) {
-//	ctrl := gomock.NewController(t)
-//	defer ctrl.Finish()
-//
-//	opts := newTestDefaultOpts(t, ctrl)
-//	pm, err := fs.NewPersistManager(opts.FilesystemOptions())
-//	require.NoError(t, err)
-//	opts = opts.SetPersistManager(pm)
-//
-//	blockSize := 2 * time.Hour
-//	indexBlockSize := 2 * blockSize
-//
-//	ropts := retention.NewOptions().
-//		SetBlockSize(blockSize).
-//		SetRetentionPeriod(24 * blockSize)
-//
-//	nsMetadata := testNamespaceMetadata(t, func(opts namespace.Options) namespace.Options {
-//		return opts.
-//			SetRetentionOptions(ropts).
-//			SetIndexOptions(opts.IndexOptions().
-//				SetEnabled(true).
-// SetBlockSize(indexBlockSize)) -// }) -// -// at := time.Now() -// start := at.Add(-ropts.RetentionPeriod()).Truncate(blockSize) -// indexStart := start.Truncate(indexBlockSize) -// for !start.Equal(indexStart) { -// // make sure data blocks overlap, test block size is 2h -// // and test index block size is 4h -// start = start.Add(blockSize) -// indexStart = start.Truncate(indexBlockSize) -// } -// -// fooSeries := struct { -// id string -// tags map[string]string -// }{ -// "foo", -// map[string]string{"aaa": "bbb", "ccc": "ddd"}, -// } -// dataBlocks := []struct { -// blockStart time.Time -// series []testSeriesMetadata -// }{ -// { -// blockStart: start, -// series: []testSeriesMetadata{ -// {fooSeries.id, fooSeries.tags, []byte{0x1}}, -// }, -// }, -// { -// blockStart: start.Add(blockSize), -// series: []testSeriesMetadata{ -// {fooSeries.id, fooSeries.tags, []byte{0x2}}, -// }, -// }, -// } -// -// end := start.Add(ropts.RetentionPeriod()) -// -// shardTimeRanges := map[uint32]xtime.Ranges{ -// 0: xtime.NewRanges(xtime.Range{ -// Start: start, -// End: end, -// }), -// } -// -// mockAdminSession := client.NewMockAdminSession(ctrl) -// mockAdminSession.EXPECT(). -// FetchBootstrapBlocksFromPeers(gomock.Any(), -// gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( -// func( -// _ namespace.Metadata, -// _ uint32, -// blockStart time.Time, -// blockEnd time.Time, -// _ result.Options, -// ) (result.ShardResult, error) { -// goodID := ident.StringID("foo") -// goodResult := result.NewShardResult(0, opts.ResultOptions()) -// for ; blockStart.Before(blockEnd); blockStart = blockStart.Add(blockSize) { -// fooBlock := block.NewDatabaseBlock(blockStart, ropts.BlockSize(), -// ts.Segment{}, testBlockOpts, namespace.Context{}) -// goodResult.AddBlock(goodID, ident.NewTags(ident.StringTag("foo", "oof")), fooBlock) -// } -// return goodResult, nil -// }).AnyTimes() -// -// mockAdminClient := client.NewMockAdminClient(ctrl) -// mockAdminClient.EXPECT().DefaultAdminSession().Return(mockAdminSession, nil).AnyTimes() -// opts = opts.SetAdminClient(mockAdminClient) -// -// src, err := newPeersSource(opts) -// require.NoError(t, err) -// -// tester := bootstrap.BuildNamespacesTester(t, testDefaultRunOpts, shardTimeRanges, nsMetadata) -// defer tester.Finish() -// tester.TestReadWith(src) -// -// tester.TestUnfulfilledForNamespaceIsEmpty(nsMetadata) -// results := tester.ResultForNamespace(nsMetadata.ID()) -// indexResults := results.IndexResult.IndexResults() -// numBlocksWithData := 0 -// for _, b := range indexResults { -// if len(b.Segments()) != 0 { -// numBlocksWithData++ -// } -// } -// require.Equal(t, 1, numBlocksWithData) -// -// t1 := indexStart -// -// blk1, ok := indexResults[xtime.ToUnixNano(t1)] -// require.True(t, ok) -// require.True(t, blk1.Fulfilled().IsEmpty()) -// -// for _, blk := range indexResults { -// if blk.BlockStart().Equal(t1) { -// continue // already checked above -// } -// // rest should all be marked fulfilled despite no data, because we didn't see -// // any errors in the response. 
-// start := blk.BlockStart() -// end := start.Add(indexBlockSize) -// assertShardRangesEqual(t, result.NewShardTimeRanges(start, end, 0), blk.Fulfilled()) -// } -// -// tester.EnsureNoWrites() -//} - func assertShardRangesEqual(t *testing.T, a, b result.ShardTimeRanges) { ac := a.Copy() ac.Subtract(b) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go index 3e7193f607..33ebd5dcfe 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go @@ -68,6 +68,16 @@ type Options interface { // persistence enabled. ShardPersistenceConcurrency() int + // SetShardPersistenceFlushConcurrency sets the flush concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + SetShardPersistenceFlushConcurrency(value int) Options + + // ShardPersistenceFlushConcurrency returns the flush concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + ShardPersistenceFlushConcurrency() int + // SetPersistenceMaxQueueSize sets the max queue for // bootstrapping shards waiting in line to persist without blocking // the concurrent shard fetchers. diff --git a/src/dbnode/storage/bootstrap/result/result_data.go b/src/dbnode/storage/bootstrap/result/result_data.go index 6189905bf8..51fbdcfbad 100644 --- a/src/dbnode/storage/bootstrap/result/result_data.go +++ b/src/dbnode/storage/bootstrap/result/result_data.go @@ -66,11 +66,10 @@ type shardResult struct { } // NewShardResult creates a new shard result. -func NewShardResult(capacity int, opts Options) ShardResult { +func NewShardResult(opts Options) ShardResult { return &shardResult{ opts: opts, blocks: NewMap(MapOptions{ - InitialSize: capacity, KeyCopyPool: opts.DatabaseBlockOptions().BytesPool().BytesPool(), }), } diff --git a/src/dbnode/storage/bootstrap/result/result_data_test.go b/src/dbnode/storage/bootstrap/result/result_data_test.go index e120783181..f7208f5457 100644 --- a/src/dbnode/storage/bootstrap/result/result_data_test.go +++ b/src/dbnode/storage/bootstrap/result/result_data_test.go @@ -160,7 +160,7 @@ func TestResultSetUnfulfilled(t *testing.T) { func TestShardResultIsEmpty(t *testing.T) { opts := testResultOptions() - sr := NewShardResult(0, opts) + sr := NewShardResult(opts) require.True(t, sr.IsEmpty()) block := opts.DatabaseBlockOptions().DatabaseBlockPool().Get() block.Reset(time.Now(), time.Hour, ts.Segment{}, namespace.Context{}) @@ -171,7 +171,7 @@ func TestShardResultIsEmpty(t *testing.T) { func TestShardResultAddBlock(t *testing.T) { opts := testResultOptions() - sr := NewShardResult(0, opts) + sr := NewShardResult(opts) start := time.Now() inputs := []struct { id string @@ -199,7 +199,7 @@ func TestShardResultAddBlock(t *testing.T) { func TestShardResultAddSeries(t *testing.T) { opts := testResultOptions() - sr := NewShardResult(0, opts) + sr := NewShardResult(opts) start := time.Now() inputs := []struct { id string @@ -229,10 +229,10 @@ func TestShardResultAddSeries(t *testing.T) { func TestShardResultAddResult(t *testing.T) { opts := testResultOptions() - sr := NewShardResult(0, opts) + sr := NewShardResult(opts) sr.AddResult(nil) require.True(t, sr.IsEmpty()) - other := NewShardResult(0, opts) + other := NewShardResult(opts) other.AddSeries(ident.StringID("foo"), ident.NewTags(ident.StringTag("foo", "foe")), block.NewDatabaseSeriesBlocks(0)) other.AddSeries(ident.StringID("bar"), 
ident.NewTags(ident.StringTag("bar", "baz")), block.NewDatabaseSeriesBlocks(0)) sr.AddResult(other) @@ -241,10 +241,10 @@ func TestShardResultAddResult(t *testing.T) { func TestShardResultNumSeries(t *testing.T) { opts := testResultOptions() - sr := NewShardResult(0, opts) + sr := NewShardResult(opts) sr.AddResult(nil) require.True(t, sr.IsEmpty()) - other := NewShardResult(0, opts) + other := NewShardResult(opts) other.AddSeries(ident.StringID("foo"), ident.NewTags(ident.StringTag("foo", "foe")), block.NewDatabaseSeriesBlocks(0)) other.AddSeries(ident.StringID("bar"), ident.NewTags(ident.StringTag("bar", "baz")), block.NewDatabaseSeriesBlocks(0)) sr.AddResult(other) @@ -253,7 +253,7 @@ func TestShardResultNumSeries(t *testing.T) { func TestShardResultRemoveSeries(t *testing.T) { opts := testResultOptions() - sr := NewShardResult(0, opts) + sr := NewShardResult(opts) inputs := []struct { id string tags ident.Tags @@ -423,7 +423,7 @@ func TestEstimateMapBytesSize(t *testing.T) { block.NewDatabaseBlock(start.Add(1*testBlockSize), testBlockSize, ts.Segment{Tail: threeBytes}, blopts, namespace.Context{}), } - sr := NewShardResult(0, opts) + sr := NewShardResult(opts) fooTags := ident.NewTags(ident.StringTag("foo", "foe")) barTags := ident.NewTags(ident.StringTag("bar", "baz")) diff --git a/src/dbnode/storage/repair.go b/src/dbnode/storage/repair.go index 202dbbe2f6..8f6eb523b2 100644 --- a/src/dbnode/storage/repair.go +++ b/src/dbnode/storage/repair.go @@ -273,8 +273,7 @@ func (r shardRepairer) Repair( // TODO(rartoul): Copying the IDs for the purposes of the map key is wasteful. Considering using // SetUnsafe or marking as NoFinalize() and making the map check IsNoFinalize(). - numMismatchSeries := seriesWithChecksumMismatches.Len() - results := result.NewShardResult(numMismatchSeries, rsOpts) + results := result.NewShardResult(rsOpts) for i, metadatasToFetchBlocksFor := range metadatasToFetchBlocksForPerSession { if len(metadatasToFetchBlocksFor) == 0 { continue diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 00c9507ec3..3b7454f565 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -378,7 +378,7 @@ func testShardLoadLimit(t *testing.T, limit int64, shouldReturnError bool) { start = time.Now().Truncate(testBlockSize) threeBytes = checked.NewBytes([]byte("123"), nil) - sr = result.NewShardResult(0, result.NewOptions()) + sr = result.NewShardResult(result.NewOptions()) fooTags = ident.NewTags(ident.StringTag("foo", "foe")) barTags = ident.NewTags(ident.StringTag("bar", "baz")) )
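
For reference, a minimal sketch of how the new flush concurrency added by this patch might be set in an m3dbnode YAML configuration. The nesting under a bootstrap/peers section and the streamShardConcurrency key name are assumptions based on the existing BootstrapPeersConfiguration fields; only the streamPersistShardConcurrency and streamPersistShardFlushConcurrency yaml tags appear in this diff, and the flush concurrency defaults to 1 when unset:

    bootstrap:
      peers:
        # Shards streamed from peers in parallel (DefaultShardConcurrency).
        streamShardConcurrency: 8
        # Shards streamed in parallel when persisting historical blocks
        # (DefaultShardPersistenceConcurrency, numCPU / 2 by default).
        streamPersistShardConcurrency: 4
        # Added by this patch: number of flush workers writing filesets in
        # parallel during peer streaming (DefaultShardPersistenceFlushConcurrency = 1).
        streamPersistShardFlushConcurrency: 2

Programmatically the same knob is exposed on the peers bootstrapper options as SetShardPersistenceFlushConcurrency / ShardPersistenceFlushConcurrency. Each flush worker started in startPersistenceQueueWorkerLoop creates its own persist manager, so raising this value increases the number of filesets being written to disk concurrently.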