[dbnode] Add configuration to flush data in parallel for peer streaming (#2594)

robskillington committed Oct 23, 2020
1 parent 907fc82 commit 2e365ae
Showing 13 changed files with 382 additions and 324 deletions.
12 changes: 9 additions & 3 deletions src/cmd/services/m3dbnode/config/bootstrap.go
@@ -143,12 +143,17 @@ type BootstrapPeersConfiguration struct {
// for historical data being streamed between peers (historical blocks).
// Defaults to: numCPU / 2.
StreamPersistShardConcurrency int `yaml:"streamPersistShardConcurrency"`
// StreamPersistShardFlushConcurrency controls how many shards to flush in parallel
// for historical data being streamed between peers (historical blocks).
// Defaults to: 1.
StreamPersistShardFlushConcurrency int `yaml:"streamPersistShardFlushConcurrency"`
}

func newDefaultBootstrapPeersConfiguration() BootstrapPeersConfiguration {
return BootstrapPeersConfiguration{
StreamShardConcurrency: peers.DefaultShardConcurrency,
StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency,
StreamShardConcurrency: peers.DefaultShardConcurrency,
StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency,
StreamPersistShardFlushConcurrency: peers.DefaultShardPersistenceFlushConcurrency,
}
}

@@ -256,7 +261,8 @@ func (bsc BootstrapConfiguration) New(
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetContextPool(opts.ContextPool()).
SetDefaultShardConcurrency(pCfg.StreamShardConcurrency).
SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency)
SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency).
SetShardPersistenceFlushConcurrency(pCfg.StreamPersistShardFlushConcurrency)
if err := validator.ValidatePeersBootstrapperOptions(pOpts); err != nil {
return nil, err
}
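
For reference, a minimal sketch (not part of this commit) of how the new streamPersistShardFlushConcurrency key could be parsed alongside the existing streamPersistShardConcurrency key. The local struct, the sample values, and the nesting of this block inside a full m3dbnode YAML file are illustrative assumptions; only the yaml tags themselves come from the diff above.

package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v2"
)

// bootstrapPeersConfig is a local stand-in mirroring the yaml tags shown in
// BootstrapPeersConfiguration above; it exists for illustration only.
type bootstrapPeersConfig struct {
	StreamPersistShardConcurrency      int `yaml:"streamPersistShardConcurrency"`
	StreamPersistShardFlushConcurrency int `yaml:"streamPersistShardFlushConcurrency"`
}

func main() {
	// In a real m3dbnode config these keys sit inside the peers bootstrapper
	// section; the surrounding structure is omitted here.
	raw := []byte(`
streamPersistShardConcurrency: 4
streamPersistShardFlushConcurrency: 2
`)

	var cfg bootstrapPeersConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Println("flush concurrency:", cfg.StreamPersistShardFlushConcurrency)
}
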
23 changes: 13 additions & 10 deletions src/dbnode/client/session.go
@@ -68,13 +68,11 @@ import (
)

const (
clusterConnectWaitInterval = 10 * time.Millisecond
blocksMetadataChannelInitialCapacity = 4096
gaugeReportInterval = 500 * time.Millisecond
blockMetadataChBufSize = 4096
shardResultCapacity = 4096
hostNotAvailableMinSleepInterval = 1 * time.Millisecond
hostNotAvailableMaxSleepInterval = 100 * time.Millisecond
clusterConnectWaitInterval = 10 * time.Millisecond
gaugeReportInterval = 500 * time.Millisecond
blockMetadataChBufSize = 65536
hostNotAvailableMinSleepInterval = 1 * time.Millisecond
hostNotAvailableMaxSleepInterval = 100 * time.Millisecond
)

type resultTypeEnum string
@@ -2036,7 +2034,7 @@ func (s *session) fetchBlocksMetadataFromPeers(

var (
metadataCh = make(chan receivedBlockMetadata,
blocksMetadataChannelInitialCapacity)
blockMetadataChBufSize)
errCh = make(chan error, 1)
meta = resultTypeMetadata
m = s.newPeerMetadataStreamingProgressMetrics(shard, meta)
@@ -3550,7 +3548,7 @@ func newBulkBlocksResult(
return &bulkBlocksResult{
nsCtx: nsCtx,
baseBlocksResult: newBaseBlocksResult(nsCtx, opts, resultOpts),
result: result.NewShardResult(shardResultCapacity, resultOpts),
result: result.NewShardResult(resultOpts),
tagDecoderPool: tagDecoderPool,
idPool: idPool,
}
@@ -3654,7 +3652,12 @@ type enqueueCh struct {
metrics *streamFromPeersMetrics
}

const enqueueChannelDefaultLen = 32768
// enqueueChannelDefaultLen is the queue length for processing series ready to
// be fetched from other peers.
// It was reduced from 32k to 512 because each struct in the queue is quite
// large, and at 32k capacity the channel used significant memory under high
// shard concurrency.
const enqueueChannelDefaultLen = 512

func newEnqueueChannel(m *streamFromPeersMetrics) enqueueChannel {
c := &enqueueCh{
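
A rough, illustrative calculation (not from the commit) of why the enqueue channel capacity was reduced: a buffered Go channel pre-allocates capacity × element size for its ring buffer, so a 32k-deep channel of large structs costs real memory, and more so when several such channels are live under high shard concurrency. The element size and channel count below are assumed placeholders, not measurements of the actual queued struct.

package main

import "fmt"

// channelBufferBytes estimates the memory pre-allocated by buffered channels:
// Go sizes each channel's ring buffer as capacity × element size up front.
func channelBufferBytes(capacity, elemSizeBytes, liveChannels int) int {
	return capacity * elemSizeBytes * liveChannels
}

func main() {
	// Assumed values for illustration only.
	const (
		assumedElemSize = 256 // bytes per queued struct (placeholder)
		liveChannels    = 8   // concurrent streams (placeholder)
	)

	before := channelBufferBytes(32768, assumedElemSize, liveChannels)
	after := channelBufferBytes(512, assumedElemSize, liveChannels)
	fmt.Printf("ring buffers: ~%d MiB before vs ~%d KiB after\n",
		before/(1<<20), after/(1<<10))
}
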
2 changes: 1 addition & 1 deletion src/dbnode/integration/integration.go
@@ -228,7 +228,7 @@ func newDefaultBootstrappableTestSetups(
SetTopologyInitializer(topologyInitializer).(client.AdminOptions).
SetOrigin(origin)

// Prevent integration tests from timing out when a node is down
// Prevent integration tests from timing out when a node is down
retryOpts = xretry.NewOptions().
SetInitialBackoff(1 * time.Millisecond).
SetMaxRetries(1).
158 changes: 139 additions & 19 deletions src/dbnode/integration/peers_bootstrap_high_concurrency_test.go
@@ -23,33 +23,72 @@
package integration

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestPeersBootstrapHighConcurrency(t *testing.T) {
for _, test := range []testPeersBootstrapHighConcurrencyOptions{
{BatchSize: 16, Concurrency: 64, BatchesPerWorker: 8},
{BatchSize: 64, Concurrency: 16, BatchesPerWorker: 8},
} {
name, err := json.Marshal(test)
require.NoError(t, err)
t.Run(string(name), func(t *testing.T) {
testPeersBootstrapHighConcurrency(t, test)
})
}
}

type testPeersBootstrapHighConcurrencyOptions struct {
BatchSize int
Concurrency int
BatchesPerWorker int
}

func testPeersBootstrapHighConcurrency(
t *testing.T,
testOpts testPeersBootstrapHighConcurrencyOptions,
) {
if testing.Short() {
t.SkipNow()
}

// Test setups
log := xtest.NewLogger(t)
retentionOpts := retention.NewOptions().

blockSize := 2 * time.Hour

idxOpts := namespace.NewIndexOptions().
SetEnabled(true).
SetBlockSize(blockSize)

rOpts := retention.NewOptions().
SetRetentionPeriod(6 * time.Hour).
SetBlockSize(2 * time.Hour).
SetBlockSize(blockSize).
SetBufferPast(10 * time.Minute).
SetBufferFuture(2 * time.Minute)
namesp, err := namespace.NewMetadata(testNamespaces[0],
namespace.NewOptions().SetRetentionOptions(retentionOpts))

nOpts := namespace.NewOptions().
SetRetentionOptions(rOpts).
SetIndexOptions(idxOpts)

namesp, err := namespace.NewMetadata(testNamespaces[0], nOpts)
require.NoError(t, err)

opts := NewTestOptions(t).
SetNamespaces([]namespace.Metadata{namesp}).
// Use TChannel clients for writing / reading because we want to target individual nodes at a time
@@ -73,31 +112,54 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) {
defer closeFn()

// Write test data for first node
total := 8 * batchSize * concurrency
log.Sugar().Debugf("testing a total of %d IDs with %d batch size %d concurrency", total, batchSize, concurrency)
shardIDs := make([]string, 0, total)
for i := 0; i < total; i++ {
id := fmt.Sprintf("id.%d", i)
shardIDs = append(shardIDs, id)
}
numSeries := testOpts.BatchesPerWorker * testOpts.Concurrency * testOpts.BatchSize
log.Sugar().Debugf("testing a total of %d IDs with %d batch size %d concurrency",
numSeries, testOpts.BatchSize, testOpts.Concurrency)

now := setups[0].NowFn()()
blockSize := retentionOpts.BlockSize()
seriesMaps := generate.BlocksByStart([]generate.BlockConfig{
{IDs: shardIDs, NumPoints: 3, Start: now.Add(-3 * blockSize)},
{IDs: shardIDs, NumPoints: 3, Start: now.Add(-2 * blockSize)},
{IDs: shardIDs, NumPoints: 3, Start: now.Add(-blockSize)},
{IDs: shardIDs, NumPoints: 3, Start: now},
})
commonTags := []ident.Tag{
{
Name: ident.StringID("fruit"),
Value: ident.StringID("apple"),
},
}
numPoints := 10
seriesMaps := generate.BlocksByStart(blockConfigs(
generateTaggedBlockConfigs(generateTaggedBlockConfig{
series: numSeries,
numPoints: numPoints,
commonTags: commonTags,
blockStart: now.Add(-3 * blockSize),
}),
generateTaggedBlockConfigs(generateTaggedBlockConfig{
series: numSeries,
numPoints: numPoints,
commonTags: commonTags,
blockStart: now.Add(-2 * blockSize),
}),
generateTaggedBlockConfigs(generateTaggedBlockConfig{
series: numSeries,
numPoints: numPoints,
commonTags: commonTags,
blockStart: now.Add(-1 * blockSize),
}),
generateTaggedBlockConfigs(generateTaggedBlockConfig{
series: numSeries,
numPoints: numPoints,
commonTags: commonTags,
blockStart: now,
}),
))
err = writeTestDataToDisk(namesp, setups[0], seriesMaps, 0)
require.NoError(t, err)

// Start the first server with filesystem bootstrapper
require.NoError(t, setups[0].StartServer())

// Start the last server with peers and filesystem bootstrappers
bootstrapStart := time.Now()
require.NoError(t, setups[1].StartServer())
log.Debug("servers are now up")
log.Debug("servers are now up", zap.Duration("took", time.Since(bootstrapStart)))

// Stop the servers
defer func() {
Expand All @@ -111,4 +173,62 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) {
for _, setup := range setups {
verifySeriesMaps(t, setup, namesp.ID(), seriesMaps)
}

// Issue some index queries to the second node which bootstrapped the metadata
session, err := setups[1].M3DBClient().DefaultSession()
require.NoError(t, err)

start := now.Add(-rOpts.RetentionPeriod())
end := now.Add(blockSize)
queryOpts := index.QueryOptions{StartInclusive: start, EndExclusive: end}

// Match on common tags
termQuery := idx.NewTermQuery(commonTags[0].Name.Bytes(), commonTags[0].Value.Bytes())
iter, _, err := session.FetchTaggedIDs(namesp.ID(),
index.Query{Query: termQuery}, queryOpts)
require.NoError(t, err)
defer iter.Finalize()

count := 0
for iter.Next() {
count++
}
require.Equal(t, numSeries, count)
}

type generateTaggedBlockConfig struct {
series int
numPoints int
commonTags []ident.Tag
blockStart time.Time
}

func generateTaggedBlockConfigs(
cfg generateTaggedBlockConfig,
) []generate.BlockConfig {
results := make([]generate.BlockConfig, 0, cfg.series)
for i := 0; i < cfg.series; i++ {
id := fmt.Sprintf("series_%d", i)
tags := make([]ident.Tag, 0, 1+len(cfg.commonTags))
tags = append(tags, ident.Tag{
Name: ident.StringID("series"),
Value: ident.StringID(fmt.Sprintf("%d", i)),
})
tags = append(tags, cfg.commonTags...)
results = append(results, generate.BlockConfig{
IDs: []string{id},
Tags: ident.NewTags(tags...),
NumPoints: cfg.numPoints,
Start: cfg.blockStart,
})
}
return results
}

func blockConfigs(cfgs ...[]generate.BlockConfig) []generate.BlockConfig {
var results []generate.BlockConfig
for _, elem := range cfgs {
results = append(results, elem...)
}
return results
}
47 changes: 32 additions & 15 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/options.go
@@ -48,6 +48,11 @@ var (
// src/cmd/services/m3dbnode/config package if this is changed.
DefaultShardPersistenceConcurrency = int(math.Max(1, float64(runtime.NumCPU())/2))
defaultPersistenceMaxQueueSize = 0
// DefaultShardPersistenceFlushConcurrency controls how many shards to flush in parallel
// for historical data being streamed between peers (historical blocks).
// Update BootstrapPeersConfiguration comment in
// src/cmd/services/m3dbnode/config package if this is changed.
DefaultShardPersistenceFlushConcurrency = 1
)

var (
Expand All @@ -60,26 +65,28 @@ var (
)

type options struct {
resultOpts result.Options
client client.AdminClient
defaultShardConcurrency int
shardPersistenceConcurrency int
persistenceMaxQueueSize int
persistManager persist.Manager
runtimeOptionsManager m3dbruntime.OptionsManager
contextPool context.Pool
fsOpts fs.Options
indexOpts index.Options
compactor *compaction.Compactor
resultOpts result.Options
client client.AdminClient
defaultShardConcurrency int
shardPersistenceConcurrency int
shardPersistenceFlushConcurrency int
persistenceMaxQueueSize int
persistManager persist.Manager
runtimeOptionsManager m3dbruntime.OptionsManager
contextPool context.Pool
fsOpts fs.Options
indexOpts index.Options
compactor *compaction.Compactor
}

// NewOptions creates new bootstrap options.
func NewOptions() Options {
return &options{
resultOpts: result.NewOptions(),
defaultShardConcurrency: DefaultShardConcurrency,
shardPersistenceConcurrency: DefaultShardPersistenceConcurrency,
persistenceMaxQueueSize: defaultPersistenceMaxQueueSize,
resultOpts: result.NewOptions(),
defaultShardConcurrency: DefaultShardConcurrency,
shardPersistenceConcurrency: DefaultShardPersistenceConcurrency,
shardPersistenceFlushConcurrency: DefaultShardPersistenceFlushConcurrency,
persistenceMaxQueueSize: defaultPersistenceMaxQueueSize,
// Use a zero pool; this should be overridden at config time.
contextPool: context.NewPool(context.NewOptions().
SetContextPoolOptions(pool.NewObjectPoolOptions().SetSize(0)).
@@ -149,6 +156,16 @@ func (o *options) ShardPersistenceConcurrency() int {
return o.shardPersistenceConcurrency
}

func (o *options) SetShardPersistenceFlushConcurrency(value int) Options {
opts := *o
opts.shardPersistenceFlushConcurrency = value
return &opts
}

func (o *options) ShardPersistenceFlushConcurrency() int {
return o.shardPersistenceFlushConcurrency
}

func (o *options) SetPersistenceMaxQueueSize(value int) Options {
opts := *o
opts.persistenceMaxQueueSize = value
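
A minimal sketch (not part of the commit) of driving the new option through the peers bootstrapper options API added in this file, mirroring what BootstrapConfiguration.New does with the new config field. The concrete values are illustrative, and any other options a real caller must set before validation are omitted.

package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/peers"
)

func main() {
	// Start from the package defaults and override the persistence
	// concurrencies; 4 and 2 are example values, while the shipped defaults
	// are numCPU/2 and 1 respectively.
	opts := peers.NewOptions().
		SetShardPersistenceConcurrency(4).
		SetShardPersistenceFlushConcurrency(2)

	fmt.Println("flush concurrency:", opts.ShardPersistenceFlushConcurrency())
}
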
