Skip to content

Commit

Permalink
mapping: support endianness (cloudflare#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn authored Mar 29, 2023
1 parent 40e5ef0 commit d59f209
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 35 deletions.
3 changes: 3 additions & 0 deletions cmd/goflow2/mapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ netflowv9:
destination: CustomInteger1
- field: 11
destination: CustomInteger2
- field: 34 # samplingInterval
destination: SamplingRate
endian: little
sflow:
mapping:
- layer: 4 # Layer 4: TCP or UDP
Expand Down
104 changes: 82 additions & 22 deletions producer/producer_nf.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,38 @@ func NetFlowPopulate(dataFields []netflow.DataField, typeId uint16, addr interfa
return exists
}

func WriteUDecoded(o uint64, out interface{}) error {
switch t := out.(type) {
case *byte:
*t = byte(o)
case *uint16:
*t = uint16(o)
case *uint32:
*t = uint32(o)
case *uint64:
*t = o
default:
return errors.New("The parameter is not a pointer to a byte/uint16/uint32/uint64 structure")
}
return nil
}

func WriteDecoded(o int64, out interface{}) error {
switch t := out.(type) {
case *int8:
*t = int8(o)
case *int16:
*t = int16(o)
case *int32:
*t = int32(o)
case *int64:
*t = o
default:
return errors.New("The parameter is not a pointer to a int8/int16/int32/int64 structure")
}
return nil
}

func DecodeUNumber(b []byte, out interface{}) error {
var o uint64
l := len(b)
Expand All @@ -120,19 +152,33 @@ func DecodeUNumber(b []byte, out interface{}) error {
return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l))
}
}
switch t := out.(type) {
case *byte:
*t = byte(o)
case *uint16:
*t = uint16(o)
case *uint32:
*t = uint32(o)
case *uint64:
*t = o
return WriteUDecoded(o, out)
}

func DecodeUNumberLE(b []byte, out interface{}) error {
var o uint64
l := len(b)
switch l {
case 1:
o = uint64(b[0])
case 2:
o = uint64(binary.LittleEndian.Uint16(b))
case 4:
o = uint64(binary.LittleEndian.Uint32(b))
case 8:
o = binary.LittleEndian.Uint64(b)
default:
return errors.New("The parameter is not a pointer to a byte/uint16/uint32/uint64 structure")
if l < 8 {
var iter uint
for i := range b {
o |= uint64(b[i]) << uint(8*(iter))
iter++
}
} else {
return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l))
}
}
return nil
return WriteUDecoded(o, out)
}

func DecodeNumber(b []byte, out interface{}) error {
Expand All @@ -158,19 +204,33 @@ func DecodeNumber(b []byte, out interface{}) error {
return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l))
}
}
switch t := out.(type) {
case *int8:
*t = int8(o)
case *int16:
*t = int16(o)
case *int32:
*t = int32(o)
case *int64:
*t = o
return WriteDecoded(o, out)
}

func DecodeNumberLE(b []byte, out interface{}) error {
var o int64
l := len(b)
switch l {
case 1:
o = int64(int8(b[0]))
case 2:
o = int64(int16(binary.LittleEndian.Uint16(b)))
case 4:
o = int64(int32(binary.LittleEndian.Uint32(b)))
case 8:
o = int64(binary.LittleEndian.Uint64(b))
default:
return errors.New("The parameter is not a pointer to a int8/int16/int32/int64 structure")
if l < 8 {
var iter int
for i := range b {
o |= int64(b[i]) << int(8*(iter))
iter++
}
} else {
return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l))
}
}
return nil
return WriteDecoded(o, out)
}

func allZeroes(v []byte) bool {
Expand Down
8 changes: 4 additions & 4 deletions producer/producer_sf.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf

for _, configLayer := range GetSFlowConfigLayer(config, 0) {
extracted := GetBytes(data, configLayer.Offset, configLayer.Length)
MapCustom(flowMessage, extracted, configLayer.Destination)
MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian)
}

etherType := data[12:14]
Expand Down Expand Up @@ -121,7 +121,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf

for _, configLayer := range GetSFlowConfigLayer(config, 3) {
extracted := GetBytes(data, offset*8+configLayer.Offset, configLayer.Length)
MapCustom(flowMessage, extracted, configLayer.Destination)
MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian)
}

if etherType[0] == 0x8 && etherType[1] == 0x0 { // IPv4
Expand Down Expand Up @@ -159,7 +159,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf

for _, configLayer := range GetSFlowConfigLayer(config, 4) {
extracted := GetBytes(data, offset*8+configLayer.Offset, configLayer.Length)
MapCustom(flowMessage, extracted, configLayer.Destination)
MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian)
}

appOffset := 0
Expand Down Expand Up @@ -187,7 +187,7 @@ func ParseEthernetHeader(flowMessage *flowmessage.FlowMessage, data []byte, conf
if appOffset > 0 {
for _, configLayer := range GetSFlowConfigLayer(config, 7) {
extracted := GetBytes(data, (offset+appOffset)*8+configLayer.Offset, configLayer.Length)
MapCustom(flowMessage, extracted, configLayer.Destination)
MapCustom(flowMessage, extracted, configLayer.Destination, configLayer.Endian)
}
}

Expand Down
46 changes: 37 additions & 9 deletions producer/reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
flowmessage "github.com/netsampler/goflow2/pb"
)

type EndianType string

var (
BigEndian EndianType = "big"
LittleEndian EndianType = "little"
)

func GetBytes(d []byte, offset int, length int) []byte {
if length == 0 {
return nil
Expand Down Expand Up @@ -56,11 +63,11 @@ func MapCustomNetFlow(flowMessage *flowmessage.FlowMessage, df netflow.DataField
mapped, ok := mapper.Map(df)
if ok {
v := df.Value.([]byte)
MapCustom(flowMessage, v, mapped.Destination)
MapCustom(flowMessage, v, mapped.Destination, mapped.Endian)
}
}

func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination string) {
func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination string, endianness EndianType) {
vfm := reflect.ValueOf(flowMessage)
vfm = reflect.Indirect(vfm)

Expand All @@ -78,9 +85,17 @@ func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination strin
item := reflect.New(typeDest.Elem())

if IsUInt(typeDest.Elem().Kind()) {
DecodeUNumber(v, item.Interface())
if endianness == LittleEndian {
DecodeUNumberLE(v, item.Interface())
} else {
DecodeUNumber(v, item.Interface())
}
} else if IsUInt(typeDest.Elem().Kind()) {
DecodeUNumber(v, item.Interface())
if endianness == LittleEndian {
DecodeUNumberLE(v, item.Interface())
} else {
DecodeUNumber(v, item.Interface())
}
}

itemi := reflect.Indirect(item)
Expand All @@ -89,9 +104,17 @@ func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination strin
}

} else if fieldValueAddr.IsValid() && IsUInt(typeDest.Kind()) {
DecodeUNumber(v, fieldValueAddr.Interface())
if endianness == LittleEndian {
DecodeUNumberLE(v, fieldValueAddr.Interface())
} else {
DecodeUNumber(v, fieldValueAddr.Interface())
}
} else if fieldValueAddr.IsValid() && IsInt(typeDest.Kind()) {
DecodeNumber(v, fieldValueAddr.Interface())
if endianness == LittleEndian {
DecodeUNumberLE(v, fieldValueAddr.Interface())
} else {
DecodeUNumber(v, fieldValueAddr.Interface())
}
}
}
}
Expand All @@ -101,7 +124,8 @@ type NetFlowMapField struct {
Type uint16 `json:"field" yaml:"field"`
Pen uint32 `json:"pen" yaml:"pen"`

Destination string `json:"destination" yaml:"destination"`
Destination string `json:"destination" yaml:"destination"`
Endian EndianType `json:"endianness" yaml:"endianness"`
//DestinationLength uint8 `json:"dlen"` // could be used if populating a slice of uint16 that aren't in protobuf
}

Expand All @@ -119,7 +143,8 @@ type SFlowMapField struct {
Offset int `json:"offset"` // offset in bits
Length int `json:"length"` // length in bits

Destination string `json:"destination"`
Destination string `json:"destination" yaml:"destination"`
Endian EndianType `json:"endianness" yaml:"endianness"`
//DestinationLength uint8 `json:"dlen"`
}

Expand All @@ -137,6 +162,7 @@ type ProducerConfig struct {

type DataMap struct {
Destination string
Endian EndianType
}

type NetFlowMapper struct {
Expand All @@ -151,7 +177,7 @@ func (m *NetFlowMapper) Map(field netflow.DataField) (DataMap, bool) {
func MapFieldsNetFlow(fields []NetFlowMapField) *NetFlowMapper {
ret := make(map[string]DataMap)
for _, field := range fields {
ret[fmt.Sprintf("%v-%d-%d", field.PenProvided, field.Pen, field.Type)] = DataMap{Destination: field.Destination}
ret[fmt.Sprintf("%v-%d-%d", field.PenProvided, field.Pen, field.Type)] = DataMap{Destination: field.Destination, Endian: field.Endian}
}
return &NetFlowMapper{ret}
}
Expand All @@ -160,6 +186,7 @@ type DataMapLayer struct {
Offset int
Length int
Destination string
Endian EndianType
}

type SFlowMapper struct {
Expand All @@ -180,6 +207,7 @@ func MapFieldsSFlow(fields []SFlowMapField) *SFlowMapper {
Offset: field.Offset,
Length: field.Length,
Destination: field.Destination,
Endian: field.Endian,
}
retLayer := ret[field.Layer]
retLayer = append(retLayer, retLayerEntry)
Expand Down

0 comments on commit d59f209

Please sign in to comment.