From a90ebaaff6cdf89d840375b5a22606b8985e26bb Mon Sep 17 00:00:00 2001 From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Mon, 29 Jul 2024 11:07:48 +0800 Subject: [PATCH] fix: nil part stats without l2 compaction(#34923) (#34992) related: #34923 Signed-off-by: MrPresent-Han Co-authored-by: MrPresent-Han Signed-off-by: Sumit Dubey --- .../querynodev2/delegator/segment_pruner.go | 21 +- .../delegator/segment_pruner_test.go | 280 +++++++++++++++--- 2 files changed, 248 insertions(+), 53 deletions(-) diff --git a/internal/querynodev2/delegator/segment_pruner.go b/internal/querynodev2/delegator/segment_pruner.go index 9324226e81a9b..dbb11e9a0daa3 100644 --- a/internal/querynodev2/delegator/segment_pruner.go +++ b/internal/querynodev2/delegator/segment_pruner.go @@ -43,6 +43,9 @@ func PruneSegments(ctx context.Context, ) { _, span := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "segmentPrune") defer span.End() + if partitionStats == nil { + return + } // 1. select collection, partitions and expr clusteringKeyField := clustering.GetClusteringKeyField(schema) if clusteringKeyField == nil { @@ -113,17 +116,21 @@ func PruneSegments(ctx context.Context, targetSegmentIDs := make([]int64, 0, 32) if len(partitionIDs) > 0 { for _, partID := range partitionIDs { - partStats := partitionStats[partID] - for segID, segStat := range partStats.SegmentStats { - targetSegmentIDs = append(targetSegmentIDs, segID) - targetSegmentStats = append(targetSegmentStats, segStat) + partStats, exist := partitionStats[partID] + if exist && partStats != nil { + for segID, segStat := range partStats.SegmentStats { + targetSegmentIDs = append(targetSegmentIDs, segID) + targetSegmentStats = append(targetSegmentStats, segStat) + } } } } else { for _, partStats := range partitionStats { - for segID, segStat := range partStats.SegmentStats { - targetSegmentIDs = append(targetSegmentIDs, segID) - targetSegmentStats = append(targetSegmentStats, segStat) + if partStats != nil { + for segID, segStat := range partStats.SegmentStats { + targetSegmentIDs = append(targetSegmentIDs, segID) + targetSegmentStats = append(targetSegmentStats, segStat) + } } } } diff --git a/internal/querynodev2/delegator/segment_pruner_test.go b/internal/querynodev2/delegator/segment_pruner_test.go index cf925ec2016f7..71b9f81a0954b 100644 --- a/internal/querynodev2/delegator/segment_pruner_test.go +++ b/internal/querynodev2/delegator/segment_pruner_test.go @@ -761,7 +761,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int8 > 128" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -771,13 +771,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(2, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int8 < -129" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -787,13 +787,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(2, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int8 > 50" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -803,9 +803,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(0, len(sealedSegments[0].Segments)) - sps.Equal(1, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(0, len(testSegments[0].Segments)) + sps.Equal(1, len(testSegments[1].Segments)) } } { @@ -910,7 +910,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int16 > 32768" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -920,13 +920,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(2, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int16 < -32769" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -936,13 +936,12 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(2, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } { - // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int16 > 2550" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -952,9 +951,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(0, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(0, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } } @@ -1060,7 +1059,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int32 > 2147483648" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -1070,13 +1069,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(2, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int32 < -2147483649" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -1086,13 +1085,13 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(2, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "int32 > 12550" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -1102,9 +1101,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsVariousIntTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(0, len(sealedSegments[0].Segments)) - sps.Equal(1, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(0, len(testSegments[0].Segments)) + sps.Equal(1, len(testSegments[1].Segments)) } } } @@ -1227,7 +1226,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() { { // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "float > 3.5" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -1237,9 +1236,9 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(0, len(sealedSegments[0].Segments)) - sps.Equal(2, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(0, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } } @@ -1344,8 +1343,7 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() { sealedSegments = append(sealedSegments, item2) { - // test out bound int expr, fallback to common search - testSegments := make([]SnapshotItem, 0) + testSegments := make([]SnapshotItem, len(sealedSegments)) copy(testSegments, sealedSegments) exprStr := "double < -1.5" schemaHelper, _ := typeutil.CreateSchemaHelper(schema) @@ -1355,10 +1353,200 @@ func (sps *SegmentPrunerSuite) TestPruneSegmentsFloatTypes() { queryReq := &internalpb.RetrieveRequest{ SerializedExprPlan: serializedPlan, } - PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, sealedSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) - sps.Equal(1, len(sealedSegments[0].Segments)) - sps.Equal(0, len(sealedSegments[1].Segments)) + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(1, len(testSegments[0].Segments)) + sps.Equal(0, len(testSegments[1].Segments)) + } + } +} + +func (sps *SegmentPrunerSuite) TestPruneSegmentsWithoutPartitionStats() { + paramtable.Init() + collectionName := "test_segment_prune" + primaryFieldName := "pk" + dim := 8 + const FLOAT = "float" + const DOUBLE = "double" + const VEC = "vec" + + fieldName2DataType := make(map[string]schemapb.DataType) + fieldName2DataType[primaryFieldName] = schemapb.DataType_Int64 + fieldName2DataType[FLOAT] = schemapb.DataType_Float + fieldName2DataType[DOUBLE] = schemapb.DataType_Double + fieldName2DataType[VEC] = schemapb.DataType_FloatVector + + // set up segment distribution + sealedSegments := make([]SnapshotItem, 0) + item1 := SnapshotItem{ + NodeID: 1, + Segments: []SegmentEntry{ + { + NodeID: 1, + SegmentID: 1, + }, + { + NodeID: 1, + SegmentID: 2, + }, + }, + } + item2 := SnapshotItem{ + NodeID: 2, + Segments: []SegmentEntry{ + { + NodeID: 2, + SegmentID: 3, + }, + { + NodeID: 2, + SegmentID: 4, + }, + }, + } + sealedSegments = append(sealedSegments, item1) + sealedSegments = append(sealedSegments, item2) + + clusterFieldName := DOUBLE + schema := testutil.ConstructCollectionSchemaWithKeys(collectionName, + fieldName2DataType, + primaryFieldName, + "", + clusterFieldName, + false, + dim) + { + // test for empty partition stats + partitionStats := make(map[UniqueID]*storage.PartitionStatsSnapshot) + testSegments := make([]SnapshotItem, len(sealedSegments)) + copy(testSegments, sealedSegments) + exprStr := "double < -1.5" + schemaHelper, _ := typeutil.CreateSchemaHelper(schema) + planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr) + sps.NoError(err) + serializedPlan, _ := proto.Marshal(planNode) + queryReq := &internalpb.RetrieveRequest{ + SerializedExprPlan: serializedPlan, + PartitionIDs: []int64{1}, + } + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + // without valid partition stats, we should get all segments targeted + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) + } + { + // test for nil partition stat + partitionStats := make(map[UniqueID]*storage.PartitionStatsSnapshot) + partitionStats[1] = nil + testSegments := make([]SnapshotItem, len(sealedSegments)) + copy(testSegments, sealedSegments) + exprStr := "double < -1.5" + schemaHelper, _ := typeutil.CreateSchemaHelper(schema) + planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr) + sps.NoError(err) + serializedPlan, _ := proto.Marshal(planNode) + queryReq := &internalpb.RetrieveRequest{ + SerializedExprPlan: serializedPlan, + PartitionIDs: []int64{1}, + } + PruneSegments(context.TODO(), partitionStats, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + // without valid partition stats, we should get all segments targeted + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) + } + { + // test for nil partition stats map + testSegments := make([]SnapshotItem, len(sealedSegments)) + copy(testSegments, sealedSegments) + exprStr := "double < -1.5" + schemaHelper, _ := typeutil.CreateSchemaHelper(schema) + planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr) + sps.NoError(err) + serializedPlan, _ := proto.Marshal(planNode) + queryReq := &internalpb.RetrieveRequest{ + SerializedExprPlan: serializedPlan, + PartitionIDs: []int64{1}, + } + PruneSegments(context.TODO(), nil, nil, queryReq, schema, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + // without valid partition stats, we should get all segments targeted + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) + } + { + // test for nil schema + var clusteringKeyFieldID int64 = 0 + for _, field := range schema.GetFields() { + if field.IsClusteringKey { + clusteringKeyFieldID = field.FieldID + break + } + } + + // set up part stats + segStats := make(map[UniqueID]storage.SegmentStats) + { + fieldStats := make([]storage.FieldStats, 0) + fieldStat1 := storage.FieldStats{ + FieldID: clusteringKeyFieldID, + Type: schemapb.DataType_Double, + Min: storage.NewDoubleFieldValue(-5.0), + Max: storage.NewDoubleFieldValue(-2.0), + } + fieldStats = append(fieldStats, fieldStat1) + segStats[1] = *storage.NewSegmentStats(fieldStats, 80) + } + + { + fieldStats := make([]storage.FieldStats, 0) + fieldStat1 := storage.FieldStats{ + FieldID: clusteringKeyFieldID, + Type: schemapb.DataType_Double, + Min: storage.NewDoubleFieldValue(-1.0), + Max: storage.NewDoubleFieldValue(2.0), + } + fieldStats = append(fieldStats, fieldStat1) + segStats[2] = *storage.NewSegmentStats(fieldStats, 80) + } + { + fieldStats := make([]storage.FieldStats, 0) + fieldStat1 := storage.FieldStats{ + FieldID: clusteringKeyFieldID, + Type: schemapb.DataType_Double, + Min: storage.NewDoubleFieldValue(3.0), + Max: storage.NewDoubleFieldValue(6.0), + } + fieldStats = append(fieldStats, fieldStat1) + segStats[3] = *storage.NewSegmentStats(fieldStats, 80) + } + { + fieldStats := make([]storage.FieldStats, 0) + fieldStat1 := storage.FieldStats{ + FieldID: clusteringKeyFieldID, + Type: schemapb.DataType_Double, + Min: storage.NewDoubleFieldValue(7.0), + Max: storage.NewDoubleFieldValue(8.0), + } + fieldStats = append(fieldStats, fieldStat1) + segStats[4] = *storage.NewSegmentStats(fieldStats, 80) + } + partitionStats := make(map[UniqueID]*storage.PartitionStatsSnapshot) + targetPartition := int64(1) + partitionStats[targetPartition] = &storage.PartitionStatsSnapshot{ + SegmentStats: segStats, } + testSegments := make([]SnapshotItem, len(sealedSegments)) + copy(testSegments, sealedSegments) + exprStr := "double < -1.5" + schemaHelper, _ := typeutil.CreateSchemaHelper(schema) + planNode, err := planparserv2.CreateRetrievePlan(schemaHelper, exprStr) + sps.NoError(err) + serializedPlan, _ := proto.Marshal(planNode) + queryReq := &internalpb.RetrieveRequest{ + SerializedExprPlan: serializedPlan, + PartitionIDs: []int64{targetPartition}, + } + PruneSegments(context.TODO(), partitionStats, nil, queryReq, nil, testSegments, PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()}) + sps.Equal(2, len(testSegments[0].Segments)) + sps.Equal(2, len(testSegments[1].Segments)) } }