From 5d8a979853c972a0b78ce55b7a5e4ea3645a93c4 Mon Sep 17 00:00:00 2001 From: OxalisCu <64067746+OxalisCu@users.noreply.github.com> Date: Mon, 9 Sep 2024 19:17:07 +0800 Subject: [PATCH] enhance: Bulkinsert supports null in csv formats (#35912) see details in this issue https://github.com/milvus-io/milvus/issues/35911 --------- Signed-off-by: OxalisCu <2127298698@qq.com> --- internal/util/importutilv2/csv/reader.go | 4 +- internal/util/importutilv2/csv/reader_test.go | 95 ++++++++++++------- internal/util/importutilv2/csv/row_parser.go | 51 +++++++++- .../util/importutilv2/csv/row_parser_test.go | 67 ++++++++++++- internal/util/importutilv2/option.go | 9 ++ internal/util/importutilv2/reader.go | 6 +- internal/util/testutil/test_util.go | 7 +- tests/integration/import/util_test.go | 6 +- 8 files changed, 200 insertions(+), 45 deletions(-) diff --git a/internal/util/importutilv2/csv/reader.go b/internal/util/importutilv2/csv/reader.go index a0f6c6c1a3794..f216f10d9f4f9 100644 --- a/internal/util/importutilv2/csv/reader.go +++ b/internal/util/importutilv2/csv/reader.go @@ -31,7 +31,7 @@ type reader struct { filePath string } -func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune) (*reader, error) { +func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune, nullkey string) (*reader, error) { cmReader, err := cm.Reader(ctx, path) if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("read csv file failed, path=%s, err=%s", path, err.Error())) @@ -53,7 +53,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read csv header, error: %v", err)) } - rowParser, err := NewRowParser(schema, header) + rowParser, err := NewRowParser(schema, header, nullkey) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/csv/reader_test.go b/internal/util/importutilv2/csv/reader_test.go index 9f448ac4af0b1..67e29256e0b1c 100644 --- a/internal/util/importutilv2/csv/reader_test.go +++ b/internal/util/importutilv2/csv/reader_test.go @@ -8,7 +8,6 @@ import ( "os" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -37,7 +36,7 @@ func (suite *ReaderSuite) SetupTest() { suite.vecDataType = schemapb.DataType_FloatVector } -func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) { +func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) { schema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -74,25 +73,31 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data Value: "128", }, }, + Nullable: nullable, }, }, } + // config + // csv separator + sep := ',' + // csv writer write null value as empty string + nullkey := "" + // generate csv data insertData, err := testutil.CreateInsertData(schema, suite.numRows) suite.NoError(err) - csvData, err := testutil.CreateInsertDataForCSV(schema, insertData) + csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey) suite.NoError(err) // write to csv file - sep := '\t' filePath := fmt.Sprintf("/tmp/test_%d_reader.csv", rand.Int()) - defer os.Remove(filePath) + // defer os.Remove(filePath) wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666) - assert.NoError(suite.T(), err) + suite.NoError(err) writer := csv.NewWriter(wf) writer.Comma = sep - writer.WriteAll(csvData) + err = writer.WriteAll(csvData) suite.NoError(err) // read from csv file @@ -102,13 +107,13 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data suite.NoError(err) // check reader separate fields by '\t' - wrongSep := ',' - _, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep) + wrongSep := '\t' + _, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep, nullkey) suite.Error(err) suite.Contains(err.Error(), "value of field is missed: ") // check data - reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep) + reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep, nullkey) suite.NoError(err) checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) { @@ -129,43 +134,63 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data } func (suite *ReaderSuite) TestReadScalarFields() { - suite.run(schemapb.DataType_Bool, schemapb.DataType_None) - suite.run(schemapb.DataType_Int8, schemapb.DataType_None) - suite.run(schemapb.DataType_Int16, schemapb.DataType_None) - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) - suite.run(schemapb.DataType_Int64, schemapb.DataType_None) - suite.run(schemapb.DataType_Float, schemapb.DataType_None) - suite.run(schemapb.DataType_Double, schemapb.DataType_None) - suite.run(schemapb.DataType_String, schemapb.DataType_None) - suite.run(schemapb.DataType_VarChar, schemapb.DataType_None) - suite.run(schemapb.DataType_JSON, schemapb.DataType_None) - - suite.run(schemapb.DataType_Array, schemapb.DataType_Bool) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int8) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int16) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int32) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int64) - suite.run(schemapb.DataType_Array, schemapb.DataType_Float) - suite.run(schemapb.DataType_Array, schemapb.DataType_Double) - suite.run(schemapb.DataType_Array, schemapb.DataType_String) + suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Float, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Double, schemapb.DataType_None, false) + suite.run(schemapb.DataType_String, schemapb.DataType_None, false) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false) + + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false) + suite.run(schemapb.DataType_Array, schemapb.DataType_String, false) + + suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Float, schemapb.DataType_None, true) + suite.run(schemapb.DataType_Double, schemapb.DataType_None, true) + suite.run(schemapb.DataType_String, schemapb.DataType_None, true) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true) + + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true) + suite.run(schemapb.DataType_Array, schemapb.DataType_String, true) } func (suite *ReaderSuite) TestStringPK() { suite.pkDataType = schemapb.DataType_VarChar - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func (suite *ReaderSuite) TestVector() { suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_SparseFloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index 78fcdb3d248fb..c87b0399f5b2e 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -17,6 +17,7 @@ type RowParser interface { Parse(raw []string) (Row, error) } type rowParser struct { + nullkey string header []string name2Dim map[string]int name2Field map[string]*schemapb.FieldSchema @@ -24,7 +25,7 @@ type rowParser struct { dynamicField *schemapb.FieldSchema } -func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser, error) { +func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) { name2Field := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) string { return field.GetName() @@ -74,6 +75,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser } return &rowParser{ + nullkey: nullkey, name2Dim: name2Dim, header: header, name2Field: name2Field, @@ -157,52 +159,80 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row) } func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, error) { + nullable := field.GetNullable() switch field.GetDataType() { case schemapb.DataType_Bool: + if nullable && obj == r.nullkey { + return nil, nil + } b, err := strconv.ParseBool(obj) if err != nil { return false, r.wrapTypeError(obj, field) } return b, nil case schemapb.DataType_Int8: + if nullable && obj == r.nullkey { + return nil, nil + } num, err := strconv.ParseInt(obj, 10, 8) if err != nil { return 0, r.wrapTypeError(obj, field) } return int8(num), nil case schemapb.DataType_Int16: + if nullable && obj == r.nullkey { + return nil, nil + } num, err := strconv.ParseInt(obj, 10, 16) if err != nil { return 0, r.wrapTypeError(obj, field) } return int16(num), nil case schemapb.DataType_Int32: + if nullable && obj == r.nullkey { + return nil, nil + } num, err := strconv.ParseInt(obj, 10, 32) if err != nil { return 0, r.wrapTypeError(obj, field) } return int32(num), nil case schemapb.DataType_Int64: + if nullable && obj == r.nullkey { + return nil, nil + } num, err := strconv.ParseInt(obj, 10, 64) if err != nil { return 0, r.wrapTypeError(obj, field) } return num, nil case schemapb.DataType_Float: + if nullable && obj == r.nullkey { + return nil, nil + } num, err := strconv.ParseFloat(obj, 32) if err != nil { return 0, r.wrapTypeError(obj, field) } return float32(num), typeutil.VerifyFloats32([]float32{float32(num)}) case schemapb.DataType_Double: + if nullable && obj == r.nullkey { + return nil, nil + } num, err := strconv.ParseFloat(obj, 64) if err != nil { return 0, r.wrapTypeError(obj, field) } return num, typeutil.VerifyFloats64([]float64{num}) case schemapb.DataType_VarChar, schemapb.DataType_String: + if nullable && obj == r.nullkey { + return nil, nil + } return obj, nil case schemapb.DataType_BinaryVector: + if nullable && obj == r.nullkey { + return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } var vec []byte err := json.Unmarshal([]byte(obj), &vec) if err != nil { @@ -213,6 +243,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e } return vec, nil case schemapb.DataType_JSON: + if nullable && obj == r.nullkey { + return nil, nil + } var data interface{} err := json.Unmarshal([]byte(obj), &data) if err != nil { @@ -220,6 +253,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e } return []byte(obj), nil case schemapb.DataType_FloatVector: + if nullable && obj == r.nullkey { + return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } var vec []float32 err := json.Unmarshal([]byte(obj), &vec) if err != nil { @@ -230,6 +266,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e } return vec, typeutil.VerifyFloats32(vec) case schemapb.DataType_Float16Vector: + if nullable && obj == r.nullkey { + return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } var vec []float32 err := json.Unmarshal([]byte(obj), &vec) if err != nil { @@ -244,6 +283,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e } return vec2, typeutil.VerifyFloats16(vec2) case schemapb.DataType_BFloat16Vector: + if nullable && obj == r.nullkey { + return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } var vec []float32 err := json.Unmarshal([]byte(obj), &vec) if err != nil { @@ -258,6 +300,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e } return vec2, typeutil.VerifyBFloats16(vec2) case schemapb.DataType_SparseFloatVector: + if nullable && obj == r.nullkey { + return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } // use dec.UseNumber() to avoid float64 precision loss var vec map[string]interface{} dec := json.NewDecoder(strings.NewReader(obj)) @@ -272,6 +317,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e } return vec2, nil case schemapb.DataType_Array: + if nullable && obj == r.nullkey { + return nil, nil + } var vec []interface{} desc := json.NewDecoder(strings.NewReader(obj)) desc.UseNumber() @@ -279,6 +327,7 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e if err != nil { return nil, r.wrapTypeError(obj, field) } + // elements in array not support null value scalarFieldData, err := r.arrayToFieldData(vec, field.GetElementType()) if err != nil { return nil, err diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go index 3c74fc195fc6f..79795831e37f2 100644 --- a/internal/util/importutilv2/csv/row_parser_test.go +++ b/internal/util/importutilv2/csv/row_parser_test.go @@ -51,9 +51,11 @@ func TestNewRowParser_Invalid(t *testing.T) { {header: []string{"id", "vector", "$meta"}, expectErr: "value of field is missed: 'str'"}, } + nullkey := "" + for i, c := range cases { t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) { - _, err := NewRowParser(schema, c.header) + _, err := NewRowParser(schema, c.header, nullkey) assert.Error(t, err) assert.True(t, strings.Contains(err.Error(), c.expectErr)) }) @@ -98,8 +100,10 @@ func TestRowParser_Parse_Valid(t *testing.T) { {header: []string{"id", "vector", "str", "$meta"}, row: []string{"1", "[1, 2]", "xxsddsffwq", "{\"y\": 2}"}, dyFields: map[string]any{"y": 2.0, "str": "xxsddsffwq"}}, } + nullkey := "" + for i, c := range cases { - r, err := NewRowParser(schema, c.header) + r, err := NewRowParser(schema, c.header, nullkey) assert.NoError(t, err) t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) { data, err := r.Parse(c.row) @@ -161,8 +165,10 @@ func TestRowParser_Parse_Invalid(t *testing.T) { {header: []string{"id", "vector", "x", "$meta"}, row: []string{"1", "[1, 2]", "8"}, expectErr: "the number of fields in the row is not equal to the header"}, } + nullkey := "" + for i, c := range cases { - r, err := NewRowParser(schema, c.header) + r, err := NewRowParser(schema, c.header, nullkey) assert.NoError(t, err) t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) { _, err := r.Parse(c.row) @@ -171,3 +177,58 @@ func TestRowParser_Parse_Invalid(t *testing.T) { }) } } + +func TestRowParser_Parse_NULL(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 1, + Name: "id", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 2, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + }, + { + FieldID: 3, + Name: "str", + DataType: schemapb.DataType_String, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "128", + }, + }, + Nullable: true, + }, + }, + } + + header := []string{"id", "vector", "str"} + + type testCase struct { + nullkey string + row []string + nulldata interface{} + } + + cases := []testCase{ + {nullkey: "", row: []string{"1", "[1, 2]", ""}, nulldata: nil}, + {nullkey: "NULL", row: []string{"1", "[1, 2]", "NULL"}, nulldata: nil}, + {nullkey: "\\N", row: []string{"1", "[1, 2]", "\\N"}, nulldata: nil}, + } + + for i, c := range cases { + r, err := NewRowParser(schema, header, c.nullkey) + assert.NoError(t, err) + t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) { + data, err := r.Parse(c.row) + assert.NoError(t, err) + assert.EqualValues(t, c.nulldata, data[3]) + }) + } +} diff --git a/internal/util/importutilv2/option.go b/internal/util/importutilv2/option.go index 55fef37794437..acf12e57c16cc 100644 --- a/internal/util/importutilv2/option.go +++ b/internal/util/importutilv2/option.go @@ -113,3 +113,12 @@ func GetCSVSep(options Options) (rune, error) { } return []rune(sep)[0], nil } + +func GetCSVNullKey(options Options) (string, error) { + nullKey, err := funcutil.GetAttrByKeyFromRepeatedKV("nullkey", options) + defaultNullKey := "" + if err != nil || len(nullKey) == 0 { + return defaultNullKey, nil + } + return nullKey, nil +} diff --git a/internal/util/importutilv2/reader.go b/internal/util/importutilv2/reader.go index 443ed3c11d224..971158a833a35 100644 --- a/internal/util/importutilv2/reader.go +++ b/internal/util/importutilv2/reader.go @@ -76,7 +76,11 @@ func NewReader(ctx context.Context, if err != nil { return nil, err } - return csv.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize, sep) + nullkey, err := GetCSVNullKey(options) + if err != nil { + return nil, err + } + return csv.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize, sep, nullkey) } return nil, merr.WrapErrImportFailed("unexpected import file") } diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 7a20031522c08..7b827bd6f5ae9 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -571,7 +571,7 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData * return rows, nil } -func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([][]string, error) { +func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData, nullkey string) ([][]string, error) { rowNum := insertData.GetRowNum() csvData := make([][]string, 0, rowNum+1) @@ -595,6 +595,11 @@ func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *stora if field.GetAutoID() { continue } + // deal with null value + if field.GetNullable() && value.GetRow(i) == nil { + data = append(data, nullkey) + continue + } switch dataType { case schemapb.DataType_Array: var arr any diff --git a/tests/integration/import/util_test.go b/tests/integration/import/util_test.go index 58eb84ab56523..d8add9e57d445 100644 --- a/tests/integration/import/util_test.go +++ b/tests/integration/import/util_test.go @@ -207,10 +207,12 @@ func GenerateCSVFile(t *testing.T, filePath string, schema *schemapb.CollectionS insertData, err := testutil.CreateInsertData(schema, count) assert.NoError(t, err) - csvData, err := testutil.CreateInsertDataForCSV(schema, insertData) + sep := ',' + nullkey := "" + + csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey) assert.NoError(t, err) - sep := ',' wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666) assert.NoError(t, err)