Skip to content

Commit

Permalink
Add support for reporting on multiple flows.
Browse files Browse the repository at this point in the history
 * (M) flows/mpls/mpls.go
   - Previously we would just report on the latest flow meaning that >1
     flow was not supported in tests. This change adds a reporter that
     can report on multiple flows.

     Testing here is minimal because the wider magna code needs to be
     initiating flows to meaningfully test this.
 * (M) e2e/simple_ondatra_test.go
   - Add e2e test that covers multiple MPLS flows.
  • Loading branch information
robshakir committed Aug 21, 2023
1 parent 30abb3e commit 292bd68
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 88 deletions.
108 changes: 88 additions & 20 deletions e2e/simple_ondatra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/openconfig/ondatra"
"github.com/openconfig/ondatra/gnmi"
"github.com/openconfig/ondatra/knebind/solver"
"github.com/openconfig/ondatra/otg"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand Down Expand Up @@ -168,23 +169,8 @@ func TestMirror(t *testing.T) {
stopMirror(t, client)
}

// TestMPLS is a simple test that creates an MPLS flow between two ATE ports and
// checks that there is no packet loss. It validates magna's end-to-end MPLS
// flow accounting.
func TestMPLS(t *testing.T) {
// Start a mirroring session to copy packets.
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startMirror(t, client)
time.Sleep(1 * time.Second)
defer func() { stopMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()
mplsFlow := otgCfg.Flows().Add().SetName("MPLS_FLOW")
func addMPLSFlow(t *testing.T, otgCfg gosnappi.Config, name, srcv4, dstv4 string) {
mplsFlow := otgCfg.Flows().Add().SetName(name)
mplsFlow.Metrics().SetEnable(true)
mplsFlow.TxRx().Port().SetTxName(ateSrc.Name).SetRxName(ateDst.Name)

Expand All @@ -209,10 +195,37 @@ func TestMPLS(t *testing.T) {
mplsInner.BottomOfStack().SetChoice("value").SetValue(1)

ip4 := mplsFlow.Packet().Add().Ipv4()
ip4.Src().SetChoice("value").SetValue("100.64.1.1")
ip4.Dst().SetChoice("value").SetValue("100.64.1.2")
ip4.Src().SetChoice("value").SetValue(srcv4)
ip4.Dst().SetChoice("value").SetValue(dstv4)
ip4.Version().SetChoice("value").SetValue(4)

}

const (
// lossTolerance indicates the number of packets we are prepared to lose during
// a test. If the packets per second generation rate is low then the flow can be
// stopped before a slow packet generator creates the next packet.
lossTolerance = 1
)

// TestMPLS is a simple test that creates an MPLS flow between two ATE ports and
// checks that there is no packet loss. It validates magna's end-to-end MPLS
// flow accounting.
func TestMPLS(t *testing.T) {
// Start a mirroring session to copy packets.
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startMirror(t, client)
time.Sleep(1 * time.Second)
defer func() { stopMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()
addMPLSFlow(t, otgCfg, "MPLS_FLOW", "100.64.1.1", "100.64.1.2")

otg.PushConfig(t, otgCfg)

t.Logf("Starting MPLS traffic...")
Expand All @@ -226,7 +239,62 @@ func TestMPLS(t *testing.T) {
time.Sleep(1 * time.Second)
metrics := gnmi.Get(t, otg, gnmi.OTG().Flow("MPLS_FLOW").State())
got, want := metrics.GetCounters().GetInPkts(), metrics.GetCounters().GetOutPkts()
if lossPackets := want - got; lossPackets != 0 {
if lossPackets := want - got; lossPackets > lossTolerance {
t.Fatalf("did not get expected number of packets, got: %d, want: %d", got, want)
}
}

func TestMPLSFlows(t *testing.T) {
tests := []struct {
desc string
inFlowFn func(*testing.T, gosnappi.Config)
inCheckFn func(*testing.T, *otg.OTG)
}{{
desc: "two flows - same source port",
inFlowFn: func(t *testing.T, cfg gosnappi.Config) {
addMPLSFlow(t, cfg, "FLOW_ONE", "100.64.1.1", "100.64.1.2")
addMPLSFlow(t, cfg, "FLOW_TWO", "100.64.2.1", "100.64.2.2")
},
inCheckFn: func(t *testing.T, otgc *otg.OTG) {
for _, f := range []string{"FLOW_ONE", "FLOW_TWO"} {
metrics := gnmi.Get(t, otgc, gnmi.OTG().Flow(f).State())
got, want := metrics.GetCounters().GetInPkts(), metrics.GetCounters().GetOutPkts()
if lossPackets := want - got; lossPackets > lossTolerance {
t.Errorf("Flow %s: did not get expected number of packets, got: %d, want :%d", f, got, want)
}
}
},
}}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startMirror(t, client)
time.Sleep(1 * time.Second)
defer func() { stopMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()

tt.inFlowFn(t, otgCfg)

otg.PushConfig(t, otgCfg)

t.Logf("Starting MPLS traffic...")
otg.StartTraffic(t)
t.Logf("Sleeping for %s...", *sleepTime)
time.Sleep(*sleepTime)
t.Logf("Stopping MPLS traffic...")
otg.StopTraffic(t)

time.Sleep(1 * time.Second)

tt.inCheckFn(t, otg)

})
}
}
61 changes: 52 additions & 9 deletions flows/mpls/mpls.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,55 @@ func headers(f *otg.Flow) ([]gopacket.SerializableLayer, error) {
return pktLayers, nil
}

// flowReporter encapsulates multiple named flows.
type flowReporter struct {
mu sync.RWMutex
// counters is a map of counters, keyed by the flow name, for each flow.
counters map[string]*flowCounters
}

// newReporter returns a new flow reporter.
func newReporter() *flowReporter {
return &flowReporter{
counters: map[string]*flowCounters{},
}
}

// addFlow adds the supplied flow, fc, as the entry at name in the map. It overwrites
// existing entries.
func (r *flowReporter) addFlow(name string, fc *flowCounters) {
r.mu.Lock()
defer r.mu.Unlock()
r.counters[name] = fc
}

// getFlow returns the counters for the flow with the specified name.
func (r *flowReporter) getFlow(name string) *flowCounters {
r.mu.RLock()
defer r.mu.RUnlock()
return r.counters[name]
}

// telemetry creates the gNMI telemetry for all flows within the reporter.
func (r *flowReporter) telemetry(updateFn gnmit.UpdateFn, target string) {
r.mu.RLock()
defer r.mu.RUnlock()
for name, f := range r.counters {
for _, u := range f.telemetry(target) {
klog.Infof("flow %s: sending telemetry update %s", name, u)
updateFn(u)
}
}
}

// New returns a new MPLS flow generator, consisting of:
// - a FlowGeneratorFn that is used in lwotg to create the MPLS flow.
// - a gnmit.Task that is used to write telemetry.
func New() (lwotg.FlowGeneratorFn, gnmit.Task, error) {
// TODO(robjs): We need a flow counter for each individual flow. This
// implementation results in just one flow being supported currently.
f := newFlowCounters()
// reporter encapsulates the counters for multiple flows. The MPLS flow handler is
// created once at startup time of the magna instance.
reporter := newReporter()

// t is a gnmit Task which reads from the gnmi channel specified and writes
// into the cache.
t := gnmit.Task{
Expand All @@ -187,10 +229,7 @@ func New() (lwotg.FlowGeneratorFn, gnmit.Task, error) {
defer cleanup()
for {
<-ticker.C
for _, u := range f.telemetry(target) {
klog.Infof("sending telemetry update %s", u)
updateFn(u)
}
reporter.telemetry(updateFn, target)
}
}()
return nil
Expand All @@ -203,7 +242,8 @@ func New() (lwotg.FlowGeneratorFn, gnmit.Task, error) {
return nil, false, err
}

f.Headers = hdrs
fc := newFlowCounters()
fc.Headers = hdrs

pps, err := common.Rate(flow, hdrs)
if err != nil {
Expand All @@ -215,10 +255,12 @@ func New() (lwotg.FlowGeneratorFn, gnmit.Task, error) {
return nil, false, fmt.Errorf("cannot determine ports, %v", err)
}

f.Name = &val{s: flow.Name, ts: flowTimeFn()}
fc.Name = &val{s: flow.Name, ts: flowTimeFn()}
klog.Infof("generating flow %s: tx: %s, rx: %s, rate: %d pps", flow.GetName(), tx, rx, pps)
reporter.addFlow(flow.Name, fc)

genFunc := func(stop chan struct{}) {
f := reporter.getFlow(flow.Name)
klog.Infof("MPLSFlowHandler send function started.")
f.clearStats()

Expand Down Expand Up @@ -291,6 +333,7 @@ func New() (lwotg.FlowGeneratorFn, gnmit.Task, error) {

ps := gopacket.NewPacketSource(handle, handle.LinkType())
packetCh := ps.Packets()
f := reporter.getFlow(flow.Name)
for {
select {
case <-stop:
Expand Down
35 changes: 24 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,25 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/gopacket v1.1.19
github.com/open-traffic-generator/snappi/gosnappi v0.11.15
github.com/openconfig/gnmi v0.9.1
github.com/openconfig/featureprofiles v0.0.0-20230629171650-1057c66c4e52
github.com/openconfig/gnmi v0.10.0
github.com/openconfig/goyang v1.4.0
github.com/openconfig/ondatra v0.1.24-0.20230707220529-4774ac3d2272
github.com/openconfig/ygot v0.28.3
github.com/openconfig/kne v0.1.14
github.com/openconfig/ondatra v0.2.7
github.com/openconfig/ygot v0.29.9
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
google.golang.org/grpc v1.56.0
google.golang.org/protobuf v1.30.0
google.golang.org/grpc v1.58.0-dev
google.golang.org/protobuf v1.31.0
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.90.1
)

require (
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
cloud.google.com/go/pubsub v1.30.0 // indirect
github.com/Masterminds/semver/v3 v3.2.1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20230518184743-7afd39499903 // indirect
Expand Down Expand Up @@ -46,7 +54,10 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.8.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.15 // indirect
Expand All @@ -67,15 +78,14 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/networkop/meshnet-cni v0.3.1-0.20230525201116-d7c306c635cf // indirect
github.com/open-traffic-generator/ixia-c-operator v0.3.0 // indirect
github.com/openconfig/featureprofiles v0.0.0-20230629171650-1057c66c4e52 // indirect
github.com/open-traffic-generator/ixia-c-operator v0.3.4 // indirect
github.com/openconfig/gnoi v0.1.0 // indirect
github.com/openconfig/gnsi v1.2.1 // indirect
github.com/openconfig/gocloser v0.0.0-20220310182203-c6c950ed3b0b // indirect
github.com/openconfig/gribi v1.0.0 // indirect
github.com/openconfig/grpctunnel v0.0.0-20220819142823-6f5422b8ca70 // indirect
github.com/openconfig/kne v0.1.13 // indirect
github.com/openconfig/lemming/operator v0.2.0 // indirect
github.com/openconfig/ygnmi v0.7.10 // indirect
github.com/openconfig/ygnmi v0.8.7 // indirect
github.com/p4lang/p4runtime v1.4.0-rc.5 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pborman/uuid v1.2.1 // indirect
Expand All @@ -100,16 +110,20 @@ require (
github.com/subosito/gotenv v1.4.2 // indirect
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/api v0.122.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230525234025-438c736192d0 // indirect
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand All @@ -119,7 +133,6 @@ require (
k8s.io/api v0.26.3 // indirect
k8s.io/apimachinery v0.26.3 // indirect
k8s.io/client-go v0.26.3 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230308215209-15aac26d736a // indirect
k8s.io/utils v0.0.0-20230313181309-38a27ef9d749 // indirect
sigs.k8s.io/controller-runtime v0.14.5 // indirect
Expand Down
Loading

0 comments on commit 292bd68

Please sign in to comment.