Skip to content

Commit

Permalink
[dbnode] Evict info files cache before index bootstrap in peers boots…
Browse files Browse the repository at this point in the history
…trapper (#2802)
  • Loading branch information
notbdu authored and robskillington committed Oct 28, 2020
1 parent 4460a9b commit 780a769
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 20 deletions.
77 changes: 74 additions & 3 deletions src/dbnode/integration/peers_bootstrap_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ import (
"testing"
"time"

indexpb "github.com/m3db/m3/src/dbnode/generated/proto/index"
"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/generated/proto/fswriter"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)
Expand All @@ -53,7 +57,7 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {

idxOpts := namespace.NewIndexOptions().
SetEnabled(true).
SetBlockSize(2 * blockSize)
SetBlockSize(blockSize)
nOpts := namespace.NewOptions().
SetRetentionOptions(rOpts).
SetIndexOptions(idxOpts)
Expand Down Expand Up @@ -92,7 +96,18 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {
Tags: ident.NewTags(ident.StringTag("city", "seattle")),
}

quxSeries := generate.Series{
ID: ident.StringID("qux"),
Tags: ident.NewTags(ident.StringTag("city", "new_orleans")),
}

seriesMaps := generate.BlocksByStart([]generate.BlockConfig{
{
IDs: []string{quxSeries.ID.String()},
Tags: quxSeries.Tags,
NumPoints: 100,
Start: now.Add(-2 * blockSize),
},
{
IDs: []string{fooSeries.ID.String()},
Tags: fooSeries.Tags,
Expand Down Expand Up @@ -159,7 +174,7 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {
verifyQueryMetadataResults(t, iter, fetchResponse.Exhaustive, verifyQueryMetadataResultsOptions{
namespace: ns1.ID(),
exhaustive: true,
expected: []generate.Series{fooSeries, barSeries},
expected: []generate.Series{fooSeries, barSeries, quxSeries},
})

// Match all *e*e*
Expand All @@ -173,6 +188,62 @@ func TestPeersBootstrapIndexWithIndexingEnabled(t *testing.T) {
verifyQueryMetadataResults(t, iter, fetchResponse.Exhaustive, verifyQueryMetadataResultsOptions{
namespace: ns1.ID(),
exhaustive: true,
expected: []generate.Series{barSeries, bazSeries},
expected: []generate.Series{barSeries, bazSeries, quxSeries},
})

// Ensure that the index data for qux has been written to disk.
numDocsPerBlockStart, err := getNumDocsPerBlockStart(
ns1.ID(),
setups[1].FilesystemOpts(),
)
require.NoError(t, err)
numDocs, ok := numDocsPerBlockStart[xtime.ToUnixNano(now.Add(-2*blockSize).Truncate(blockSize))]
require.True(t, ok)
require.Equal(t, numDocs, 1)
}

type indexInfo struct {
Info indexpb.IndexVolumeInfo
VolumeIndex int
}

func getNumDocsPerBlockStart(
nsID ident.ID,
fsOpts fs.Options,
) (map[xtime.UnixNano]int, error) {
numDocsPerBlockStart := make(map[xtime.UnixNano]int)
infoFiles := fs.ReadIndexInfoFiles(
fsOpts.FilePathPrefix(),
nsID,
fsOpts.InfoReaderBufferSize(),
)
// Grab the latest index info file for each blockstart.
latestIndexInfoPerBlockStart := make(map[xtime.UnixNano]indexInfo)
for _, f := range infoFiles {
info, ok := latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)]
if !ok {
latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = indexInfo{
Info: f.Info,
VolumeIndex: f.ID.VolumeIndex,
}
continue
}

if f.ID.VolumeIndex > info.VolumeIndex {
latestIndexInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = indexInfo{
Info: f.Info,
VolumeIndex: f.ID.VolumeIndex,
}
}
}
for blockStart, info := range latestIndexInfoPerBlockStart {
for _, segment := range info.Info.Segments {
metadata := fswriter.Metadata{}
if err := metadata.Unmarshal(segment.Metadata); err != nil {
return nil, err
}
numDocsPerBlockStart[blockStart] += int(metadata.NumDocs)
}
}
return numDocsPerBlockStart, nil
}
12 changes: 12 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrap_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (s *peersSource) Read(
continue
}

// NB(bodu): We need to evict the info file cache before reading index data since we've
// maybe fetched blocks from peers so the cached info file state is now stale.
cache.Evict()
r, err := s.readIndex(md,
namespace.IndexRunOptions.ShardTimeRanges,
span,
Expand Down
47 changes: 30 additions & 17 deletions src/dbnode/storage/bootstrap/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ var (
)

type cache struct {
sync.Once
sync.Mutex

fsOpts fs.Options
namespaceDetails []NamespaceDetails
infoFilesByNamespace InfoFilesByNamespace
iOpts instrument.Options
hasPopulatedInfo bool
}

// NewCache creates a cache specifically to be used during the bootstrap process.
Expand All @@ -53,9 +54,10 @@ func NewCache(options CacheOptions) (Cache, error) {
return nil, err
}
return &cache{
fsOpts: options.FilesystemOptions(),
namespaceDetails: options.NamespaceDetails(),
iOpts: options.InstrumentOptions(),
fsOpts: options.FilesystemOptions(),
namespaceDetails: options.NamespaceDetails(),
infoFilesByNamespace: make(InfoFilesByNamespace, len(options.NamespaceDetails())),
iOpts: options.InstrumentOptions(),
}, nil
}

Expand Down Expand Up @@ -83,22 +85,33 @@ func (c *cache) InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.Rea
return infoFileResults, nil
}

func (c *cache) Evict() {
c.Lock()
defer c.Unlock()
c.hasPopulatedInfo = false
}

func (c *cache) ReadInfoFiles() InfoFilesByNamespace {
c.Once.Do(func() {
c.infoFilesByNamespace = make(InfoFilesByNamespace, len(c.namespaceDetails))
for _, finder := range c.namespaceDetails {
result := make(InfoFileResultsPerShard, len(finder.Shards))
for _, shard := range finder.Shards {
result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(),
finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(),
persist.FileSetFlushType)
}

c.infoFilesByNamespace[finder.Namespace] = result
c.Lock()
defer c.Unlock()
if !c.hasPopulatedInfo {
c.populateInfoFilesByNamespaceWithLock()
c.hasPopulatedInfo = true
}
return c.infoFilesByNamespace
}

func (c *cache) populateInfoFilesByNamespaceWithLock() {
for _, finder := range c.namespaceDetails {
result := make(InfoFileResultsPerShard, len(finder.Shards))
for _, shard := range finder.Shards {
result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(),
finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(),
persist.FileSetFlushType)
}
})

return c.infoFilesByNamespace
c.infoFilesByNamespace[finder.Namespace] = result
}
}

type cacheOptions struct {
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/bootstrap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ type Cache interface {
// ReadInfoFiles returns info file results for each shard grouped by namespace. A cached copy
// is returned if the info files have already been read.
ReadInfoFiles() InfoFilesByNamespace

// Evict cache contents by re-reading fresh data in.
Evict()
}

// CacheOptions represents the options for Cache.
Expand Down

0 comments on commit 780a769

Please sign in to comment.