Skip to content

Commit

Permalink
Add load test and inject testing flags (#204)
Browse files Browse the repository at this point in the history
* Add a slow load test case to CI
* Update the max opened blocks strategy. A shard can open at most 64
  blocks. A cleanup goroutine can collect the 10 oldest inactive blocks
  every minute until only 4 blocks are left in the cache.
* Inject the logging level and the "Eventually" timeout into tests through "ldflags"

Signed-off-by: Gao Hongtao <[email protected]>
  • Loading branch information
hanahmily authored Nov 4, 2022
1 parent 1edf859 commit 28b5fd5
Show file tree
Hide file tree
Showing 41 changed files with 730 additions and 218 deletions.
57 changes: 57 additions & 0 deletions .github/workflows/load.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Continuous Integration

on:
schedule:
- cron: '0 20 * * *'

jobs:
test:
name: Load several days data
runs-on: ubuntu-20.04
strategy:
matrix:
tz: ["UTC", "Asia/Shanghai", "America/Los_Angeles"]
steps:
- name: Set timezone
run: sudo timedatectl set-timezone ${{ matrix.tz }}
- uses: actions/setup-node@v3
with:
node-version: 16.15
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.19
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- uses: actions/cache@v3
id: cache-go
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Update dependencies
if: steps.cache-go.outputs.cache-hit != 'true'
run: GOPROXY=https://proxy.golang.org go mod download
- name: Generate mocks
run: make generate
- name: Test
run: TEST_EXTRA_OPTS="--label-filter slow" make -C test test
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ test-coverage: default ## Run the unit tests in all projects with coverage analy
include scripts/build/ginkgo.mk

test-ci: $(GINKGO) ## Run the unit tests in CI
$(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out ./...
$(GINKGO) --race \
-ldflags \
"-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=warn" \
--cover --covermode atomic --coverprofile=coverage.out \
--label-filter !slow \
./...

##@ Code quality targets

Expand Down
3 changes: 2 additions & 1 deletion banyand/liaison/grpc/grpc_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"

"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)

func TestGrpc(t *testing.T) {
Expand All @@ -34,6 +35,6 @@ func TestGrpc(t *testing.T) {
var _ = BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
Level: flags.LogLevel,
})).Should(Succeed())
})
3 changes: 2 additions & 1 deletion banyand/measure/measure_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
)

Expand All @@ -45,7 +46,7 @@ func TestMeasure(t *testing.T) {
var _ = ginkgo.BeforeSuite(func() {
gomega.Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
Level: flags.LogLevel,
})).To(gomega.Succeed())
})

Expand Down
4 changes: 3 additions & 1 deletion banyand/measure/measure_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
if err != nil {
return err
}
span, err := series.Create(eventTime)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
span, err := series.Create(ctx, eventTime)
if err != nil {
if span != nil {
_ = span.Close()
Expand Down
6 changes: 5 additions & 1 deletion banyand/measure/measure_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package measure

import (
"bytes"
"context"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -75,7 +77,9 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
if err != nil {
return err
}
wp, err := series.Create(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wp, err := series.Create(ctx, t)
if err != nil {
if wp != nil {
_ = wp.Close()
Expand Down
3 changes: 2 additions & 1 deletion banyand/metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
test "github.com/apache/skywalking-banyandb/pkg/test/stream"
)

Expand All @@ -37,7 +38,7 @@ func Test_service_RulesBySubject(t *testing.T) {
is := assert.New(t)
is.NoError(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
Level: flags.LogLevel,
}))
ctx := context.TODO()
s, _ := NewService(ctx)
Expand Down
3 changes: 2 additions & 1 deletion banyand/metadata/schema/schema_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
. "github.com/onsi/gomega"

"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)

func TestSchema(t *testing.T) {
Expand All @@ -35,6 +36,6 @@ func TestSchema(t *testing.T) {
var _ = ginkgo.BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
Level: flags.LogLevel,
})).To(Succeed())
})
4 changes: 3 additions & 1 deletion banyand/query/processor_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ func familyIdentity(name string, flag []byte) []byte {
}

func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.TopNRequest) ([]tsdb.Iterator, error) {
seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRange(
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
seriesSpan, err := series.Span(ctx, timestamp.NewInclusiveTimeRange(
request.GetTimeRange().GetBegin().AsTime(),
request.GetTimeRange().GetEnd().AsTime()),
)
Expand Down
3 changes: 2 additions & 1 deletion banyand/stream/stream_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
)

Expand All @@ -44,7 +45,7 @@ func TestStream(t *testing.T) {
var _ = BeforeSuite(func() {
Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
Level: flags.LogLevel,
})).To(Succeed())
})

Expand Down
7 changes: 6 additions & 1 deletion banyand/stream/stream_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package stream

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -87,7 +90,9 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
return err
}
t := timestamp.MToN(tp)
wp, err := series.Create(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wp, err := series.Create(ctx, t)
if err != nil {
if wp != nil {
_ = wp.Close()
Expand Down
81 changes: 54 additions & 27 deletions banyand/tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
Expand All @@ -49,8 +50,11 @@ const (
componentSecondLSMIdx = "lsm"

defaultMainMemorySize = 8 << 20
defaultEnqueueTimeout = 500 * time.Millisecond
)

var ErrBlockClosingInterrupted = errors.New("interrupt to close the block")

type block struct {
path string
l *logger.Logger
Expand Down Expand Up @@ -106,13 +110,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
deleted: &atomic.Bool{},
queue: opts.queue,
}
b.closed.Store(true)
b.options(ctx)
position := ctx.Value(common.PositionKey)
if position != nil {
b.position = position.(common.Position)
}

return b, b.open()
return b, nil
}

func (b *block) options(ctx context.Context) {
Expand Down Expand Up @@ -187,24 +192,28 @@ func (b *block) open() (err error) {
return nil
}

func (b *block) delegate() (BlockDelegate, error) {
func (b *block) delegate(ctx context.Context) (BlockDelegate, error) {
if b.deleted.Load() {
return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is deleted", b.blockID)
return nil, errors.WithMessagef(ErrBlockAbsent, "block %s is deleted", b)
}
blockID := BlockID{
BlockID: b.blockID,
SegID: b.segID,
}
if b.incRef() {
b.queue.Touch(blockID)
return &bDelegate{
delegate: b,
}, nil
}
b.lock.Lock()
defer b.lock.Unlock()
b.queue.Push(BlockID{
BlockID: b.blockID,
SegID: b.segID,
})
// TODO: remove the block which fails to open from the queue
err := b.open()
if err != nil {
if err := b.queue.Push(ctx, blockID, func() error {
if !b.Closed() {
return nil
}
return b.open()
}); err != nil {
b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block")
return nil, err
}
Expand Down Expand Up @@ -240,14 +249,23 @@ loop:
goto loop
}

func (b *block) waitDone() {
loop:
if b.ref.Load() < 1 {
b.ref.Store(0)
return
}
runtime.Gosched()
goto loop
func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
ch := make(chan struct{})
go func() {
loop:
if b.ref.Load() < 1 {
b.ref.Store(0)
close(ch)
return
}
if stopped.Load() {
close(ch)
return
}
runtime.Gosched()
goto loop
}()
return ch
}

func (b *block) flush() {
Expand All @@ -261,36 +279,45 @@ func (b *block) flush() {
}
}

func (b *block) close() {
func (b *block) close(ctx context.Context) (err error) {
b.lock.Lock()
defer b.lock.Unlock()
if b.closed.Load() {
return
return nil
}
b.closed.Store(true)
b.waitDone()
stopWaiting := &atomic.Bool{}
ch := b.waitDone(stopWaiting)
select {
case <-ctx.Done():
b.closed.Store(false)
stopWaiting.Store(true)
return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b)
case <-ch:
}
for _, closer := range b.closableLst {
_ = closer.Close()
err = multierr.Append(err, closer.Close())
}
close(b.stopCh)
return err
}

func (b *block) stopThenClose() {
func (b *block) stopThenClose(ctx context.Context) error {
if b.Reporter != nil {
b.Stop()
}
b.close()
return b.close(ctx)
}

func (b *block) delete() error {
func (b *block) delete(ctx context.Context) error {
if b.deleted.Load() {
return nil
}
b.deleted.Store(true)
if b.Reporter != nil {
b.Stop()
}
b.close()
b.close(ctx)
return os.RemoveAll(b.path)
}

Expand All @@ -299,7 +326,7 @@ func (b *block) Closed() bool {
}

func (b *block) String() string {
return fmt.Sprintf("BlockID-%d-%d", b.segID, b.blockID)
return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.segID), parseSuffix(b.blockID))
}

func (b *block) stats() (names []string, stats []observability.Statistics) {
Expand Down
Loading

0 comments on commit 28b5fd5

Please sign in to comment.