Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ebpf plugin + parser #1196

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions pkg/plugin/ebpfwindows/dropreasons_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package ebpfwindows

import (
"fmt"
)

// DropMin numbers less than this are non-drop reason codes
var DropMin uint8 = 130

// DropInvalid is the Invalid packet reason.
var DropInvalid uint8 = 2

// These values are shared with bpf/lib/common.h and api/v1/flow/flow.proto.
var dropErrors = map[uint8]string{
0: "Success",
2: "Invalid packet",
3: "Plain Text",
4: "Interface Decrypted",
5: "LB: No backend slot entry found",
6: "LB: No backend entry found",
7: "LB: Reverse entry update failed",
8: "LB: Reverse entry stale",
9: "Fragmented packet",
10: "Fragmented packet entry update failed",
11: "Missed tail call to custom program",
}

// Keep in sync with __id_for_file in bpf/lib/source_info.h.
var files = map[uint8]string{

// source files from bpf/
1: "bpf_host.c",
2: "bpf_lxc.c",
3: "bpf_overlay.c",
4: "bpf_xdp.c",
5: "bpf_sock.c",
6: "bpf_network.c",

// header files from bpf/lib/
101: "arp.h",
102: "drop.h",
103: "srv6.h",
104: "icmp6.h",
105: "nodeport.h",
106: "lb.h",
107: "mcast.h",
108: "ipv4.h",
109: "conntrack.h",
110: "l3.h",
111: "trace.h",
112: "encap.h",
113: "encrypt.h",
}

// BPFFileName returns the file name for the given BPF file id.
func BPFFileName(id uint8) string {
if name, ok := files[id]; ok {
return name
}
return fmt.Sprintf("unknown(%d)", id)
}

func extendedReason(extError int8) string {
if extError == int8(0) {
return ""
}
return fmt.Sprintf("%d", extError)
}

func DropReasonExt(reason uint8, extError int8) string {
if err, ok := dropErrors[reason]; ok {
if ext := extendedReason(extError); ext == "" {
return err
} else {
return err + ", " + ext
}
}
return fmt.Sprintf("%d, %d", reason, extError)
}

// DropReason prints the drop reason in a human readable string
func DropReason(reason uint8) string {
return DropReasonExt(reason, int8(0))
}
310 changes: 310 additions & 0 deletions pkg/plugin/ebpfwindows/ebpf_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
package ebpfwindows

import (
"context"
"errors"
"time"
//"fmt"

Check failure on line 7 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 7 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

commentFormatting: put a space between `//` and comment text (gocritic)
"unsafe"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
hp "github.com/cilium/cilium/pkg/hubble/parser"
kcfg "github.com/microsoft/retina/pkg/config"
observer "github.com/cilium/cilium/pkg/hubble/observer/types"
//"github.com/google/uuid"

Check failure on line 13 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 13 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

commentFormatting: put a space between `//` and comment text (gocritic)
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/metrics"
"github.com/microsoft/retina/pkg/plugin/registry"
//"github.com/microsoft/retina/pkg/utils"

Check failure on line 18 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 18 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

commentFormatting: put a space between `//` and comment text (gocritic)
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)

const (
// name of the ebpfwindows plugin
name string = "windowseBPF"
// name of the metrics
packetsReceived string = "win_packets_recv_count"
packetsSent string = "win_packets_sent_count"
bytesSent string = "win_bytes_sent_count"
bytesReceived string = "win_bytes_recv_count"
droppedPacketsIncoming string = "win_packets_recv_drop_count"
droppedPacketsOutgoing string = "win_packets_sent_drop_count"
// metrics direction
ingressLabel = "ingress"
egressLabel = "egress"
)

var (
ErrInvalidEventData = errors.New("The Cilium Event Data is invalid")
ErrNilEnricher = errors.New("enricher is nil")
)

// Plugin is the ebpfwindows plugin
type Plugin struct {
l *log.ZapLogger
cfg *kcfg.Config
enricher *enricher.Enricher
externalChannel chan *v1.Event
parser *hp.Parser
}

func init() {
registry.Add(name, New)
}

func New(cfg *kcfg.Config) registry.Plugin {
return &Plugin{
l: log.Logger().Named(name),
cfg: cfg,
}
}

// Init is a no-op for the ebpfwindows plugin
func (p *Plugin) Init() error {

parser, err := hp.New(logrus.WithField("cilium", "parser"),
nil,
nil,
nil,
nil,
nil,
nil,
nil,
)

if err != nil {
p.l.Fatal("Failed to create parser", zap.Error(err))
return err
}

p.parser = parser
return nil
}

// Name returns the name of the ebpfwindows plugin
func (p *Plugin) Name() string {
return name
}

// Start the plugin by starting a periodic timer.
func (p *Plugin) Start(ctx context.Context) error {

p.l.Info("Start ebpfWindows plugin...")
p.enricher = enricher.Instance()

if p.enricher == nil {
return ErrNilEnricher
}

p.pullCiliumMetricsAndEvents(ctx)
return nil
}

// metricsMapIterateCallback is the callback function that is called for each key-value pair in the metrics map.
func (p *Plugin) metricsMapIterateCallback(key *MetricsKey, value *MetricsValues) {
p.l.Info("MetricsMapIterateCallback")
p.l.Info("Key", zap.String("Key", key.String()))
p.l.Info("Value", zap.String("Value", value.String()))

if key.IsDrop() {
if key.IsEgress() {
metrics.DropPacketsGauge.WithLabelValues(egressLabel).Set(float64(value.Count()))
} else if key.IsIngress() {
metrics.DropPacketsGauge.WithLabelValues(ingressLabel).Set(float64(value.Count()))
}

} else {

if key.IsEgress() {
metrics.ForwardBytesGauge.WithLabelValues(egressLabel).Set(float64(value.Bytes()))
p.l.Debug("emitting bytes sent count metric", zap.Uint64(bytesSent, value.Bytes()))
metrics.WindowsGauge.WithLabelValues(packetsSent).Set(float64(value.Count()))
p.l.Debug("emitting packets sent count metric", zap.Uint64(packetsSent, value.Count()))
} else if key.IsIngress() {
metrics.ForwardPacketsGauge.WithLabelValues(ingressLabel).Set(float64(value.Count()))
p.l.Debug("emitting packets received count metric", zap.Uint64(packetsReceived, value.Count()))
metrics.ForwardBytesGauge.WithLabelValues(ingressLabel).Set(float64(value.Bytes()))
p.l.Debug("emitting bytes received count metric", zap.Uint64(bytesReceived, value.Bytes()))
}
}
}

// eventsMapCallback is the callback function that is called for each value in the events map.
func (p *Plugin) eventsMapCallback(data unsafe.Pointer, size uint64) int {
p.l.Info("EventsMapCallback")
p.l.Info("Size", zap.Uint64("Size", size))
err := p.handleTraceEvent(data, size)

if err != nil {
p.l.Error("Error handling trace event", zap.Error(err))
return -1
}

return 0
}

// pullCiliumeBPFMetrics is the function that is called periodically by the timer.
func (p *Plugin) pullCiliumMetricsAndEvents(ctx context.Context) {

eventsMap := NewEventsMap()
metricsMap := NewMetricsMap()

err := eventsMap.RegisterForCallback(p.eventsMapCallback)

if err != nil {
p.l.Error("Error registering for events map callback", zap.Error(err))
return
}

ticker := time.NewTicker(p.cfg.MetricsInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
err := metricsMap.IterateWithCallback(p.metricsMapIterateCallback)
if err != nil {
p.l.Error("Error iterating metrics map", zap.Error(err))
}
case <-ctx.Done():
p.l.Error("ebpfwindows plugin canceling", zap.Error(ctx.Err()))
eventsMap.UnregisterForCallback()

Check failure on line 172 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

Error return value of `eventsMap.UnregisterForCallback` is not checked (errcheck)

Check failure on line 172 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

Error return value of `eventsMap.UnregisterForCallback` is not checked (errcheck)
return
}
}
}

// SetupChannel saves the external channel to which the plugin will send events.
func (p *Plugin) SetupChannel(ch chan *v1.Event) error {
p.externalChannel = ch
return nil
}

// Stop the plugin by cancelling the periodic timer.
func (p *Plugin) Stop() error {

p.l.Info("Stop ebpfWindows plugin...")
return nil
}

// Compile is a no-op for the ebpfwindows plugin
func (p *Plugin) Compile(context.Context) error {
return nil
}

// Generate is a no-op for the ebpfwindows plugin
func (p *Plugin) Generate(context.Context) error {
return nil
}

func (p *Plugin) handleDropNotify(dropNotify *DropNotify) {
p.l.Info("DropNotify", zap.String("DropNotify", dropNotify.String()))
}

func (p *Plugin) handleTraceNotify(traceNotify *TraceNotify) {
p.l.Info("TraceNotify", zap.String("TraceNotify", traceNotify.String()))
}

func (p *Plugin) handleTraceSockNotify(traceSockNotify *TraceSockNotify) {
p.l.Info("TraceSockNotify", zap.String("TraceSockNotify", traceSockNotify.String()))
}

func (p *Plugin) validateFlowObject() {
}

func (p *Plugin) handleTraceEvent(data unsafe.Pointer, size uint64) error {

if uintptr(size) < unsafe.Sizeof(uint8(0)) {
return ErrInvalidEventData
}

eventType := *(*uint8)(data)
switch eventType {
case CiliumNotifyDrop:

if uintptr(size) < unsafe.Sizeof(DropNotify{}) {
p.l.Error("Invalid DropNotify data size", zap.Uint64("size", size))
return ErrInvalidEventData
}

dropNotify := (*DropNotify)(data)
p.handleDropNotify(dropNotify)
e, err := p.parser.Decode(&observer.MonitorEvent{
Payload: &observer.PerfEvent{
Data: (*[166]byte)(data)[:],
},
})
if err != nil {
p.l.Error("Could not convert event to flow", zap.Any("handleTraceEvent", data), zap.Error(err))
return ErrInvalidEventData
}

flowType := e.GetFlow().GetType().String()
p.l.Info("Event converted successfully", zap.String("flowType", flowType))
srcIP := e.GetFlow().IP.Source

Check failure on line 245 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

avoid direct access to proto field e.GetFlow().IP.Source, use e.GetFlow().GetIP().GetSource() instead (protogetter)

Check failure on line 245 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

avoid direct access to proto field e.GetFlow().IP.Source, use e.GetFlow().GetIP().GetSource() instead (protogetter)
dstIP := e.GetFlow().IP.Destination

Check failure on line 246 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

avoid direct access to proto field e.GetFlow().IP.Destination, use e.GetFlow().GetIP().GetDestination() instead (protogetter)

Check failure on line 246 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

avoid direct access to proto field e.GetFlow().IP.Destination, use e.GetFlow().GetIP().GetDestination() instead (protogetter)
srcP := e.GetFlow().GetL4().GetTCP().GetSourcePort()
dstP := e.GetFlow().GetL4().GetTCP().GetDestinationPort()
p.l.Info("5 TUPLE", zap.String("srcIP", srcIP), zap.String("dstIP", dstIP), zap.Uint32("srcP", srcP), zap.Uint32("dstP", dstP))

case CiliumNotifyTrace:

if uintptr(size) < unsafe.Sizeof(TraceNotify{}) {
p.l.Error("Invalid TraceNotify data size", zap.Uint64("size", size))
return ErrInvalidEventData
}

traceNotify := (*TraceNotify)(data)
p.handleTraceNotify(traceNotify)
e, err := p.parser.Decode(&observer.MonitorEvent{
Payload: &observer.PerfEvent{
Data: (*[178]byte)(data)[:],
},
})
if err != nil {
p.l.Error("Could not convert event to flow", zap.Any("handleTraceEvent", data), zap.Error(err))
return ErrInvalidEventData
}

flowType := e.GetFlow().GetType().String()
p.l.Info("Event converted successfully", zap.String("flowType", flowType))
srcIP := e.GetFlow().IP.Source

Check failure on line 272 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

avoid direct access to proto field e.GetFlow().IP.Source, use e.GetFlow().GetIP().GetSource() instead (protogetter)

Check failure on line 272 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

avoid direct access to proto field e.GetFlow().IP.Source, use e.GetFlow().GetIP().GetSource() instead (protogetter)
dstIP := e.GetFlow().IP.Destination

Check failure on line 273 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

avoid direct access to proto field e.GetFlow().IP.Destination, use e.GetFlow().GetIP().GetDestination() instead (protogetter)

Check failure on line 273 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

avoid direct access to proto field e.GetFlow().IP.Destination, use e.GetFlow().GetIP().GetDestination() instead (protogetter)
srcP := e.GetFlow().GetL4().GetTCP().GetSourcePort()
dstP := e.GetFlow().GetL4().GetTCP().GetDestinationPort()
p.l.Info("PHY TUPLE", zap.String("srcIP", srcIP), zap.String("dstIP", dstIP), zap.Uint32("srcP", srcP), zap.Uint32("dstP", dstP))


case CiliumNotifyTraceSock:
if uintptr(size) < unsafe.Sizeof(TraceSockNotify{}) {
p.l.Error("Invalid TraceSockNotify data size", zap.Uint64("size", size))
return ErrInvalidEventData
}

traceSockNotify := (*TraceSockNotify)(data)
p.handleTraceSockNotify(traceSockNotify)

e, err := p.parser.Decode(&observer.MonitorEvent{
Payload: &observer.PerfEvent{
Data: (*[174]byte)(data)[:],
},
})
if err != nil {
p.l.Error("Could not convert event to flow", zap.Any("handleTraceEvent", data), zap.Error(err))
return ErrInvalidEventData
}

flowType := e.GetFlow().GetType().String()
p.l.Info("Event converted successfully", zap.String("flowType", flowType))
srcIP := e.GetFlow().IP.Source
dstIP := e.GetFlow().IP.Destination
srcP := e.GetFlow().GetL4().GetTCP().GetSourcePort()
dstP := e.GetFlow().GetL4().GetTCP().GetDestinationPort()
p.l.Info("PHY TUPLE", zap.String("srcIP", srcIP), zap.String("dstIP", dstIP), zap.Uint32("srcP", srcP), zap.Uint32("dstP", dstP))

}


return nil
}
Loading
Loading