Skip to content

Commit

Permalink
enhance: Bulkinsert supports null in csv formats (milvus-io#35912)
Browse files Browse the repository at this point in the history
see details in this issue
milvus-io#35911

---------

Signed-off-by: OxalisCu <[email protected]>
  • Loading branch information
OxalisCu authored and chyezh committed Sep 11, 2024
1 parent 7e39fba commit 5d8a979
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 45 deletions.
4 changes: 2 additions & 2 deletions internal/util/importutilv2/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type reader struct {
filePath string
}

func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune) (*reader, error) {
func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune, nullkey string) (*reader, error) {
cmReader, err := cm.Reader(ctx, path)
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("read csv file failed, path=%s, err=%s", path, err.Error()))
Expand All @@ -53,7 +53,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read csv header, error: %v", err))
}

rowParser, err := NewRowParser(schema, header)
rowParser, err := NewRowParser(schema, header, nullkey)
if err != nil {
return nil, err
}
Expand Down
95 changes: 60 additions & 35 deletions internal/util/importutilv2/csv/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -37,7 +36,7 @@ func (suite *ReaderSuite) SetupTest() {
suite.vecDataType = schemapb.DataType_FloatVector
}

func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) {
func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
Expand Down Expand Up @@ -74,25 +73,31 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
Value: "128",
},
},
Nullable: nullable,
},
},
}

// config
// csv separator
sep := ','
// csv writer write null value as empty string
nullkey := ""

// generate csv data
insertData, err := testutil.CreateInsertData(schema, suite.numRows)
suite.NoError(err)
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData)
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey)
suite.NoError(err)

// write to csv file
sep := '\t'
filePath := fmt.Sprintf("/tmp/test_%d_reader.csv", rand.Int())
defer os.Remove(filePath)
// defer os.Remove(filePath)
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(suite.T(), err)
suite.NoError(err)
writer := csv.NewWriter(wf)
writer.Comma = sep
writer.WriteAll(csvData)
err = writer.WriteAll(csvData)
suite.NoError(err)

// read from csv file
Expand All @@ -102,13 +107,13 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
suite.NoError(err)

// check reader separate fields by '\t'
wrongSep := ','
_, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep)
wrongSep := '\t'
_, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep, nullkey)
suite.Error(err)
suite.Contains(err.Error(), "value of field is missed: ")

// check data
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep)
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep, nullkey)
suite.NoError(err)

checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
Expand All @@ -129,43 +134,63 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
}

func (suite *ReaderSuite) TestReadScalarFields() {
suite.run(schemapb.DataType_Bool, schemapb.DataType_None)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None)
suite.run(schemapb.DataType_Float, schemapb.DataType_None)
suite.run(schemapb.DataType_Double, schemapb.DataType_None)
suite.run(schemapb.DataType_String, schemapb.DataType_None)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None)

suite.run(schemapb.DataType_Array, schemapb.DataType_Bool)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double)
suite.run(schemapb.DataType_Array, schemapb.DataType_String)
suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Float, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Double, schemapb.DataType_None, false)
suite.run(schemapb.DataType_String, schemapb.DataType_None, false)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false)

suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_String, false)

suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Float, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Double, schemapb.DataType_None, true)
suite.run(schemapb.DataType_String, schemapb.DataType_None, true)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true)

suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_String, true)
}

func (suite *ReaderSuite) TestStringPK() {
suite.pkDataType = schemapb.DataType_VarChar
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
}

func (suite *ReaderSuite) TestVector() {
suite.vecDataType = schemapb.DataType_BinaryVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_FloatVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_Float16Vector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_BFloat16Vector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_SparseFloatVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
}

func TestUtil(t *testing.T) {
Expand Down
51 changes: 50 additions & 1 deletion internal/util/importutilv2/csv/row_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ type RowParser interface {
Parse(raw []string) (Row, error)
}
type rowParser struct {
nullkey string
header []string
name2Dim map[string]int
name2Field map[string]*schemapb.FieldSchema
pkField *schemapb.FieldSchema
dynamicField *schemapb.FieldSchema
}

func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser, error) {
func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) {
name2Field := lo.KeyBy(schema.GetFields(),
func(field *schemapb.FieldSchema) string {
return field.GetName()
Expand Down Expand Up @@ -74,6 +75,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser
}

return &rowParser{
nullkey: nullkey,
name2Dim: name2Dim,
header: header,
name2Field: name2Field,
Expand Down Expand Up @@ -157,52 +159,80 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row)
}

func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, error) {
nullable := field.GetNullable()
switch field.GetDataType() {
case schemapb.DataType_Bool:
if nullable && obj == r.nullkey {
return nil, nil
}
b, err := strconv.ParseBool(obj)
if err != nil {
return false, r.wrapTypeError(obj, field)
}
return b, nil
case schemapb.DataType_Int8:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 8)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return int8(num), nil
case schemapb.DataType_Int16:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 16)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return int16(num), nil
case schemapb.DataType_Int32:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 32)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return int32(num), nil
case schemapb.DataType_Int64:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 64)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return num, nil
case schemapb.DataType_Float:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseFloat(obj, 32)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return float32(num), typeutil.VerifyFloats32([]float32{float32(num)})
case schemapb.DataType_Double:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseFloat(obj, 64)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return num, typeutil.VerifyFloats64([]float64{num})
case schemapb.DataType_VarChar, schemapb.DataType_String:
if nullable && obj == r.nullkey {
return nil, nil
}
return obj, nil
case schemapb.DataType_BinaryVector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []byte
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
Expand All @@ -213,13 +243,19 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec, nil
case schemapb.DataType_JSON:
if nullable && obj == r.nullkey {
return nil, nil
}
var data interface{}
err := json.Unmarshal([]byte(obj), &data)
if err != nil {
return nil, err
}
return []byte(obj), nil
case schemapb.DataType_FloatVector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []float32
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
Expand All @@ -230,6 +266,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec, typeutil.VerifyFloats32(vec)
case schemapb.DataType_Float16Vector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []float32
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
Expand All @@ -244,6 +283,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec2, typeutil.VerifyFloats16(vec2)
case schemapb.DataType_BFloat16Vector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []float32
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
Expand All @@ -258,6 +300,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec2, typeutil.VerifyBFloats16(vec2)
case schemapb.DataType_SparseFloatVector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
// use dec.UseNumber() to avoid float64 precision loss
var vec map[string]interface{}
dec := json.NewDecoder(strings.NewReader(obj))
Expand All @@ -272,13 +317,17 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec2, nil
case schemapb.DataType_Array:
if nullable && obj == r.nullkey {
return nil, nil
}
var vec []interface{}
desc := json.NewDecoder(strings.NewReader(obj))
desc.UseNumber()
err := desc.Decode(&vec)
if err != nil {
return nil, r.wrapTypeError(obj, field)
}
// elements in array not support null value
scalarFieldData, err := r.arrayToFieldData(vec, field.GetElementType())
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 5d8a979

Please sign in to comment.