From d59f20930255cb31d904781f6cabf8ea57918bcf Mon Sep 17 00:00:00 2001 From: Louis Date: Tue, 28 Mar 2023 22:05:47 -0700 Subject: [PATCH] mapping: support endianness (#130) --- cmd/goflow2/mapping.yaml | 3 ++ producer/producer_nf.go | 104 ++++++++++++++++++++++++++++++--------- producer/producer_sf.go | 8 +-- producer/reflect.go | 46 +++++++++++++---- 4 files changed, 126 insertions(+), 35 deletions(-) diff --git a/cmd/goflow2/mapping.yaml b/cmd/goflow2/mapping.yaml index 84ab5d4..bc0fb58 100644 --- a/cmd/goflow2/mapping.yaml +++ b/cmd/goflow2/mapping.yaml @@ -16,6 +16,9 @@ netflowv9: destination: CustomInteger1 - field: 11 destination: CustomInteger2 + - field: 34 # samplingInterval + destination: SamplingRate + endian: little sflow: mapping: - layer: 4 # Layer 4: TCP or UDP diff --git a/producer/producer_nf.go b/producer/producer_nf.go index bd05c50..07a00f3 100644 --- a/producer/producer_nf.go +++ b/producer/producer_nf.go @@ -97,6 +97,38 @@ func NetFlowPopulate(dataFields []netflow.DataField, typeId uint16, addr interfa return exists } +func WriteUDecoded(o uint64, out interface{}) error { + switch t := out.(type) { + case *byte: + *t = byte(o) + case *uint16: + *t = uint16(o) + case *uint32: + *t = uint32(o) + case *uint64: + *t = o + default: + return errors.New("The parameter is not a pointer to a byte/uint16/uint32/uint64 structure") + } + return nil +} + +func WriteDecoded(o int64, out interface{}) error { + switch t := out.(type) { + case *int8: + *t = int8(o) + case *int16: + *t = int16(o) + case *int32: + *t = int32(o) + case *int64: + *t = o + default: + return errors.New("The parameter is not a pointer to a int8/int16/int32/int64 structure") + } + return nil +} + func DecodeUNumber(b []byte, out interface{}) error { var o uint64 l := len(b) @@ -120,19 +152,33 @@ func DecodeUNumber(b []byte, out interface{}) error { return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l)) } } - switch t := out.(type) { - case *byte: - *t = byte(o) - case *uint16: - *t = uint16(o) - case *uint32: - *t = uint32(o) - case *uint64: - *t = o + return WriteUDecoded(o, out) +} + +func DecodeUNumberLE(b []byte, out interface{}) error { + var o uint64 + l := len(b) + switch l { + case 1: + o = uint64(b[0]) + case 2: + o = uint64(binary.LittleEndian.Uint16(b)) + case 4: + o = uint64(binary.LittleEndian.Uint32(b)) + case 8: + o = binary.LittleEndian.Uint64(b) default: - return errors.New("The parameter is not a pointer to a byte/uint16/uint32/uint64 structure") + if l < 8 { + var iter uint + for i := range b { + o |= uint64(b[i]) << uint(8*(iter)) + iter++ + } + } else { + return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l)) + } } - return nil + return WriteUDecoded(o, out) } func DecodeNumber(b []byte, out interface{}) error { @@ -158,19 +204,33 @@ func DecodeNumber(b []byte, out interface{}) error { return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l)) } } - switch t := out.(type) { - case *int8: - *t = int8(o) - case *int16: - *t = int16(o) - case *int32: - *t = int32(o) - case *int64: - *t = o + return WriteDecoded(o, out) +} + +func DecodeNumberLE(b []byte, out interface{}) error { + var o int64 + l := len(b) + switch l { + case 1: + o = int64(int8(b[0])) + case 2: + o = int64(int16(binary.LittleEndian.Uint16(b))) + case 4: + o = int64(int32(binary.LittleEndian.Uint32(b))) + case 8: + o = int64(binary.LittleEndian.Uint64(b)) default: - return errors.New("The parameter is not a pointer to a int8/int16/int32/int64 structure") + if l < 8 { + var iter int + for i := range b { + o |= int64(b[i]) << int(8*(iter)) + iter++ + } + } else { + return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l)) + } } - return nil + return WriteDecoded(o, out) } func allZeroes(v []byte) bool { diff --git a/producer/producer_sf.go b/producer/producer_sf.go index d1ed1f6..5ff5a87 100644 --- a/producer/producer_sf.go +++ b/producer/producer_sf.go @@ -58,7 +58,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf for _, configLayer := range GetSFlowConfigLayer(config, 0) { extracted := GetBytes(data, configLayer.Offset, configLayer.Length) - MapCustom(flowMessage, extracted, configLayer.Destination) + MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian) } etherType := data[12:14] @@ -121,7 +121,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf for _, configLayer := range GetSFlowConfigLayer(config, 3) { extracted := GetBytes(data, offset*8+configLayer.Offset, configLayer.Length) - MapCustom(flowMessage, extracted, configLayer.Destination) + MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian) } if etherType[0] == 0x8 && etherType[1] == 0x0 { // IPv4 @@ -159,7 +159,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf for _, configLayer := range GetSFlowConfigLayer(config, 4) { extracted := GetBytes(data, offset*8+configLayer.Offset, configLayer.Length) - MapCustom(flowMessage, extracted, configLayer.Destination) + MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian) } appOffset := 0 @@ -187,7 +187,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf if appOffset > 0 { for _, configLayer := range GetSFlowConfigLayer(config, 7) { extracted := GetBytes(data, (offset+appOffset)*8+configLayer.Offset, configLayer.Length) - MapCustom(flowMessage, extracted, configLayer.Destination) + MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian) } } diff --git a/producer/reflect.go b/producer/reflect.go index 086a3d0..91a2a41 100644 --- a/producer/reflect.go +++ b/producer/reflect.go @@ -8,6 +8,13 @@ import ( flowmessage "github.com/netsampler/goflow2/pb" ) +type EndianType string + +var ( + BigEndian EndianType = "big" + LittleEndian EndianType = "little" +) + func GetBytes(d []byte, offset int, length int) []byte { if length == 0 { return nil @@ -56,11 +63,11 @@ func MapCustomNetFlow(flowMessage *flowmessage.FlowMessage, df netflow.DataField mapped, ok := mapper.Map(df) if ok { v := df.Value.([]byte) - MapCustom(flowMessage, v, mapped.Destination) + MapCustom(flowMessage, v, mapped.Destination, mapped.Endian) } } -func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination string) { +func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination string, endianness EndianType) { vfm := reflect.ValueOf(flowMessage) vfm = reflect.Indirect(vfm) @@ -78,9 +85,17 @@ func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination strin item := reflect.New(typeDest.Elem()) if IsUInt(typeDest.Elem().Kind()) { - DecodeUNumber(v, item.Interface()) + if endianness == LittleEndian { + DecodeUNumberLE(v, item.Interface()) + } else { + DecodeUNumber(v, item.Interface()) + } } else if IsUInt(typeDest.Elem().Kind()) { - DecodeUNumber(v, item.Interface()) + if endianness == LittleEndian { + DecodeUNumberLE(v, item.Interface()) + } else { + DecodeUNumber(v, item.Interface()) + } } itemi := reflect.Indirect(item) @@ -89,9 +104,17 @@ func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination strin } } else if fieldValueAddr.IsValid() && IsUInt(typeDest.Kind()) { - DecodeUNumber(v, fieldValueAddr.Interface()) + if endianness == LittleEndian { + DecodeUNumberLE(v, fieldValueAddr.Interface()) + } else { + DecodeUNumber(v, fieldValueAddr.Interface()) + } } else if fieldValueAddr.IsValid() && IsInt(typeDest.Kind()) { - DecodeNumber(v, fieldValueAddr.Interface()) + if endianness == LittleEndian { + DecodeUNumberLE(v, fieldValueAddr.Interface()) + } else { + DecodeUNumber(v, fieldValueAddr.Interface()) + } } } } @@ -101,7 +124,8 @@ type NetFlowMapField struct { Type uint16 `json:"field" yaml:"field"` Pen uint32 `json:"pen" yaml:"pen"` - Destination string `json:"destination" yaml:"destination"` + Destination string `json:"destination" yaml:"destination"` + Endian EndianType `json:"endianness" yaml:"endianness"` //DestinationLength uint8 `json:"dlen"` // could be used if populating a slice of uint16 that aren't in protobuf } @@ -119,7 +143,8 @@ type SFlowMapField struct { Offset int `json:"offset"` // offset in bits Length int `json:"length"` // length in bits - Destination string `json:"destination"` + Destination string `json:"destination" yaml:"destination"` + Endian EndianType `json:"endianness" yaml:"endianness"` //DestinationLength uint8 `json:"dlen"` } @@ -137,6 +162,7 @@ type ProducerConfig struct { type DataMap struct { Destination string + Endian EndianType } type NetFlowMapper struct { @@ -151,7 +177,7 @@ func (m *NetFlowMapper) Map(field netflow.DataField) (DataMap, bool) { func MapFieldsNetFlow(fields []NetFlowMapField) *NetFlowMapper { ret := make(map[string]DataMap) for _, field := range fields { - ret[fmt.Sprintf("%v-%d-%d", field.PenProvided, field.Pen, field.Type)] = DataMap{Destination: field.Destination} + ret[fmt.Sprintf("%v-%d-%d", field.PenProvided, field.Pen, field.Type)] = DataMap{Destination: field.Destination, Endian: field.Endian} } return &NetFlowMapper{ret} } @@ -160,6 +186,7 @@ type DataMapLayer struct { Offset int Length int Destination string + Endian EndianType } type SFlowMapper struct { @@ -180,6 +207,7 @@ func MapFieldsSFlow(fields []SFlowMapField) *SFlowMapper { Offset: field.Offset, Length: field.Length, Destination: field.Destination, + Endian: field.Endian, } retLayer := ret[field.Layer] retLayer = append(retLayer, retLayerEntry)