feat: csv format support null value
Signed-off-by: OxalisCu <[email protected]>
OxalisCu committed Sep 2, 2024
1 parent 3698c53 commit 1803613
Showing 4 changed files with 66 additions and 4 deletions.
4 changes: 2 additions & 2 deletions internal/util/importutilv2/csv/reader.go
@@ -31,7 +31,7 @@ type reader struct {
 	filePath string
 }

-func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune) (*reader, error) {
+func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune, nullkey string) (*reader, error) {
 	cmReader, err := cm.Reader(ctx, path)
 	if err != nil {
 		return nil, merr.WrapErrImportFailed(fmt.Sprintf("read csv file failed, path=%s, err=%s", path, err.Error()))
@@ -53,7 +53,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
 		return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read csv header, error: %v", err))
 	}

-	rowParser, err := NewRowParser(schema, header)
+	rowParser, err := NewRowParser(schema, header, nullkey)
 	if err != nil {
 		return nil, err
 	}
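With this change the CSV reader accepts the null marker alongside the separator and hands it to the row parser. A minimal sketch of how a caller might construct the reader with a custom marker (ctx, cm, and schema are assumed to exist; the path, buffer size, and separator are illustrative placeholders, not values from this commit):

	// Cells in nullable columns that equal `\N` will be imported as null.
	r, err := csv.NewReader(ctx, cm, schema, "/data/import/users.csv", 64*1024*1024, ',', `\N`)
	if err != nil {
		return nil, err
	}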
51 changes: 50 additions & 1 deletion internal/util/importutilv2/csv/row_parser.go
@@ -17,14 +17,15 @@ type RowParser interface {
 	Parse(raw []string) (Row, error)
 }
 type rowParser struct {
+	nullkey      string
 	header       []string
 	name2Dim     map[string]int
 	name2Field   map[string]*schemapb.FieldSchema
 	pkField      *schemapb.FieldSchema
 	dynamicField *schemapb.FieldSchema
 }

-func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser, error) {
+func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) {
 	name2Field := lo.KeyBy(schema.GetFields(),
 		func(field *schemapb.FieldSchema) string {
 			return field.GetName()
@@ -74,6 +75,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser
 	}

 	return &rowParser{
+		nullkey:      nullkey,
 		name2Dim:     name2Dim,
 		header:       header,
 		name2Field:   name2Field,
@@ -157,52 +159,80 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row)
 }

 func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, error) {
+	nullable := field.GetNullable()
 	switch field.GetDataType() {
 	case schemapb.DataType_Bool:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		b, err := strconv.ParseBool(obj)
 		if err != nil {
 			return false, r.wrapTypeError(obj, field)
 		}
 		return b, nil
 	case schemapb.DataType_Int8:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		num, err := strconv.ParseInt(obj, 10, 8)
 		if err != nil {
 			return 0, r.wrapTypeError(obj, field)
 		}
 		return int8(num), nil
 	case schemapb.DataType_Int16:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		num, err := strconv.ParseInt(obj, 10, 16)
 		if err != nil {
 			return 0, r.wrapTypeError(obj, field)
 		}
 		return int16(num), nil
 	case schemapb.DataType_Int32:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		num, err := strconv.ParseInt(obj, 10, 32)
 		if err != nil {
 			return 0, r.wrapTypeError(obj, field)
 		}
 		return int32(num), nil
 	case schemapb.DataType_Int64:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		num, err := strconv.ParseInt(obj, 10, 64)
 		if err != nil {
 			return 0, r.wrapTypeError(obj, field)
 		}
 		return num, nil
 	case schemapb.DataType_Float:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		num, err := strconv.ParseFloat(obj, 32)
 		if err != nil {
 			return 0, r.wrapTypeError(obj, field)
 		}
 		return float32(num), typeutil.VerifyFloats32([]float32{float32(num)})
 	case schemapb.DataType_Double:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		num, err := strconv.ParseFloat(obj, 64)
 		if err != nil {
 			return 0, r.wrapTypeError(obj, field)
 		}
 		return num, typeutil.VerifyFloats64([]float64{num})
 	case schemapb.DataType_VarChar, schemapb.DataType_String:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		return obj, nil
 	case schemapb.DataType_BinaryVector:
+		if nullable && obj == r.nullkey {
+			return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+		}
 		var vec []byte
 		err := json.Unmarshal([]byte(obj), &vec)
 		if err != nil {
@@ -213,13 +243,19 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
 		}
 		return vec, nil
 	case schemapb.DataType_JSON:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		var data interface{}
 		err := json.Unmarshal([]byte(obj), &data)
 		if err != nil {
 			return nil, err
 		}
 		return []byte(obj), nil
 	case schemapb.DataType_FloatVector:
+		if nullable && obj == r.nullkey {
+			return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+		}
 		var vec []float32
 		err := json.Unmarshal([]byte(obj), &vec)
 		if err != nil {
@@ -230,6 +266,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
 		}
 		return vec, typeutil.VerifyFloats32(vec)
 	case schemapb.DataType_Float16Vector:
+		if nullable && obj == r.nullkey {
+			return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+		}
 		var vec []float32
 		err := json.Unmarshal([]byte(obj), &vec)
 		if err != nil {
@@ -244,6 +283,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
 		}
 		return vec2, typeutil.VerifyFloats16(vec2)
 	case schemapb.DataType_BFloat16Vector:
+		if nullable && obj == r.nullkey {
+			return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+		}
 		var vec []float32
 		err := json.Unmarshal([]byte(obj), &vec)
 		if err != nil {
@@ -258,6 +300,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
 		}
 		return vec2, typeutil.VerifyBFloats16(vec2)
 	case schemapb.DataType_SparseFloatVector:
+		if nullable && obj == r.nullkey {
+			return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+		}
 		// use dec.UseNumber() to avoid float64 precision loss
 		var vec map[string]interface{}
 		dec := json.NewDecoder(strings.NewReader(obj))
@@ -272,13 +317,17 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
 		}
 		return vec2, nil
 	case schemapb.DataType_Array:
+		if nullable && obj == r.nullkey {
+			return nil, nil
+		}
 		var vec []interface{}
 		desc := json.NewDecoder(strings.NewReader(obj))
 		desc.UseNumber()
 		err := desc.Decode(&vec)
 		if err != nil {
 			return nil, r.wrapTypeError(obj, field)
 		}
+		// elements inside an Array field do not support null values
 		scalarFieldData, err := r.arrayToFieldData(vec, field.GetElementType())
 		if err != nil {
 			return nil, err
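The parser-level behavior can be sketched with a test-style snippet in the same package. The schema and the "NULL" marker below are hypothetical, chosen only to show nullable scalar handling; the Row layout and any additional validation performed by NewRowParser are not spelled out here:

	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
			{FieldID: 101, Name: "age", DataType: schemapb.DataType_Int32, Nullable: true},
		},
	}
	parser, err := NewRowParser(schema, []string{"id", "age"}, "NULL")
	if err != nil {
		panic(err)
	}
	// "id" is parsed as int64; "age" is nullable and the cell equals the configured
	// nullkey, so parseEntity returns nil for it.
	row, err := parser.Parse([]string{"1", "NULL"})
	_ = row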
9 changes: 9 additions & 0 deletions internal/util/importutilv2/option.go
@@ -113,3 +113,12 @@ func GetCSVSep(options Options) (rune, error) {
 	}
 	return []rune(sep)[0], nil
 }
+
+func GetCSVNullKey(options Options) (string, error) {
+	nullKey, err := funcutil.GetAttrByKeyFromRepeatedKV("nullkey", options)
+	defaultNullKey := ""
+	if err != nil || len(nullKey) == 0 {
+		return defaultNullKey, nil
+	}
+	return nullKey, nil
+}
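GetCSVNullKey reads the marker from the import options and falls back to the empty string when the option is missing or empty. A short sketch of supplying it (assuming Options is the repeated key-value list already consumed by GetCSVSep; the `\N` value is only an example):

	opts := Options{
		{Key: "nullkey", Value: `\N`},
	}
	// Returns `\N` here; with no "nullkey" entry it returns the empty string.
	nullkey, _ := GetCSVNullKey(opts)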
6 changes: 5 additions & 1 deletion internal/util/importutilv2/reader.go
@@ -76,7 +76,11 @@ func NewReader(ctx context.Context,
 		if err != nil {
 			return nil, err
 		}
-		return csv.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize, sep)
+		nullkey, err := GetCSVNullKey(options)
+		if err != nil {
+			return nil, err
+		}
+		return csv.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize, sep, nullkey)
 	}
 	return nil, merr.WrapErrImportFailed("unexpected import file")
 }
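End to end, when the import request carries no nullkey option the default empty-string marker applies, so empty cells in nullable columns are imported as null. As an illustration with hypothetical columns (age and nickname nullable, id the primary key), a CSV like the following stores null for the two empty cells:

	id,age,nickname
	1,,alice
	2,30,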
