Skip to content

Commit

Permalink
Merge pull request #147 from warpstreamlabs/feat/int-types
Browse files Browse the repository at this point in the history
Add support for int8/16 to parquet encode
  • Loading branch information
richardartoul authored Oct 30, 2024
2 parents ea32949 + 94e2dfc commit 7b10c77
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 34 deletions.
149 changes: 134 additions & 15 deletions internal/impl/parquet/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,120 @@ func setField(field reflect.Value, value any) error {
// For interface{} fields, we can directly set the value
field.Set(reflect.ValueOf(value))
return nil
case reflect.Int8:
switch v := value.(type) {
case int:
if int(int8(v)) == v {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int8", value)
case int8:
field.SetInt(int64(v))
return nil
case int16:
if int16(int8(v)) == v {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int8", value)
case int32:
if int32(int8(v)) == v {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int8", value)
case int64:
if int64(int8(v)) == v {
field.SetInt(v)
return nil
}
return fmt.Errorf("cannot represent %v as int8", value)
case float32:
if v >= math.MinInt8 && v <= math.MaxInt8 {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int8", value)
case float64:
if v >= math.MinInt8 && v <= math.MaxInt8 {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int8", value)
default:
return fmt.Errorf("cannot convert %T to int8", value)
}
case reflect.Int16:
switch v := value.(type) {
case int:
if int(int16(v)) == v {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int16", value)
case int8:
field.SetInt(int64(v))
case int16:
field.SetInt(int64(v))
case int32:
if int32(int16(v)) == v {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int16", value)
case int64:
if int64(int16(v)) == v {
field.SetInt(v)
return nil
}
return fmt.Errorf("cannot represent %v as int16", value)
case float32:
if v >= math.MinInt16 && v <= math.MaxInt16 {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int16", value)
case float64:
if v >= math.MinInt16 && v <= math.MaxInt16 {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int16", value)
default:
return fmt.Errorf("cannot convert %T to int16", value)
}
case reflect.Int32:
switch v := value.(type) {
case int:
if int(int32(v)) == v {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int32", value)
case int8:
field.SetInt(int64(v))
case int16:
field.SetInt(int64(v))
case int32:
field.SetInt(int64(v))
case int64:
if int64(int32(v)) == v {
field.SetInt(v)
return nil
}
return fmt.Errorf("cannot represent %v as int32", value)
case float32:
if v >= math.MinInt32 && v <= math.MaxInt32 {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int32", value)
case float64:
if v >= math.MinInt32 || v <= math.MaxInt32 {
if v >= math.MinInt32 && v <= math.MaxInt32 {
field.SetInt(int64(v))
return nil
}

return fmt.Errorf("cannot represent %v as int32", value)
default:
return fmt.Errorf("cannot convert %T to int64", value)
Expand All @@ -99,44 +197,65 @@ func setField(field reflect.Value, value any) error {
switch v := value.(type) {
case int:
field.SetInt(int64(v))
case int8:
field.SetInt(int64(v))
case int16:
field.SetInt(int64(v))
case int32:
field.SetInt(int64(v))
case int64:
field.SetInt(v)
case float32:
if v >= math.MinInt64 && v <= math.MaxInt64 {
field.SetInt(int64(v))
return nil
}
return fmt.Errorf("cannot represent %v as int64", value)
case float64:
if v >= math.MinInt64 || v <= math.MaxInt64 {
if v >= math.MinInt64 && v <= math.MaxInt64 {
field.SetInt(int64(v))
return nil
}
field.SetInt(int64(v))
return fmt.Errorf("cannot represent %v as int64", value)
default:
return fmt.Errorf("cannot convert %T to int64", value)
}

case reflect.Float64:
switch v := value.(type) {
case float32:
field.SetFloat(float64(v))
case float64:
field.SetFloat(v)
case int:
field.SetFloat(float64(v))
return nil
case int8:
field.SetFloat(float64(v))
case int16:
field.SetFloat(float64(v))
case int32:
field.SetFloat(float64(v))
case int64:
field.SetFloat(float64(v))
return nil
case float32:
field.SetFloat(float64(v))
case float64:
field.SetFloat(v)
default:
return fmt.Errorf("cannot convert %T to float64", value)
}
case reflect.Float32:
switch v := value.(type) {
case float64:
field.SetFloat(v)
return nil
case int:
field.SetFloat(float64(v))
return nil
case int8:
field.SetFloat(float64(v))
case int16:
field.SetFloat(float64(v))
case int32:
field.SetFloat(float64(v))
case int64:
field.SetFloat(float64(v))
return nil
case float32:
field.SetFloat(float64(v))
case float64:
field.SetFloat(v)
default:
return fmt.Errorf("cannot convert %T to float64", value)
}
Expand Down
26 changes: 16 additions & 10 deletions internal/impl/parquet/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ type NestedStruct struct {
}

type ComplexStruct struct {
ID string `json:"id"`
Nested NestedStruct `json:"nested"`
Numbers []int64 `json:"numbers"`
Floats []float64 `json:"floats"`
Mixed []any `json:"mixed"`
Structs []SimpleStruct `json:"structs"`
StringMap map[string]string `json:"stringMap"`
IntMap map[string]int64 `json:"intMap"`
FloatMap map[string]float64 `json:"floatMap"`
BoolMap map[string]bool `json:"boolMap"`
ID string `json:"id"`
Nested NestedStruct `json:"nested"`
Numbers []int64 `json:"numbers"`
Floats []float64 `json:"floats"`
Mixed []any `json:"mixed"`
Structs []SimpleStruct `json:"structs"`
StringMap map[string]string `json:"stringMap"`
IntMap map[string]int64 `json:"intMap"`
FloatMap map[string]float64 `json:"floatMap"`
BoolMap map[string]bool `json:"boolMap"`
TinyNumbers []int8 `json:"tinyNumbers"`
SmallNumbers []int16 `json:"smallNumbers"`
}

func TestMapToStruct(t *testing.T) {
Expand Down Expand Up @@ -127,6 +129,8 @@ func TestMapToStruct(t *testing.T) {
"true": true,
"false": false,
},
"tinyNumbers": []any{int8(-128), int8(-1), int8(0), int8(1), int8(127)},
"smallNumbers": []any{int16(-32768), int16(-1), int16(0), int16(1), int16(32767)},
},
dest: &ComplexStruct{},
want: &ComplexStruct{
Expand Down Expand Up @@ -174,6 +178,8 @@ func TestMapToStruct(t *testing.T) {
"true": true,
"false": false,
},
TinyNumbers: []int8{-128, -1, 0, 1, 127},
SmallNumbers: []int16{-32768, -1, 0, 1, 32767},
},
},
{
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/parquet/processor_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func init() {
func parquetSchemaConfig() *service.ConfigField {
return service.NewObjectListField("schema",
service.NewStringField("name").Description("The name of the column."),
service.NewStringEnumField("type", "BOOLEAN", "INT32", "INT64", "DECIMAL64", "DECIMAL32", "FLOAT", "DOUBLE", "BYTE_ARRAY", "UTF8", "MAP", "LIST").
service.NewStringEnumField("type", "BOOLEAN", "INT8", "INT16", "INT32", "INT64", "DECIMAL64", "DECIMAL32", "FLOAT", "DOUBLE", "BYTE_ARRAY", "UTF8", "MAP", "LIST").
Description("The type of the column, only applicable for leaf columns with no child fields. MAP supports only string keys, but can support values of all types. Nesting of map values and list elements is untested. Some logical types can be specified here such as UTF8.").Optional(),
service.NewIntField("decimal_precision").Description("Precision to use for DECIMAL32/DECIMAL64 type").Default(0),
service.NewIntField("decimal_scale").Description("Scale to use for DECIMAL32/DECIMAL64 type").Default(0),
Expand Down
7 changes: 6 additions & 1 deletion internal/impl/parquet/processor_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
// Before we changed this, the library used to mix and match behavior where sometimes it would quietly
// downscale the value, and other times it would not. Now we always just do a straight cast.
func TestParquetEncodeDoesNotPanic(t *testing.T) {
// We assume that converting integers to floats and vice versa
// that its "obvious" that there is some lossiness and that is
// acceptable.
encodeConf, err := parquetEncodeProcessorConfig().ParseYAML(`
schema:
- { name: id, type: FLOAT }
Expand All @@ -34,6 +37,8 @@ schema:
})
require.NoError(t, err)

// But underflowing/overflowing integers is not acceptable and we return
// an error.
encodeConf, err = parquetEncodeProcessorConfig().ParseYAML(`
schema:
- { name: id, type: INT32 }
Expand All @@ -47,7 +52,7 @@ schema:
_, err = encodeProc.ProcessBatch(tctx, service.MessageBatch{
service.NewMessage([]byte(`{"id":1e10,"name":"foo"}`)),
})
require.NoError(t, err)
require.Error(t, err)
}

func TestParquetEncodeDecodeRoundTrip(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions internal/impl/parquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func getReflectType(typeStr string) (reflect.Type, error) {
return reflect.TypeOf(""), nil
case "BYTE_ARRAY":
return reflect.TypeOf([]byte(nil)), nil
case "INT8":
return reflect.TypeOf(int8(0)), nil
case "INT16":
return reflect.TypeOf(int16(0)), nil
case "INT32":
return reflect.TypeOf(int32(0)), nil
case "INT64":
Expand Down
21 changes: 15 additions & 6 deletions internal/impl/parquet/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func TestGenerateStructTypeAsPtrs(t *testing.T) {
schema:
- { name: str, type: UTF8 }
- { name: num, type: INT64 }
- { name: smallNum, type: INT16 }
- { name: tinyNum, type: INT8 }
- { name: flt, type: FLOAT }
- { name: bool, type: BOOLEAN }
- { name: dec32, type: DECIMAL32, decimal_precision: 3}
Expand All @@ -32,12 +34,14 @@ schema:
fieldType reflect.Type
tag string
}{
"Str": {reflect.TypeOf(""), `parquet:"str" json:"str"`},
"Num": {reflect.TypeOf(int64(0)), `parquet:"num" json:"num"`},
"Flt": {reflect.TypeOf(float32(0)), `parquet:"flt" json:"flt"`},
"Bool": {reflect.TypeOf(false), `parquet:"bool" json:"bool"`},
"Dec32": {reflect.TypeOf(int32(0)), `parquet:"dec32,decimal(0:3)" json:"dec32"`},
"Dec64": {reflect.TypeOf(int64(0)), `parquet:"dec64,decimal(4:10)" json:"dec64"`},
"Str": {reflect.TypeOf(""), `parquet:"str" json:"str"`},
"Num": {reflect.TypeOf(int64(0)), `parquet:"num" json:"num"`},
"SmallNum": {reflect.TypeOf(int16(0)), `parquet:"smallNum" json:"smallNum"`},
"TinyNum": {reflect.TypeOf(int8(0)), `parquet:"tinyNum" json:"tinyNum"`},
"Flt": {reflect.TypeOf(float32(0)), `parquet:"flt" json:"flt"`},
"Bool": {reflect.TypeOf(false), `parquet:"bool" json:"bool"`},
"Dec32": {reflect.TypeOf(int32(0)), `parquet:"dec32,decimal(0:3)" json:"dec32"`},
"Dec64": {reflect.TypeOf(int64(0)), `parquet:"dec64,decimal(4:10)" json:"dec64"`},
},
},
{
Expand Down Expand Up @@ -155,6 +159,7 @@ schema:
- { name: id, type: INT64 }
- { name: as, type: FLOAT, repeated: true }
- { name: g, type: INT64, optional: true }
- { name: h, type: INT16, optional: true }
- name: withchild
optional: true
fields:
Expand All @@ -181,6 +186,10 @@ schema:
fieldType: reflect.PointerTo(reflect.TypeOf(int64(0))),
tag: `parquet:"g" json:"g"`,
},
"H": {
fieldType: reflect.PointerTo(reflect.TypeOf(int16(0))),
tag: `parquet:"h" json:"h"`,
},
"Withchild": {
fieldType: reflect.PointerTo(reflect.StructOf([]reflect.StructField{
{Name: "A_stuff", Type: reflect.TypeOf(""), Tag: `parquet:"a_stuff" json:"a_stuff"`},
Expand Down
2 changes: 1 addition & 1 deletion website/docs/components/processors/parquet_encode.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ The type of the column, only applicable for leaf columns with no child fields. M


Type: `string`
Options: `BOOLEAN`, `INT32`, `INT64`, `DECIMAL64`, `DECIMAL32`, `FLOAT`, `DOUBLE`, `BYTE_ARRAY`, `UTF8`, `MAP`, `LIST`.
Options: `BOOLEAN`, `INT8`, `INT16`, `INT32`, `INT64`, `DECIMAL64`, `DECIMAL32`, `FLOAT`, `DOUBLE`, `BYTE_ARRAY`, `UTF8`, `MAP`, `LIST`.

### `schema[].decimal_precision`

Expand Down

0 comments on commit 7b10c77

Please sign in to comment.