Skip to content

Commit

Permalink
feat: skip some empty ttMsg in Datanode flowgraph (#28756)
Browse files Browse the repository at this point in the history
/kind feature

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Dec 6, 2023
1 parent fb089cd commit 6736f65
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 12 deletions.
6 changes: 6 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@ dataNode:
maxQueueLength: 16 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
maxParallelSyncTaskNum: 6 # Maximum number of sync tasks executed in parallel in each flush manager
skipMode:
# when there are only timetick msg in flowgraph for a while (longer than coldTime),
# flowGraph will turn on skip mode to skip most timeticks to reduce cost, especially there are a lot of channels
enable: true
skipNum: 4
coldTime: 60
segment:
insertBufSize: 16777216 # Max buffer size to flush for a single segment.
deleteBufBytes: 67108864 # Max buffer size to flush del for a single channel
Expand Down
61 changes: 49 additions & 12 deletions internal/util/flowgraph/input_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ package flowgraph
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -49,6 +52,10 @@ type InputNode struct {
dataType string

closeGracefully *atomic.Bool

skipMode bool
skipCount int
lastNotTimetickTime time.Time
}

// IsInputNode returns whether Node is InputNode
Expand Down Expand Up @@ -129,6 +136,11 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
}

var spans []trace.Span
defer func() {
for _, span := range spans {
span.End()
}
}()
for _, msg := range msgPack.Msgs {
ctx := msg.TraceCtx()
if ctx == nil {
Expand All @@ -140,6 +152,33 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
msg.SetTraceCtx(ctx)
}

// skip timetick message feature
if inNode.role == typeutil.DataNodeRole &&
len(msgPack.Msgs) > 0 &&
paramtable.Get().DataNodeCfg.FlowGraphSkipModeEnable.GetAsBool() {
if msgPack.Msgs[0].Type() == commonpb.MsgType_TimeTick {
if inNode.skipMode {
// if empty timetick message and in skipMode, will skip some of the timetick messages to reduce downstream work
if inNode.skipCount == paramtable.Get().DataNodeCfg.FlowGraphSkipModeSkipNum.GetAsInt() {
inNode.skipCount = 0
} else {
inNode.skipCount = inNode.skipCount + 1
return []Msg{}
}
} else {
cd := paramtable.Get().DataNodeCfg.FlowGraphSkipModeColdTime.GetAsInt()
if time.Since(inNode.lastNotTimetickTime) > time.Second*time.Duration(cd) {
inNode.skipMode = true
}
}
} else {
// if non empty message, refresh the lastNotTimetickTime and close skip mode
inNode.skipMode = false
inNode.skipCount = 0
inNode.lastNotTimetickTime = time.Now()
}
}

var msgStreamMsg Msg = &MsgStreamMsg{
tsMessages: msgPack.Msgs,
timestampMin: msgPack.BeginTs,
Expand All @@ -148,10 +187,6 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
endPositions: msgPack.EndPositions,
}

for _, span := range spans {
span.End()
}

return []Msg{msgStreamMsg}
}

Expand All @@ -162,13 +197,15 @@ func NewInputNode(input <-chan *msgstream.MsgPack, nodeName string, maxQueueLeng
baseNode.SetMaxParallelism(maxParallelism)

return &InputNode{
BaseNode: baseNode,
input: input,
name: nodeName,
role: role,
nodeID: nodeID,
collectionID: collectionID,
dataType: dataType,
closeGracefully: atomic.NewBool(CloseImmediately),
BaseNode: baseNode,
input: input,
name: nodeName,
role: role,
nodeID: nodeID,
collectionID: collectionID,
dataType: dataType,
closeGracefully: atomic.NewBool(CloseImmediately),
skipCount: 0,
lastNotTimetickTime: time.Now(),
}
}
85 changes: 85 additions & 0 deletions internal/util/flowgraph/input_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ package flowgraph

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestInputNode(t *testing.T) {
Expand Down Expand Up @@ -66,3 +72,82 @@ func Test_NewInputNode(t *testing.T) {
assert.Equal(t, node.maxQueueLength, maxQueueLength)
assert.Equal(t, node.maxParallelism, maxParallelism)
}

func Test_InputNodeSkipMode(t *testing.T) {
t.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/Test_InputNodeSkipMode")
factory := dependency.NewDefaultFactory(true)
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlowGraphSkipModeColdTime.Key, "3")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlowGraphSkipModeSkipNum.Key, "1")

msgStream, _ := factory.NewMsgStream(context.TODO())
channels := []string{"cc" + fmt.Sprint(rand.Int())}
msgStream.AsConsumer(context.Background(), channels, "sub", mqwrapper.SubscriptionPositionEarliest)

produceStream, _ := factory.NewMsgStream(context.TODO())
produceStream.AsProducer(channels)
closeCh := make(chan struct{})
outputCh := make(chan bool)

nodeName := "input_node"
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, typeutil.DataNodeRole, 0, 0, "")
defer inputNode.Close()

outputCount := 0
go func() {
for {
select {
case <-closeCh:
return
default:
output := inputNode.Operate(nil)
if len(output) > 0 {
outputCount = outputCount + 1
}
outputCh <- true
}
}
}()
defer close(closeCh)

msgPack := generateMsgPack()
produceStream.Produce(&msgPack)
log.Info("produce empty ttmsg")
<-outputCh
assert.Equal(t, 1, outputCount)
assert.Equal(t, false, inputNode.skipMode)

time.Sleep(3 * time.Second)
assert.Equal(t, false, inputNode.skipMode)
produceStream.Produce(&msgPack)
log.Info("after 3 seconds with no active msg receive, input node will turn on skip mode")
<-outputCh
assert.Equal(t, 2, outputCount)
assert.Equal(t, true, inputNode.skipMode)

log.Info("some ttmsg will be skipped in skip mode")
// this msg will be skipped
produceStream.Produce(&msgPack)
<-outputCh
assert.Equal(t, 2, outputCount)
assert.Equal(t, true, inputNode.skipMode)

// this msg will be consumed
produceStream.Produce(&msgPack)
<-outputCh
assert.Equal(t, 3, outputCount)
assert.Equal(t, true, inputNode.skipMode)

//log.Info("non empty msg will awake input node, turn off skip mode")
//insertMsgPack := generateInsertMsgPack()
//produceStream.Produce(&insertMsgPack)
//<-outputCh
//assert.Equal(t, 3, outputCount)
//assert.Equal(t, false, inputNode.skipMode)
//
//log.Info("empty msg will be consumed in not-skip mode")
//produceStream.Produce(&msgPack)
//<-outputCh
//assert.Equal(t, 4, outputCount)
//assert.Equal(t, false, inputNode.skipMode)
//close(closeCh)
}
16 changes: 16 additions & 0 deletions internal/util/flowgraph/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ func generateMsgPack() msgstream.MsgPack {
return msgPack
}

func generateInsertMsgPack() msgstream.MsgPack {
msgPack := msgstream.MsgPack{}
insertMsg := &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: uint64(time.Now().Unix()),
EndTimestamp: uint64(time.Now().Unix() + 1),
HashValues: []uint32{0},
},
InsertRequest: msgpb.InsertRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert},
},
}
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
return msgPack
}

func TestNodeManager_Start(t *testing.T) {
t.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestNodeStart")
factory := dependency.NewDefaultFactory(true)
Expand Down
35 changes: 35 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,11 @@ type dataNodeConfig struct {
FlowGraphMaxParallelism ParamItem `refreshable:"false"`
MaxParallelSyncTaskNum ParamItem `refreshable:"false"`

// skip mode
FlowGraphSkipModeEnable ParamItem `refreshable:"true"`
FlowGraphSkipModeSkipNum ParamItem `refreshable:"true"`
FlowGraphSkipModeColdTime ParamItem `refreshable:"true"`

// segment
FlushInsertBufferSize ParamItem `refreshable:"true"`
FlushDeleteBufferBytes ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -2612,6 +2617,36 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.FlowGraphMaxParallelism.Init(base.mgr)

p.FlowGraphSkipModeEnable = ParamItem{
Key: "datanode.dataSync.skipMode.enable",
Version: "2.3.4",
DefaultValue: "true",
PanicIfEmpty: false,
Doc: "Support skip some timetick message to reduce CPU usage",
Export: true,
}
p.FlowGraphSkipModeEnable.Init(base.mgr)

p.FlowGraphSkipModeSkipNum = ParamItem{
Key: "datanode.dataSync.skipMode.skipNum",
Version: "2.3.4",
DefaultValue: "5",
PanicIfEmpty: false,
Doc: "Consume one for every n records skipped",
Export: true,
}
p.FlowGraphSkipModeSkipNum.Init(base.mgr)

p.FlowGraphSkipModeColdTime = ParamItem{
Key: "datanode.dataSync.skipMode.coldTime",
Version: "2.3.4",
DefaultValue: "60",
PanicIfEmpty: false,
Doc: "Turn on skip mode after there are only timetick msg for x seconds",
Export: true,
}
p.FlowGraphSkipModeColdTime.Init(base.mgr)

p.MaxParallelSyncTaskNum = ParamItem{
Key: "dataNode.dataSync.maxParallelSyncTaskNum",
Version: "2.3.0",
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,15 @@ func TestComponentParam(t *testing.T) {
maxParallelism := Params.FlowGraphMaxParallelism.GetAsInt()
t.Logf("flowGraphMaxParallelism: %d", maxParallelism)

flowGraphSkipModeEnable := Params.FlowGraphSkipModeEnable.GetAsBool()
t.Logf("flowGraphSkipModeEnable: %t", flowGraphSkipModeEnable)

flowGraphSkipModeSkipNum := Params.FlowGraphSkipModeSkipNum.GetAsInt()
t.Logf("flowGraphSkipModeSkipNum: %d", flowGraphSkipModeSkipNum)

flowGraphSkipModeColdTime := Params.FlowGraphSkipModeColdTime.GetAsInt()
t.Logf("flowGraphSkipModeColdTime: %d", flowGraphSkipModeColdTime)

maxParallelSyncTaskNum := Params.MaxParallelSyncTaskNum.GetAsInt()
t.Logf("maxParallelSyncTaskNum: %d", maxParallelSyncTaskNum)

Expand Down

0 comments on commit 6736f65

Please sign in to comment.