Skip to content

Commit

Permalink
[dbnode] Add ability to configure concurrent building of index segmen…
Browse files Browse the repository at this point in the history
…ts at bootstrap (#2620)
  • Loading branch information
robskillington committed Oct 23, 2020
1 parent 2e365ae commit 5e84431
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 165 deletions.
26 changes: 15 additions & 11 deletions src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ package config

import (
"fmt"
"math"
"runtime"

"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/persist/fs"
Expand Down Expand Up @@ -65,22 +63,24 @@ type BootstrapConfiguration struct {
// CacheSeriesMetadata determines whether individual bootstrappers cache
// series metadata across all calls (namespaces / shards / blocks).
CacheSeriesMetadata *bool `yaml:"cacheSeriesMetadata"`

// IndexSegmentConcurrency determines the concurrency for building index
// segments.
IndexSegmentConcurrency *int `yaml:"indexSegmentConcurrency"`
}

// BootstrapFilesystemConfiguration specifies config for the fs bootstrapper.
// Fields are populated from YAML using the keys given in the struct tags.
type BootstrapFilesystemConfiguration struct {
// NumProcessorsPerCPU is the number of processors per CPU. The validate
// tag declares a minimum of 0.0 for this value.
NumProcessorsPerCPU float64 `yaml:"numProcessorsPerCPU" validate:"min=0.0"`
// DeprecatedNumProcessorsPerCPU is the number of processors per CPU. It is
// read from the "numProcessorsPerCPU" YAML key (presumably so that existing
// config files specifying that key keep parsing; confirm before removal).
//
// Deprecated: BootstrapDataNumProcessors() is no longer actually used
// anywhere.
// TODO: Remove.
DeprecatedNumProcessorsPerCPU float64 `yaml:"numProcessorsPerCPU" validate:"min=0.0"`

// Migration configuration specifies what version, if any, existing data
// filesets should be migrated to if necessary. It is a pointer and may be
// left unset (nil).
Migration *BootstrapMigrationConfiguration `yaml:"migration"`
}

// numCPUs returns NumProcessorsPerCPU multiplied by the number of logical
// CPUs reported by the runtime, rounded up to the nearest whole number.
func (c BootstrapFilesystemConfiguration) numCPUs() int {
	cpus := float64(runtime.NumCPU())
	scaled := c.NumProcessorsPerCPU * cpus
	return int(math.Ceil(scaled))
}

func (c BootstrapFilesystemConfiguration) migration() BootstrapMigrationConfiguration {
if cfg := c.Migration; cfg != nil {
return *cfg
Expand All @@ -90,8 +90,7 @@ func (c BootstrapFilesystemConfiguration) migration() BootstrapMigrationConfigur

// newDefaultBootstrapFilesystemConfiguration returns a
// BootstrapFilesystemConfiguration populated with default values, including
// a non-nil, zero-valued migration configuration.
func newDefaultBootstrapFilesystemConfiguration() BootstrapFilesystemConfiguration {
return BootstrapFilesystemConfiguration{
NumProcessorsPerCPU: defaultNumProcessorsPerCPU,
Migration: &BootstrapMigrationConfiguration{},
Migration: &BootstrapMigrationConfiguration{},
}
}

Expand Down Expand Up @@ -219,11 +218,13 @@ func (bsc BootstrapConfiguration) New(
SetIndexOptions(opts.IndexOptions()).
SetPersistManager(opts.PersistManager()).
SetCompactor(compactor).
SetBoostrapDataNumProcessors(fsCfg.numCPUs()).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetIdentifierPool(opts.IdentifierPool()).
SetMigrationOptions(fsCfg.migration().NewOptions()).
SetStorageOptions(opts)
if v := bsc.IndexSegmentConcurrency; v != nil {
fsbOpts = fsbOpts.SetIndexSegmentConcurrency(*v)
}
if err := validator.ValidateFilesystemBootstrapperOptions(fsbOpts); err != nil {
return nil, err
}
Expand Down Expand Up @@ -263,6 +264,9 @@ func (bsc BootstrapConfiguration) New(
SetDefaultShardConcurrency(pCfg.StreamShardConcurrency).
SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency).
SetShardPersistenceFlushConcurrency(pCfg.StreamPersistShardFlushConcurrency)
if v := bsc.IndexSegmentConcurrency; v != nil {
pOpts = pOpts.SetIndexSegmentConcurrency(*v)
}
if err := validator.ValidatePeersBootstrapperOptions(pOpts); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func TestConfiguration(t *testing.T) {
returnUnfulfilledForCorruptCommitLogFiles: false
peers: null
cacheSeriesMetadata: null
indexSegmentConcurrency: null
blockRetrieve: null
cache:
series: null
Expand Down
26 changes: 15 additions & 11 deletions src/dbnode/integration/peers_bootstrap_high_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
package integration

import (
"encoding/json"
"fmt"
"testing"
"time"
Expand All @@ -40,17 +39,22 @@ import (
"go.uber.org/zap"
)

// TestPeersBootstrapHighConcurrency runs the high concurrency peers bootstrap
// test once per option set as a subtest, naming each subtest with the JSON
// encoding of its options.
func TestPeersBootstrapHighConcurrency(t *testing.T) {
for _, test := range []testPeersBootstrapHighConcurrencyOptions{
{BatchSize: 16, Concurrency: 64, BatchesPerWorker: 8},
{BatchSize: 64, Concurrency: 16, BatchesPerWorker: 8},
} {
name, err := json.Marshal(test)
require.NoError(t, err)
t.Run(string(name), func(t *testing.T) {
testPeersBootstrapHighConcurrency(t, test)
// TestPeersBootstrapHighConcurrencyBatch16Workers64 runs the high concurrency
// peers bootstrap test with a batch size of 16, a concurrency of 64 and 8
// batches per worker.
func TestPeersBootstrapHighConcurrencyBatch16Workers64(t *testing.T) {
testPeersBootstrapHighConcurrency(t,
testPeersBootstrapHighConcurrencyOptions{
BatchSize: 16,
Concurrency: 64,
BatchesPerWorker: 8,
})
}

// TestPeersBootstrapHighConcurrencyBatch64Workers16 runs the high concurrency
// peers bootstrap test with a batch size of 64, a concurrency of 16 and 8
// batches per worker.
func TestPeersBootstrapHighConcurrencyBatch64Workers16(t *testing.T) {
testPeersBootstrapHighConcurrency(t,
testPeersBootstrapHighConcurrencyOptions{
BatchSize: 64,
Concurrency: 16,
BatchesPerWorker: 8,
})
}
}

type testPeersBootstrapHighConcurrencyOptions struct {
Expand Down
63 changes: 29 additions & 34 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package fs

import (
"errors"
"fmt"
"math"
goruntime "runtime"

Expand Down Expand Up @@ -56,21 +57,23 @@ var (
// us splitting an index block into smaller pieces is moot because we'll
// pull a lot more data into memory if we create more than one at a time.
defaultBootstrapIndexNumProcessors = 1

// defaultIndexSegmentConcurrency defines the default index segment building concurrency.
defaultIndexSegmentConcurrency = 1
)

// options holds the fs bootstrapper settings; NewOptions returns a pointer
// to one of these as an Options value, and the Set* methods return modified
// copies rather than mutating the receiver.
type options struct {
instrumentOpts instrument.Options
resultOpts result.Options
fsOpts fs.Options
indexOpts index.Options
persistManager persist.Manager
compactor *compaction.Compactor
bootstrapDataNumProcessors int
bootstrapIndexNumProcessors int
runtimeOptsMgr runtime.OptionsManager
identifierPool ident.Pool
migrationOpts migration.Options
storageOpts storage.Options
instrumentOpts instrument.Options
resultOpts result.Options
fsOpts fs.Options
indexOpts index.Options
persistManager persist.Manager
compactor *compaction.Compactor
// indexSegmentConcurrency is the concurrency for building index segments;
// Validate rejects values that are not >= 1.
indexSegmentConcurrency int
runtimeOptsMgr runtime.OptionsManager
identifierPool ident.Pool
migrationOpts migration.Options
storageOpts storage.Options
}

// NewOptions creates new bootstrap options
Expand All @@ -82,14 +85,13 @@ func NewOptions() Options {
idPool := ident.NewPool(bytesPool, ident.PoolOptions{})

return &options{
instrumentOpts: instrument.NewOptions(),
resultOpts: result.NewOptions(),
bootstrapDataNumProcessors: defaultBootstrapDataNumProcessors,
bootstrapIndexNumProcessors: defaultBootstrapIndexNumProcessors,
runtimeOptsMgr: runtime.NewOptionsManager(),
identifierPool: idPool,
migrationOpts: migration.NewOptions(),
storageOpts: storage.NewOptions(),
instrumentOpts: instrument.NewOptions(),
resultOpts: result.NewOptions(),
indexSegmentConcurrency: defaultIndexSegmentConcurrency,
runtimeOptsMgr: runtime.NewOptionsManager(),
identifierPool: idPool,
migrationOpts: migration.NewOptions(),
storageOpts: storage.NewOptions(),
}
}

Expand All @@ -112,6 +114,9 @@ func (o *options) Validate() error {
if err := o.migrationOpts.Validate(); err != nil {
return err
}
if n := o.indexSegmentConcurrency; n <= 0 {
return fmt.Errorf("index segment concurrency not >= 1: actual=%d", n)
}
return nil
}

Expand Down Expand Up @@ -175,24 +180,14 @@ func (o *options) Compactor() *compaction.Compactor {
return o.compactor
}

// SetBoostrapDataNumProcessors returns a copy of the options with the
// bootstrap data processor count set to value; the receiver is left
// unmodified.
func (o *options) SetBoostrapDataNumProcessors(value int) Options {
	updated := new(options)
	*updated = *o
	updated.bootstrapDataNumProcessors = value
	return updated
}

// BoostrapDataNumProcessors returns the bootstrap data processor count
// currently stored on the options.
func (o *options) BoostrapDataNumProcessors() int {
	n := o.bootstrapDataNumProcessors
	return n
}

// SetBoostrapIndexNumProcessors returns a copy of the options with
// bootstrapIndexNumProcessors set to value.
func (o *options) SetBoostrapIndexNumProcessors(value int) Options {
// SetIndexSegmentConcurrency returns a copy of the options with
// indexSegmentConcurrency set to value; the receiver is not modified.
func (o *options) SetIndexSegmentConcurrency(value int) Options {
opts := *o
opts.bootstrapIndexNumProcessors = value
opts.indexSegmentConcurrency = value
return &opts
}

// BoostrapIndexNumProcessors returns the stored bootstrapIndexNumProcessors
// value.
func (o *options) BoostrapIndexNumProcessors() int {
return o.bootstrapIndexNumProcessors
// IndexSegmentConcurrency returns the stored indexSegmentConcurrency value.
func (o *options) IndexSegmentConcurrency() int {
return o.indexSegmentConcurrency
}

func (o *options) SetRuntimeOptionsManager(value runtime.OptionsManager) Options {
Expand Down
Loading

0 comments on commit 5e84431

Please sign in to comment.