Skip to content

Commit

Permalink
[dbnode] Add AggregateTilesOptions.Process field (#3737)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Sep 10, 2021
1 parent 4622985 commit 6b5c855
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func (s *service) aggregateTiles(
opts, err := storage.NewAggregateTilesOptions(
start, end, step,
sourceNsID,
storage.AggregateTilesAPI,
s.opts.InstrumentOptions())
if err != nil {
return 0, tterrors.NewBadRequestError(err)
Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,7 @@ func NewAggregateTilesOptions(
start, end xtime.UnixNano,
step time.Duration,
targetNsID ident.ID,
process AggregateTilesProcess,
insOpts instrument.Options,
) (AggregateTilesOptions, error) {
if !end.After(start) {
Expand All @@ -1372,12 +1373,16 @@ func NewAggregateTilesOptions(
}

scope := insOpts.MetricsScope().SubScope("computed-namespace")
insOpts = insOpts.SetMetricsScope(scope.Tagged(map[string]string{"target-namespace": targetNsID.String()}))
insOpts = insOpts.SetMetricsScope(scope.Tagged(map[string]string{
"target-namespace": targetNsID.String(),
"process": process.String(),
}))

return AggregateTilesOptions{
Start: start,
End: end,
Step: step,
Process: process,
InsOptions: insOpts,
}, nil
}
23 changes: 14 additions & 9 deletions src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,9 +1488,11 @@ func TestDatabaseAggregateTiles(t *testing.T) {
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
start = xtime.Now().Truncate(time.Hour)
process = AggregateTilesAPI
)

opts, err := NewAggregateTilesOptions(start, start.Add(-time.Second), time.Minute, targetNsID, d.opts.InstrumentOptions())
opts, err := NewAggregateTilesOptions(
start, start.Add(-time.Second), time.Minute, targetNsID, process, d.opts.InstrumentOptions())
require.Error(t, err)
opts.InsOptions = d.opts.InstrumentOptions()

Expand All @@ -1504,22 +1506,25 @@ func TestDatabaseAggregateTiles(t *testing.T) {
}

func TestNewAggregateTilesOptions(t *testing.T) {
start := xtime.Now().Truncate(time.Hour)
targetNs := ident.StringID("target")
insOpts := instrument.NewOptions()
var (
start = xtime.Now().Truncate(time.Hour)
targetNs = ident.StringID("target")
insOpts = instrument.NewOptions()
process = AggregateTilesRegular
)

_, err := NewAggregateTilesOptions(start, start.Add(-time.Second), time.Minute, targetNs, insOpts)
_, err := NewAggregateTilesOptions(start, start.Add(-time.Second), time.Minute, targetNs, process, insOpts)
assert.Error(t, err)

_, err = NewAggregateTilesOptions(start, start, time.Minute, targetNs, insOpts)
_, err = NewAggregateTilesOptions(start, start, time.Minute, targetNs, process, insOpts)
assert.Error(t, err)

_, err = NewAggregateTilesOptions(start, start.Add(time.Second), -time.Minute, targetNs, insOpts)
_, err = NewAggregateTilesOptions(start, start.Add(time.Second), -time.Minute, targetNs, process, insOpts)
assert.Error(t, err)

_, err = NewAggregateTilesOptions(start, start.Add(time.Second), 0, targetNs, insOpts)
_, err = NewAggregateTilesOptions(start, start.Add(time.Second), 0, targetNs, process, insOpts)
assert.Error(t, err)

_, err = NewAggregateTilesOptions(start, start.Add(time.Second), time.Minute, targetNs, insOpts)
_, err = NewAggregateTilesOptions(start, start.Add(time.Second), time.Minute, targetNs, process, insOpts)
assert.NoError(t, err)
}
9 changes: 3 additions & 6 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"fmt"
"math"
"runtime"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -1829,9 +1828,7 @@ func (n *dbNamespace) aggregateTiles(
sourceBlockSize = sourceNs.Options().RetentionOptions().BlockSize()
lastSourceBlockEnd = opts.End.Truncate(sourceBlockSize)

scope = opts.InsOptions.MetricsScope().Tagged(map[string]string{
"backfill": strconv.FormatBool(opts.Backfill),
})
scope = opts.InsOptions.MetricsScope()
processedShards = scope.Counter("processed-shards")
)

Expand Down Expand Up @@ -1892,8 +1889,8 @@ func (n *dbNamespace) aggregateTiles(
}

n.log.Info("finished large tiles aggregation for namespace",
zap.String("sourceNs", sourceNs.ID().String()),
zap.Bool("backfill", opts.Backfill),
zap.Stringer("sourceNs", sourceNs.ID()),
zap.Stringer("process", opts.Process),
zap.Time("targetBlockStart", targetBlockStart.ToTime()),
zap.Time("lastSourceBlockEnd", lastSourceBlockEnd.ToTime()),
zap.Duration("step", opts.Step),
Expand Down
8 changes: 6 additions & 2 deletions src/dbnode/storage/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1616,9 +1616,11 @@ func TestNamespaceAggregateTiles(t *testing.T) {
shard0ID = uint32(10)
shard1ID = uint32(20)
insOpts = instrument.NewOptions()
process = AggregateTilesRegular
)

opts, err := NewAggregateTilesOptions(start, start.Add(targetBlockSize), time.Second, targetNsID, insOpts)
opts, err := NewAggregateTilesOptions(
start, start.Add(targetBlockSize), time.Second, targetNsID, process, insOpts)
require.NoError(t, err)

sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
Expand Down Expand Up @@ -1680,9 +1682,11 @@ func TestNamespaceAggregateTilesShipBootstrappingShards(t *testing.T) {
targetBlockSize = 2 * time.Hour
start = xtime.Now().Truncate(targetBlockSize)
insOpts = instrument.NewOptions()
process = AggregateTilesRegular
)

opts, err := NewAggregateTilesOptions(start, start.Add(targetBlockSize), time.Second, targetNsID, insOpts)
opts, err := NewAggregateTilesOptions(
start, start.Add(targetBlockSize), time.Second, targetNsID, process, insOpts)
require.NoError(t, err)

sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
Expand Down
30 changes: 29 additions & 1 deletion src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package storage

import (
"bytes"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -1486,14 +1487,41 @@ type newFSMergeWithMemFn func(
// NewBackgroundProcessFn is a function that creates and returns a new BackgroundProcess.
type NewBackgroundProcessFn func(Database, Options) (BackgroundProcess, error)

// AggregateTilesProcess identifies the process used for the aggregation.
type AggregateTilesProcess uint8

const (
// AggregateTilesRegular indicates regular process.
AggregateTilesRegular AggregateTilesProcess = iota

// AggregateTilesBackfill indicates backfill.
AggregateTilesBackfill

// AggregateTilesAPI indicates invocation via API call.
AggregateTilesAPI
)

func (p AggregateTilesProcess) String() string {
switch p {
case AggregateTilesRegular:
return "regular"
case AggregateTilesBackfill:
return "backfill"
case AggregateTilesAPI:
return "api"
default:
return fmt.Sprintf("unknown (%d)", p)
}
}

// AggregateTilesOptions is the options for large tile aggregation.
type AggregateTilesOptions struct {
// Start and End specify the aggregation window.
Start, End xtime.UnixNano
// Step is the downsampling step.
Step time.Duration
InsOptions instrument.Options
Backfill bool
Process AggregateTilesProcess
}

// TileAggregator is the interface for AggregateTiles.
Expand Down

0 comments on commit 6b5c855

Please sign in to comment.