Skip to content

Commit

Permalink
[query] Fix graphite functions, aggregation bug (#2549)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Aug 23, 2020
1 parent 69a7bdc commit ef327ee
Show file tree
Hide file tree
Showing 14 changed files with 788 additions and 105 deletions.
157 changes: 155 additions & 2 deletions src/aggregator/aggregator/aggregator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/aggregator/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// THE SOFTWARE.

// mockgen rules for generating mocks for exported interfaces (reflection mode).
//go:generate sh -c "mockgen -package=aggregator github.com/m3db/m3/src/aggregator/aggregator ElectionManager,FlushTimesManager,PlacementManager | genclean -pkg github.com/m3db/m3/src/aggregator/aggregator -out $GOPATH/src/github.com/m3db/m3/src/aggregator/aggregator/aggregator_mock.go"
//go:generate sh -c "mockgen -package=aggregator github.com/m3db/m3/src/aggregator/aggregator Aggregator,ElectionManager,FlushTimesManager,PlacementManager | genclean -pkg github.com/m3db/m3/src/aggregator/aggregator -out $GOPATH/src/github.com/m3db/m3/src/aggregator/aggregator/aggregator_mock.go"
//go:generate sh -c "mockgen -package=client github.com/m3db/m3/src/aggregator/client Client,AdminClient | genclean -pkg github.com/m3db/m3/src/aggregator/client -out $GOPATH/src/github.com/m3db/m3/src/aggregator/client/client_mock.go"
//go:generate sh -c "mockgen -package=handler github.com/m3db/m3/src/aggregator/aggregator/handler Handler | genclean -pkg github.com/m3db/m3/src/aggregator/aggregator/handler -out $GOPATH/src/github.com/m3db/m3/src/aggregator/aggregator/handler/handler_mock.go"
//go:generate sh -c "mockgen -package=runtime github.com/m3db/m3/src/aggregator/runtime OptionsWatcher | genclean -pkg github.com/m3db/m3/src/aggregator/runtime -out $GOPATH/src/github.com/m3db/m3/src/aggregator/runtime/runtime_mock.go"
Expand Down
108 changes: 107 additions & 1 deletion src/cmd/services/m3coordinator/downsample/flush_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ package downsample

import (
"bytes"
"sync"
"testing"

"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage/mock"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"
xtest "github.com/m3db/m3/src/x/test"
Expand All @@ -41,7 +44,7 @@ import (
)

func TestDownsamplerFlushHandlerCopiesTags(t *testing.T) {
ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)
defer ctrl.Finish()

store := mock.NewMockStorage()
Expand Down Expand Up @@ -104,3 +107,106 @@ func TestDownsamplerFlushHandlerCopiesTags(t *testing.T) {
assert.False(t, xtest.ByteSlicesBackedBySameData(tagName, tag.Name))
assert.False(t, xtest.ByteSlicesBackedBySameData(tagValue, tag.Value))
}

// graphiteTags encodes a three-node graphite-style tag set whose first
// node is the supplied value (the remaining nodes are fixed to "y" and
// "z"), tagged with the graphite ID scheme, and returns an independent
// copy of the encoded bytes so the encoder can be safely finalized.
func graphiteTags(
	t *testing.T, first string, encPool serialize.TagEncoderPool) []byte {
	enc := encPool.Get()
	defer enc.Finalize()

	iter := ident.MustNewTagStringsIterator(
		"__g0__", first,
		"__g1__", "y",
		"__g2__", "z",
		string(MetricsOptionIDSchemeTagName), string(GraphiteIDSchemeTagValue),
	)
	require.NoError(t, enc.Encode(iter))

	data, ok := enc.Data()
	require.True(t, ok)

	// Copy the bytes out: the encoder's buffer is reclaimed when the
	// deferred Finalize returns it to the pool.
	out := make([]byte, data.Len())
	copy(out, data.Bytes())
	return out
}

// TestDownsamplerFlushHandlerHighConcurrencyNoTagMixing verifies that
// many concurrent writes through the downsampler flush handler do not
// mix tags between distinct metrics: 100 goroutines each write one "x"
// series and one "foo" series, and the store must observe exactly 100
// of each with their tag sets intact.
func TestDownsamplerFlushHandlerHighConcurrencyNoTagMixing(t *testing.T) {
	ctrl := xtest.NewController(t)
	defer ctrl.Finish()

	store := mock.NewMockStorage()

	size := 10
	decodeOpts := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{
		CheckBytesWrapperPoolSize: &size,
	})

	poolOpts := pool.NewObjectPoolOptions()
	tagDecoderPool := serialize.NewTagDecoderPool(decodeOpts, poolOpts)
	tagDecoderPool.Init()

	// Named iterPool (not "pool") to avoid shadowing the imported
	// x/pool package used above for poolOpts.
	iterPool := serialize.NewMetricTagsIteratorPool(tagDecoderPool, poolOpts)
	iterPool.Init()

	workers := xsync.NewWorkerPool(1)
	workers.Init()

	instrumentOpts := instrument.NewOptions()

	handler := newDownsamplerFlushHandler(store, iterPool,
		workers, models.NewTagOptions(), instrumentOpts)
	writer, err := handler.NewWriter(tally.NoopScope)
	require.NoError(t, err)

	encodeOpts := serialize.NewTagEncoderOptions()
	encPool := serialize.NewTagEncoderPool(encodeOpts, poolOpts)
	encPool.Init()

	xBytes := graphiteTags(t, "x", encPool)
	fooBytes := graphiteTags(t, "foo", encPool)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		// Give each goroutine its own copy of the encoded tag bytes so
		// any in-place mutation by the writer would be visible per write
		// rather than racing on shared buffers.
		xData := append(make([]byte, 0, len(xBytes)), xBytes...)
		fooData := append(make([]byte, 0, len(fooBytes)), fooBytes...)
		go func() {
			defer wg.Done()
			err := writer.Write(aggregated.ChunkedMetricWithStoragePolicy{
				ChunkedMetric: aggregated.ChunkedMetric{
					ChunkedID: id.ChunkedID{Data: xData},
					TimeNanos: 123,
					Value:     42.42,
				},
				StoragePolicy: policy.MustParseStoragePolicy("1s:1d"),
			})
			require.NoError(t, err)

			err = writer.Write(aggregated.ChunkedMetricWithStoragePolicy{
				ChunkedMetric: aggregated.ChunkedMetric{
					ChunkedID: id.ChunkedID{Data: fooData},
					TimeNanos: 123,
					Value:     42.42,
				},
				StoragePolicy: policy.MustParseStoragePolicy("1s:1d"),
			})
			require.NoError(t, err)
		}()
	}

	wg.Wait()
	// Flush so all buffered writes reach the mock store before inspection.
	err = writer.Flush()
	require.NoError(t, err)

	// Inspect the writes: 100 goroutines x 2 metrics each.
	writes := store.Writes()
	require.Equal(t, 200, len(writes))

	// Count each distinct tag-set string; mixing tags between the two
	// series would produce unexpected keys or skewed counts.
	seenMap := make(map[string]int, 10)
	for _, w := range writes {
		str := w.Tags().String()
		seenMap[str] = seenMap[str] + 1
	}

	assert.Equal(t, map[string]int{
		"__g0__: foo, __g1__: y, __g2__: z": 100,
		"__g0__: x, __g1__: y, __g2__: z":   100,
	}, seenMap)
}
7 changes: 4 additions & 3 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func newMetricsAppenderPool(opts pool.ObjectPoolOptions) *metricsAppenderPool {
}

// Get returns a metricsAppender from the pool, resetting its per-metric
// state before handing it to the caller.
func (p *metricsAppenderPool) Get() *metricsAppender {
	a := p.pool.Get().(*metricsAppender)
	// Clear any state left over from the appender's previous user.
	a.NextMetric()
	return a
}

func (p *metricsAppenderPool) Put(v *metricsAppender) {
Expand Down Expand Up @@ -177,7 +180,6 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp

a.multiSamplesAppender.reset()
unownedID := data.Bytes()

// Match policies and rollups and build samples appender
id := a.metricTagsIteratorPool.Get()
id.Reset(unownedID)
Expand Down Expand Up @@ -353,7 +355,6 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp

a.debugLogMatch("downsampler applying matched rollup rule",
debugLogMatchOptions{Meta: rollup.Metadatas, RollupID: rollup.ID})

a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
Expand Down
Loading

0 comments on commit ef327ee

Please sign in to comment.