Skip to content

Commit

Permalink
feat: add unit & integration test
Browse files Browse the repository at this point in the history
Signed-off-by: OxalisCu <[email protected]>
  • Loading branch information
OxalisCu committed Sep 2, 2024
1 parent 1803613 commit 387f93f
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 41 deletions.
95 changes: 60 additions & 35 deletions internal/util/importutilv2/csv/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -37,7 +36,7 @@ func (suite *ReaderSuite) SetupTest() {
suite.vecDataType = schemapb.DataType_FloatVector
}

func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) {
func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
Expand Down Expand Up @@ -74,25 +73,31 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
Value: "128",
},
},
Nullable: nullable,
},
},
}

// config
// csv separator
sep := ','
// csv writer write null value as empty string
nullkey := ""

// generate csv data
insertData, err := testutil.CreateInsertData(schema, suite.numRows)
suite.NoError(err)
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData)
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey)
suite.NoError(err)

// write to csv file
sep := '\t'
filePath := fmt.Sprintf("/tmp/test_%d_reader.csv", rand.Int())
defer os.Remove(filePath)
// defer os.Remove(filePath)
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(suite.T(), err)
suite.NoError(err)
writer := csv.NewWriter(wf)
writer.Comma = sep
writer.WriteAll(csvData)
err = writer.WriteAll(csvData)
suite.NoError(err)

// read from csv file
Expand All @@ -102,13 +107,13 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
suite.NoError(err)

// check reader separate fields by '\t'
wrongSep := ','
_, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep)
wrongSep := '\t'
_, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep, nullkey)
suite.Error(err)
suite.Contains(err.Error(), "value of field is missed: ")

// check data
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep)
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep, nullkey)
suite.NoError(err)

checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
Expand All @@ -129,43 +134,63 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
}

func (suite *ReaderSuite) TestReadScalarFields() {
suite.run(schemapb.DataType_Bool, schemapb.DataType_None)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None)
suite.run(schemapb.DataType_Float, schemapb.DataType_None)
suite.run(schemapb.DataType_Double, schemapb.DataType_None)
suite.run(schemapb.DataType_String, schemapb.DataType_None)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None)

suite.run(schemapb.DataType_Array, schemapb.DataType_Bool)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double)
suite.run(schemapb.DataType_Array, schemapb.DataType_String)
suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Float, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Double, schemapb.DataType_None, false)
suite.run(schemapb.DataType_String, schemapb.DataType_None, false)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false)

suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_String, false)

suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Float, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Double, schemapb.DataType_None, true)
suite.run(schemapb.DataType_String, schemapb.DataType_None, true)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true)

suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_String, true)
}

func (suite *ReaderSuite) TestStringPK() {
suite.pkDataType = schemapb.DataType_VarChar
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
}

func (suite *ReaderSuite) TestVector() {
suite.vecDataType = schemapb.DataType_BinaryVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_FloatVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_Float16Vector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_BFloat16Vector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_SparseFloatVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
}

func TestUtil(t *testing.T) {
Expand Down
67 changes: 64 additions & 3 deletions internal/util/importutilv2/csv/row_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ func TestNewRowParser_Invalid(t *testing.T) {
{header: []string{"id", "vector", "$meta"}, expectErr: "value of field is missed: 'str'"},
}

nullkey := ""

for i, c := range cases {
t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
_, err := NewRowParser(schema, c.header)
_, err := NewRowParser(schema, c.header, nullkey)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), c.expectErr))
})
Expand Down Expand Up @@ -98,8 +100,10 @@ func TestRowParser_Parse_Valid(t *testing.T) {
{header: []string{"id", "vector", "str", "$meta"}, row: []string{"1", "[1, 2]", "xxsddsffwq", "{\"y\": 2}"}, dyFields: map[string]any{"y": 2.0, "str": "xxsddsffwq"}},
}

nullkey := ""

for i, c := range cases {
r, err := NewRowParser(schema, c.header)
r, err := NewRowParser(schema, c.header, nullkey)
assert.NoError(t, err)
t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
data, err := r.Parse(c.row)
Expand Down Expand Up @@ -161,8 +165,10 @@ func TestRowParser_Parse_Invalid(t *testing.T) {
{header: []string{"id", "vector", "x", "$meta"}, row: []string{"1", "[1, 2]", "8"}, expectErr: "the number of fields in the row is not equal to the header"},
}

nullkey := ""

for i, c := range cases {
r, err := NewRowParser(schema, c.header)
r, err := NewRowParser(schema, c.header, nullkey)
assert.NoError(t, err)
t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
_, err := r.Parse(c.row)
Expand All @@ -171,3 +177,58 @@ func TestRowParser_Parse_Invalid(t *testing.T) {
})
}
}

// TestRowParser_Parse_NULL verifies that the parser's configurable null
// sentinel (nullkey) maps a matching cell to a nil value for a nullable
// field. It covers the common conventions: empty string, "NULL", and "\N".
func TestRowParser_Parse_NULL(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      1,
				Name:         "id",
				IsPrimaryKey: true,
				DataType:     schemapb.DataType_Int64,
			},
			{
				FieldID:    2,
				Name:       "vector",
				DataType:   schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
			},
			{
				FieldID:  3,
				Name:     "str",
				DataType: schemapb.DataType_String,
				TypeParams: []*commonpb.KeyValuePair{
					{
						Key:   common.MaxLengthKey,
						Value: "128",
					},
				},
				// only a nullable field may parse to nil on a nullkey match
				Nullable: true,
			},
		},
	}

	header := []string{"id", "vector", "str"}

	type testCase struct {
		nullkey  string   // null sentinel configured for this parser instance
		row      []string // raw CSV cells, in header order
		nulldata any      // expected parsed value for field ID 3 ("str")
	}

	cases := []testCase{
		{nullkey: "", row: []string{"1", "[1, 2]", ""}, nulldata: nil},
		{nullkey: "NULL", row: []string{"1", "[1, 2]", "NULL"}, nulldata: nil},
		{nullkey: "\\N", row: []string{"1", "[1, 2]", "\\N"}, nulldata: nil},
	}

	for i, c := range cases {
		r, err := NewRowParser(schema, header, c.nullkey)
		assert.NoError(t, err)
		t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
			data, err := r.Parse(c.row)
			assert.NoError(t, err)
			// parsed data is keyed by field ID; 3 is the nullable "str" field
			assert.EqualValues(t, c.nulldata, data[3])
		})
	}
}
7 changes: 6 additions & 1 deletion internal/util/testutil/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *
return rows, nil
}

func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([][]string, error) {
func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData, nullkey string) ([][]string, error) {
rowNum := insertData.GetRowNum()
csvData := make([][]string, 0, rowNum+1)

Expand All @@ -595,6 +595,11 @@ func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *stora
if field.GetAutoID() {
continue
}
// deal with null value
if field.GetNullable() && value.GetRow(i) == nil {
data = append(data, nullkey)
continue
}
switch dataType {
case schemapb.DataType_Array:
var arr any
Expand Down
6 changes: 4 additions & 2 deletions tests/integration/import/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,12 @@ func GenerateCSVFile(t *testing.T, filePath string, schema *schemapb.CollectionS
insertData, err := testutil.CreateInsertData(schema, count)
assert.NoError(t, err)

csvData, err := testutil.CreateInsertDataForCSV(schema, insertData)
sep := ','
nullkey := ""

csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey)
assert.NoError(t, err)

sep := ','
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(t, err)

Expand Down

0 comments on commit 387f93f

Please sign in to comment.