Skip to content

Commit

Permalink
producer, utils: refactor old funcs for new FlowSets field type
Browse files Browse the repository at this point in the history
Signed-off-by: Batyrkhan Koshenov <[email protected]>
  • Loading branch information
Batyrkhan Koshenov committed Oct 31, 2022
1 parent 8019769 commit b4c1470
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 202 deletions.
50 changes: 7 additions & 43 deletions producer/producer_nf.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func CreateSamplingSystem() SamplingRateSystem {
func (s *basicSamplingRateSystem) AddSamplingRate(version uint16, obsDomainId uint32, samplingRate uint32) {
s.samplinglock.Lock()
_, exists := s.sampling[version]
if exists != true {
if !exists {
s.sampling[version] = make(map[uint32]uint32)
}
s.sampling[version][obsDomainId] = samplingRate
Expand Down Expand Up @@ -119,7 +119,7 @@ func DecodeUNumber(b []byte, out interface{}) error {
iter++
}
} else {
return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l))
return fmt.Errorf("non-regular number of bytes for a number: %v", l)
}
}
switch t := out.(type) {
Expand All @@ -132,7 +132,7 @@ func DecodeUNumber(b []byte, out interface{}) error {
case *uint64:
*t = o
default:
return errors.New("The parameter is not a pointer to a byte/uint16/uint32/uint64 structure")
return errors.New("the parameter is not a pointer to a byte/uint16/uint32/uint64 structure")
}
return nil
}
Expand All @@ -150,13 +150,9 @@ func ConvertNetFlowDataSet(version uint16, baseTime uint32, uptime uint32, recor
for i := range record {
df := record[i]

v, ok := df.Value.([]byte)
if !ok {
continue
}
v := df.Value

switch df.Type {

// Statistics
case netflow.NFV9_FIELD_IN_BYTES:
DecodeUNumber(v, &(flowMessage.Bytes))
Expand Down Expand Up @@ -382,43 +378,11 @@ func SearchNetFlowOptionDataSets(dataFlowSet []netflow.OptionsDataFlowSet) (uint
}

func SplitNetFlowSets(packetNFv9 netflow.NFv9Packet) ([]netflow.DataFlowSet, []netflow.TemplateFlowSet, []netflow.NFv9OptionsTemplateFlowSet, []netflow.OptionsDataFlowSet) {
dataFlowSet := make([]netflow.DataFlowSet, 0)
templatesFlowSet := make([]netflow.TemplateFlowSet, 0)
optionsTemplatesFlowSet := make([]netflow.NFv9OptionsTemplateFlowSet, 0)
optionsDataFlowSet := make([]netflow.OptionsDataFlowSet, 0)
for _, flowSet := range packetNFv9.FlowSets {
switch flowSet.(type) {
case netflow.TemplateFlowSet:
templatesFlowSet = append(templatesFlowSet, flowSet.(netflow.TemplateFlowSet))
case netflow.NFv9OptionsTemplateFlowSet:
optionsTemplatesFlowSet = append(optionsTemplatesFlowSet, flowSet.(netflow.NFv9OptionsTemplateFlowSet))
case netflow.DataFlowSet:
dataFlowSet = append(dataFlowSet, flowSet.(netflow.DataFlowSet))
case netflow.OptionsDataFlowSet:
optionsDataFlowSet = append(optionsDataFlowSet, flowSet.(netflow.OptionsDataFlowSet))
}
}
return dataFlowSet, templatesFlowSet, optionsTemplatesFlowSet, optionsDataFlowSet
return packetNFv9.DataFS, packetNFv9.TemplateFS, packetNFv9.NFv9OptionsTemplateFS, packetNFv9.OptionsDataFS
}

func SplitIPFIXSets(packetIPFIX netflow.IPFIXPacket) ([]netflow.DataFlowSet, []netflow.TemplateFlowSet, []netflow.IPFIXOptionsTemplateFlowSet, []netflow.OptionsDataFlowSet) {
dataFlowSet := make([]netflow.DataFlowSet, 0)
templatesFlowSet := make([]netflow.TemplateFlowSet, 0)
optionsTemplatesFlowSet := make([]netflow.IPFIXOptionsTemplateFlowSet, 0)
optionsDataFlowSet := make([]netflow.OptionsDataFlowSet, 0)
for _, flowSet := range packetIPFIX.FlowSets {
switch flowSet.(type) {
case netflow.TemplateFlowSet:
templatesFlowSet = append(templatesFlowSet, flowSet.(netflow.TemplateFlowSet))
case netflow.IPFIXOptionsTemplateFlowSet:
optionsTemplatesFlowSet = append(optionsTemplatesFlowSet, flowSet.(netflow.IPFIXOptionsTemplateFlowSet))
case netflow.DataFlowSet:
dataFlowSet = append(dataFlowSet, flowSet.(netflow.DataFlowSet))
case netflow.OptionsDataFlowSet:
optionsDataFlowSet = append(optionsDataFlowSet, flowSet.(netflow.OptionsDataFlowSet))
}
}
return dataFlowSet, templatesFlowSet, optionsTemplatesFlowSet, optionsDataFlowSet
return packetIPFIX.DataFS, packetIPFIX.TemplateFS, packetIPFIX.IPFIXOptionsTemplateFS, packetIPFIX.OptionsDataFS
}

// Convert a NetFlow datastructure to a FlowMessage protobuf
Expand Down Expand Up @@ -474,7 +438,7 @@ func ProcessMessageNetFlow(msgDec interface{}, samplingRateSys SamplingRateSyste
fmsg.SamplingRate = uint64(samplingRate)
}
default:
return flowMessageSet, errors.New("Bad NetFlow/IPFIX version")
return flowMessageSet, errors.New("bad NetFlow/IPFIX version")
}

return flowMessageSet, nil
Expand Down
2 changes: 1 addition & 1 deletion producer/producer_nflegacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ func ProcessMessageNetFlowLegacy(msgDec interface{}) ([]*flowmessage.FlowMessage

return flowMessageSet, nil
default:
return []*flowmessage.FlowMessage{}, errors.New("Bad NetFlow v5 version")
return []*flowmessage.FlowMessage{}, errors.New("bad NetFlow v5 version")
}
}
11 changes: 7 additions & 4 deletions producer/producer_sf.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,11 @@ func ParseSampledHeaderConfig(flowMessage *flowmessage.FlowMessage, sampledHeade
return nil
}

/*
func SearchSFlowSamples(samples []interface{}) []*flowmessage.FlowMessage {
return SearchSFlowSamples(samples)
}
*/

func SearchSFlowSamplesConfig(samples []interface{}, config *SFlowProducerConfig, agent net.IP) []*flowmessage.FlowMessage {
flowMessageSet := make([]*flowmessage.FlowMessage, 0)
Expand All @@ -345,9 +347,10 @@ func SearchSFlowSamplesConfig(samples []interface{}, config *SFlowProducerConfig
flowMessage.OutIf = flowSample.OutputIfValue
}

ipNh := net.IP{}
ipSrc := net.IP{}
ipDst := net.IP{}
var (
ipNh, ipSrc, ipDst net.IP
)

flowMessage.Packets = 1
for _, record := range records {
switch recordData := record.Data.(type) {
Expand Down Expand Up @@ -425,6 +428,6 @@ func ProcessMessageSFlowConfig(msgDec interface{}, config *SFlowProducerConfig)

return flowMessageSet, nil
default:
return []*flowmessage.FlowMessage{}, errors.New("Bad sFlow version")
return []*flowmessage.FlowMessage{}, errors.New("bad sFlow version")
}
}
18 changes: 10 additions & 8 deletions producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,32 @@ import (

func TestProcessMessageNetFlow(t *testing.T) {
records := []netflow.DataRecord{
netflow.DataRecord{
{
Values: []netflow.DataField{
netflow.DataField{
{
Type: netflow.NFV9_FIELD_IPV4_SRC_ADDR,
Value: []byte{10, 0, 0, 1},
},
},
},
}
dfs := []interface{}{
netflow.DataFlowSet{

dfs := []netflow.DataFlowSet{
{
Records: records,
},
}

pktnf9 := netflow.NFv9Packet{
FlowSets: dfs,
FlowSets: netflow.FlowSets{DataFS: dfs},
}

testsr := &SingleSamplingRateSystem{1}
_, err := ProcessMessageNetFlow(pktnf9, testsr)
assert.Nil(t, err)

pktipfix := netflow.IPFIXPacket{
FlowSets: dfs,
FlowSets: netflow.FlowSets{DataFS: dfs},
}
_, err = ProcessMessageNetFlow(pktipfix, testsr)
assert.Nil(t, err)
Expand All @@ -58,15 +60,15 @@ func TestProcessMessageSFlow(t *testing.T) {
sflow.FlowSample{
SamplingRate: 1,
Records: []sflow.FlowRecord{
sflow.FlowRecord{
{
Data: sh,
},
},
},
sflow.ExpandedFlowSample{
SamplingRate: 1,
Records: []sflow.FlowRecord{
sflow.FlowRecord{
{
Data: sh,
},
},
Expand Down
Loading

0 comments on commit b4c1470

Please sign in to comment.