Skip to content

Commit

Permalink
add third parameter domainID/srcID to key in template cache for IPFix/v9
Browse files Browse the repository at this point in the history
  • Loading branch information
rachelScout committed Oct 19, 2022
1 parent 40e66a7 commit cdacb22
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 530 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ The IPFIX data decodes to JSON format and IDs are [IANA IPFIX element ID](http:/

## Decoded sFlow data
```json
{"Version":5,"IPVersion":1,"AgentSubID":5,"SequenceNo":37591,"SysUpTime":3287084017,"SamplesNo":1,"Samples":[{"SequenceNo":1530345639,"SourceID":0,"SamplingRate":4096,"SamplePool":1938456576,"Drops":0,"Input":536,"Output":728,"RecordsNo":3,"Records":{"ExtRouter":{"NextHop":"115.131.251.90","SrcMask":24,"DstMask":14},"ExtSwitch":{"SrcVlan":0,"SrcPriority":0,"DstVlan":0,"DstPriority":0},"RawHeader":{"L2":{"SrcMAC":"58:00:bb:e7:57:6f","DstMAC":"f4:a7:39:44:a8:27","Vlan":0,"EtherType":2048},"L3":{"Version":4,"TOS":0,"TotalLen":1452,"ID":13515,"Flags":0,"FragOff":0,"TTL":62,"Protocol":6,"Checksum":8564,"Src":"10.1.8.5","Dst":"161.140.24.181"},"L4":{"SrcPort":443,"DstPort":56521,"DataOffset":5,"Reserved":0,"Flags":16}}}}],"IPAddress":"192.168.10.0","ColTime": 1646157296}
{"Version":5,"IPVersion":1,"AgentSubID":5,"SequenceNo":37591,"SysUpTime":3287084017,"SamplesNo":1,"Samples":[{"SequenceNo":1530345639,"SourceID":0,"SamplingRate":4096,"SamplePool":1938456576,"Drops":0,"Input":536,"Output":728,"RecordsNo":3,"Records":{"ExtRouter":{"NextHop":"115.131.251.90","SrcMask":24,"DstMask":14},"ExtSwitch":{"SrcVlan":0,"SrcPriority":0,"DstVlan":0,"DstPriority":0},"RawHeader":{"L2":{"SrcMAC":"58:00:bb:e7:57:6f","DstMAC":"f4:a7:39:44:a8:27","Vlan":0,"EtherType":2048},"L3":{"Version":4,"TOS":0,"TotalLen":1452,"ID":13515,"Flags":0,"FragOff":0,"TTL":62,"Protocol":6,"Checksum":8564,"Src":"10.1.8.5","Dst":"161.140.24.181"},"L4":{"SrcPort":443,"DstPort":56521,"DataOffset":5,"Reserved":0,"Flags":16}}}}],"IPAddress":"192.168.10.0"}
```
## Decoded Netflow v5 data
``` json
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ module github.com/EdgeCast/vflow
go 1.15

require (
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/Shopify/sarama v1.33.0
github.com/ClickHouse/clickhouse-go v1.4.3
github.com/Shopify/sarama v1.26.3
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/nats-io/nats-server/v2 v2.8.2 // indirect
github.com/nats-io/nats.go v1.15.0
github.com/nsqio/go-nsq v1.1.0
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.19.0 // indirect
github.com/prometheus/client_golang v1.12.2
github.com/segmentio/kafka-go v0.4.31
golang.org/x/net v0.0.0-20220513224357-95641704303c
gopkg.in/yaml.v2 v2.4.0
github.com/nats-io/nats-server/v2 v2.1.8 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/nsqio/go-nsq v1.0.8
github.com/onsi/ginkgo v1.14.2 // indirect
github.com/onsi/gomega v1.10.3 // indirect
github.com/prometheus/client_golang v1.6.0
github.com/segmentio/kafka-go v0.4.7
golang.org/x/net v0.0.0-20201021035429-f5854403a974
gopkg.in/yaml.v2 v2.3.0
)
559 changes: 78 additions & 481 deletions go.sum

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions ipfix/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,19 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
// This check is somewhat redundant with the switch-clause below, but the retrieve() operation should not be executed inside the loop.
if setHeader.SetID > 255 {
var ok bool
if tr, ok = mem.retrieve(setHeader.SetID, d.raddr); !ok {
if tr, ok = mem.retrieve(setHeader.SetID, d.raddr, msg.Header.DomainID); !ok {
select {
case rpcChan <- RPCRequest{
ID: setHeader.SetID,
IP: d.raddr,
SrcID: msg.Header.DomainID,
}:
default:
}
err = nonfatalError{fmt.Errorf("%s unknown ipfix template id# %d",
err = nonfatalError{fmt.Errorf("%s unknown ipfix template id# %d with domain ID %d",
d.raddr.String(),
setHeader.SetID,
msg.Header.DomainID,
)}
}
}
Expand All @@ -196,7 +198,7 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
err = tr.unmarshalOpts(d.reader)
}
if err == nil {
mem.insert(tr.TemplateID, d.raddr, tr)
mem.insert(tr.TemplateID, d.raddr, tr, msg.Header.DomainID)
}
} else if setID >= 4 && setID <= 255 {
// Reserved set, do not read any records
Expand Down
6 changes: 3 additions & 3 deletions ipfix/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
var tpl, optsTpl, multiMessage, unknownDatasetMessage []byte

func init() {
// IPFIX packet including template SetID:2, 25 fields
// IPFIX packet including template SetID:2, 25 fields, Domain id 33792
tpl = []byte{
0x0, 0xa, 0x0, 0x7c, 0x58, 0x90, 0xd6, 0x40, 0x28, 0xf7,
0xa0, 0x4a, 0x0, 0x0, 0x84, 0x0, 0x0, 0x2, 0x0, 0x6c, 0x1,
Expand Down Expand Up @@ -205,8 +205,8 @@ func TestUnknownDatasetsMessage(t *testing.T) {
t.Error("Did not expect any result datasets, but got", l)
}
expectedErrorStr := `Multiple errors:
- 127.0.0.1 unknown ipfix template id# 264
- 127.0.0.1 unknown ipfix template id# 264`
- 127.0.0.1 unknown ipfix template id# 264 with domain ID 1
- 127.0.0.1 unknown ipfix template id# 264 with domain ID 1`
if err == nil || err.Error() != expectedErrorStr {
t.Error("Received unexpected erorr:", err)
}
Expand Down
23 changes: 14 additions & 9 deletions ipfix/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,32 @@ func GetCache(cacheFile string) MemCache {
return m
}

func (m MemCache) getShard(id uint16, addr net.IP) (*TemplatesShard, uint32) {
b := make([]byte, 2)
binary.BigEndian.PutUint16(b, id)
key := append(addr, b...)

func (m MemCache) getShard(templateId uint16, addr net.IP, domainId uint32) (*TemplatesShard, uint32) {
var key []byte
hash := fnv.New32()
dId := make([]byte, 4)
tID := make([]byte, 2)
binary.LittleEndian.PutUint32(dId, domainId)
binary.BigEndian.PutUint16(tID, templateId)
key = append(key, addr...)
key = append(key, dId...)
key = append(key, tID...)

hash.Write(key)
hSum32 := hash.Sum32()

return m[uint(hSum32)%uint(shardNo)], hSum32
}

func (m MemCache) insert(id uint16, addr net.IP, tr TemplateRecord) {
shard, key := m.getShard(id, addr)
func (m MemCache) insert(id uint16, addr net.IP, tr TemplateRecord, domainID uint32) {
shard, key := m.getShard(id, addr, domainID)
shard.Lock()
defer shard.Unlock()
shard.Templates[key] = Data{tr, time.Now().Unix()}
}

func (m MemCache) retrieve(id uint16, addr net.IP) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr)
func (m MemCache) retrieve(id uint16, addr net.IP, domainID uint32) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr, domainID)
shard.RLock()
defer shard.RUnlock()
v, ok := shard.Templates[key]
Expand Down
5 changes: 3 additions & 2 deletions ipfix/memcache_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type RPCConfig struct {
type RPCRequest struct {
ID uint16
IP net.IP
SrcID uint32
}

type vFlowServer struct {
Expand Down Expand Up @@ -91,7 +92,7 @@ func NewRPC(mCache MemCache) *IRPC {
func (r *IRPC) Get(req RPCRequest, resp *TemplateRecord) error {
var ok bool

*resp, ok = r.mCache.retrieve(req.ID, req.IP)
*resp, ok = r.mCache.retrieve(req.ID, req.IP, req.SrcID)
if !ok {
return errNotAvail
}
Expand Down Expand Up @@ -168,7 +169,7 @@ func RPC(m MemCache, config *RPCConfig) {
continue
}

m.insert(req.ID, req.IP, *tr)
m.insert(req.ID, req.IP, *tr, req.SrcID)
break
}

Expand Down
42 changes: 36 additions & 6 deletions ipfix/memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMemCacheRetrieve(t *testing.T) {
mCache := GetCache("cache.file")
d := NewDecoder(ip, tpl)
d.Decode(mCache)
v, ok := mCache.retrieve(256, ip)
v, ok := mCache.retrieve(256, ip, 33792)
if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
Expand All @@ -48,9 +48,9 @@ func TestMemCacheInsert(t *testing.T) {
mCache := GetCache("cache.file")

tpl.TemplateID = 310
mCache.insert(310, ip, tpl)
mCache.insert(310, ip, tpl, 513)

v, ok := mCache.retrieve(310, ip)
v, ok := mCache.retrieve(310, ip, 513)
if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
Expand All @@ -65,15 +65,45 @@ func TestMemCacheAllSetIds(t *testing.T) {
mCache := GetCache("cache.file")

tpl.TemplateID = 310
mCache.insert(tpl.TemplateID, ip, tpl)
mCache.insert(tpl.TemplateID, ip, tpl, 513)
tpl.TemplateID = 410
mCache.insert(tpl.TemplateID, ip, tpl)
mCache.insert(tpl.TemplateID, ip, tpl, 513)
tpl.TemplateID = 210
mCache.insert(tpl.TemplateID, ip, tpl)
mCache.insert(tpl.TemplateID, ip, tpl, 513)

expected := []int{210, 310, 410}
actual := mCache.allSetIds()
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Expected set IDs %v, got %v", expected, actual)
}
}

func TestMemCache_keyWithDifferentDomainIDs(t *testing.T) {
var tpl TemplateRecord
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")

tpl.TemplateID = 310
tpl.FieldCount = 19
mCache.insert(tpl.TemplateID, ip, tpl, 513)
tpl.FieldCount = 21
mCache.insert(tpl.TemplateID, ip, tpl, 514)

v, ok := mCache.retrieve(tpl.TemplateID, ip, 513)

if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
if v.FieldCount != 19 {
t.Error("expected template id#:310 with Field count#:19, got", v.TemplateID, v.FieldCount)
}

v, ok = mCache.retrieve(tpl.TemplateID, ip, 514)

if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
if v.FieldCount != 21 {
t.Error("expected template id#:310 with Field count#:21, got", v.TemplateID, v.FieldCount)
}
}
10 changes: 5 additions & 5 deletions netflow/v9/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,11 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
// This check is somewhat redundant with the switch-clause below, but the retrieve() operation should not be executed inside the loop.
if setHeader.FlowSetID > 255 {
var ok bool
tr, ok = mem.retrieve(setHeader.FlowSetID, d.raddr)
tr, ok = mem.retrieve(setHeader.FlowSetID, d.raddr, msg.Header.SrcID)
if !ok {
err = nonfatalError(fmt.Errorf("%s unknown netflow template id# %d",
err = nonfatalError(fmt.Errorf("%s unknown netflow template id# %d from sourceID %d",
d.raddr.String(),
setHeader.FlowSetID,
setHeader.FlowSetID, msg.Header.SrcID,
))
}
}
Expand All @@ -446,9 +446,9 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
err = tr.unmarshalOpts(d.reader)
}
if err == nil {
mem.insert(tr.TemplateID, d.raddr, tr)
mem.insert(tr.TemplateID, d.raddr, tr, msg.Header.SrcID)
}
} else if setId >= 4 && setId <= 255 {
} else if setId <= 255 {
// Reserved set, do not read any records
break
} else {
Expand Down
23 changes: 14 additions & 9 deletions netflow/v9/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,32 @@ func GetCache(cacheFile string) MemCache {
return m
}

func (m MemCache) getShard(id uint16, addr net.IP) (*TemplatesShard, uint32) {
b := make([]byte, 2)
binary.BigEndian.PutUint16(b, id)
key := append(addr, b...)

func (m MemCache) getShard(templateId uint16, addr net.IP, srcId uint32) (*TemplatesShard, uint32) {
var key []byte
hash := fnv.New32()
sId := make([]byte, 4)
tID := make([]byte, 2)
binary.LittleEndian.PutUint32(sId, srcId)
binary.BigEndian.PutUint16(tID, templateId)
key = append(key, addr...)
key = append(key, sId...)
key = append(key, tID...)

hash.Write(key)
hSum32 := hash.Sum32()

return m[uint(hSum32)%uint(shardNo)], hSum32
}

func (m *MemCache) insert(id uint16, addr net.IP, tr TemplateRecord) {
shard, key := m.getShard(id, addr)
func (m *MemCache) insert(id uint16, addr net.IP, tr TemplateRecord, srcID uint32) {
shard, key := m.getShard(id, addr, srcID)
shard.Lock()
defer shard.Unlock()
shard.Templates[key] = Data{tr, time.Now().Unix()}
}

func (m *MemCache) retrieve(id uint16, addr net.IP) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr)
func (m *MemCache) retrieve(id uint16, addr net.IP, srcID uint32) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr, srcID)
shard.RLock()
defer shard.RUnlock()
v, ok := shard.Templates[key]
Expand Down

0 comments on commit cdacb22

Please sign in to comment.