Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for reporting on multiple flows. #29

Merged
merged 4 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 271 additions & 40 deletions e2e/simple_ondatra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/openconfig/ondatra"
"github.com/openconfig/ondatra/gnmi"
"github.com/openconfig/ondatra/knebind/solver"
"github.com/openconfig/ondatra/otg"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand Down Expand Up @@ -54,6 +55,11 @@ var (
Name: "port2",
MAC: "02:00:02:01:01:01",
}

ateAuxDst = &intf{
Name: "port3",
MAC: "03:00:03:01:01:01",
}
)

func TestMain(m *testing.M) {
Expand All @@ -65,7 +71,7 @@ func TestMain(m *testing.M) {
func configureATEInterfaces(t *testing.T, ate *ondatra.ATEDevice, srcATE, dstATE *intf) (gosnappi.Config, error) {
otg := ate.OTG()
topology := otg.NewConfig(t)
for _, p := range []*intf{ateSrc, ateDst} {
for _, p := range []*intf{ateSrc, ateDst, ateAuxDst} {
topology.Ports().Add().SetName(p.Name)
dev := topology.Devices().Add().SetName(p.Name)
eth := dev.Ethernets().Add().SetName(fmt.Sprintf("%s_ETH", p.Name))
Expand Down Expand Up @@ -119,39 +125,50 @@ func mirrorClient(t *testing.T, addr string) (mpb.MirrorClient, func() error) {
return mpb.NewMirrorClient(conn), conn.Close
}

// startMirror begins traffic mirroring between port1 and port2 on the mirror
// startTwoPortMirror begins traffic mirroring between port1 and port2 on the mirror
// container in the topology.
func startMirror(t *testing.T, client mpb.MirrorClient) {
func startTwoPortMirror(t *testing.T, client mpb.MirrorClient) {
t.Helper()
mirror := ondatra.DUT(t, "mirror")
startMirrors(t, client, &mpb.StartRequest{
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
})
}

// startMirrors starts the mirrors described by the supplied requests.
func startMirrors(t *testing.T, client mpb.MirrorClient, reqs ...*mpb.StartRequest) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

startReq := &mpb.StartRequest{
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
}
if _, err := client.Start(ctx, startReq); err != nil {
t.Fatalf("cannot start mirror client, got err: %v", err)
for _, sr := range reqs {
if _, err := client.Start(ctx, sr); err != nil {
t.Fatalf("cannot start mirror client, got err: %v", err)
}
}
}

// stopMirror stops traffic mirroring between port1 and port2 on the mirror
// stopTwoPortMirror stops traffic mirroring between port1 and port2 on the mirror
// container in the topology.
func stopMirror(t *testing.T, client mpb.MirrorClient) {
func stopTwoPortMirror(t *testing.T, client mpb.MirrorClient) {
t.Helper()
mirror := ondatra.DUT(t, "mirror")
stopMirrors(t, client, &mpb.StopRequest{
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
})

}

// stopMirrors stops the mirrors described by the supplied requests.
func stopMirrors(t *testing.T, client mpb.MirrorClient, reqs ...*mpb.StopRequest) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

stopReq := &mpb.StopRequest{
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
}
if _, err := client.Stop(ctx, stopReq); err != nil {
t.Fatalf("cannot stop mirror client, got err: %v", err)
for _, sr := range reqs {
if _, err := client.Stop(ctx, sr); err != nil {
t.Fatalf("cannot stop mirror client, got err: %v", err)
}
}
}

Expand All @@ -163,30 +180,15 @@ func TestMirror(t *testing.T) {
addr := mirrorAddr(t)
client, stop := mirrorClient(t, addr)
defer stop()
startMirror(t, client)
startTwoPortMirror(t, client)
time.Sleep(1 * time.Second)
stopMirror(t, client)
stopTwoPortMirror(t, client)
}

// TestMPLS is a simple test that creates an MPLS flow between two ATE ports and
// checks that there is no packet loss. It validates magna's end-to-end MPLS
// flow accounting.
func TestMPLS(t *testing.T) {
// Start a mirroring session to copy packets.
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startMirror(t, client)
time.Sleep(1 * time.Second)
defer func() { stopMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()
mplsFlow := otgCfg.Flows().Add().SetName("MPLS_FLOW")
func addMPLSFlow(t *testing.T, otgCfg gosnappi.Config, name, srcName, dstName, srcv4, dstv4 string) {
mplsFlow := otgCfg.Flows().Add().SetName(name)
mplsFlow.Metrics().SetEnable(true)
mplsFlow.TxRx().Port().SetTxName(ateSrc.Name).SetRxName(ateDst.Name)
mplsFlow.TxRx().Port().SetTxName(srcName).SetRxName(dstName)

mplsFlow.Rate().SetChoice("pps").SetPps(1)

Expand All @@ -209,10 +211,37 @@ func TestMPLS(t *testing.T) {
mplsInner.BottomOfStack().SetChoice("value").SetValue(1)

ip4 := mplsFlow.Packet().Add().Ipv4()
ip4.Src().SetChoice("value").SetValue("100.64.1.1")
ip4.Dst().SetChoice("value").SetValue("100.64.1.2")
ip4.Src().SetChoice("value").SetValue(srcv4)
ip4.Dst().SetChoice("value").SetValue(dstv4)
ip4.Version().SetChoice("value").SetValue(4)

}

const (
// lossTolerance indicates the number of packets we are prepared to lose during
// a test. If the packets per second generation rate is low then the flow can be
// stopped before a slow packet generator creates the next packet.
lossTolerance = 1
)

// TestMPLS is a simple test that creates an MPLS flow between two ATE ports and
// checks that there is no packet loss. It validates magna's end-to-end MPLS
// flow accounting.
func TestMPLS(t *testing.T) {
// Start a mirroring session to copy packets.
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startTwoPortMirror(t, client)
time.Sleep(1 * time.Second)
defer func() { stopTwoPortMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()
addMPLSFlow(t, otgCfg, "MPLS_FLOW", ateSrc.Name, ateDst.Name, "100.64.1.1", "100.64.1.2")

otg.PushConfig(t, otgCfg)

t.Logf("Starting MPLS traffic...")
Expand All @@ -226,7 +255,209 @@ func TestMPLS(t *testing.T) {
time.Sleep(1 * time.Second)
metrics := gnmi.Get(t, otg, gnmi.OTG().Flow("MPLS_FLOW").State())
got, want := metrics.GetCounters().GetInPkts(), metrics.GetCounters().GetOutPkts()
if lossPackets := want - got; lossPackets != 0 {
if lossPackets := want - got; lossPackets > lossTolerance {
t.Fatalf("did not get expected number of packets, got: %d, want: %d", got, want)
}
}

// checkFlow validates that OTG flow name passes the specified check function.
func checkFlow(t *testing.T, otg *otg.OTG, name string, testFn func(got, want uint64) error) {
t.Helper()
metrics := gnmi.Get(t, otg, gnmi.OTG().Flow(name).State())
got, want := metrics.GetCounters().GetInPkts(), metrics.GetCounters().GetOutPkts()
t.Logf("%s: recv: %d, sent: %d packets", name, got, want)
if err := testFn(got, want); err != nil {
t.Fatalf("%s: did not get expected number of packets, %v", name, err)
}
}

// toleranceFn is a check function that ensures that the packets lost on the specified flow
// are less than the lossTolerance constant.
func toleranceFn(got, want uint64) error {
if lossPackets := want - got; lossPackets > lossTolerance {
return fmt.Errorf("got: %d, want: >= %d-%d", got, want, lossTolerance)
}
return nil
}

// TestMPLSFlowsTwoPorts validates that multiple MPLS flows work with magna.
func TestMPLSFlowsTwoPorts(t *testing.T) {
tests := []struct {
desc string
inFlowFn func(*testing.T, gosnappi.Config)
inCheckFn func(*testing.T, *otg.OTG)
}{{
desc: "two flows - same source port",
inFlowFn: func(t *testing.T, cfg gosnappi.Config) {
addMPLSFlow(t, cfg, "FLOW_ONE", ateSrc.Name, ateDst.Name, "100.64.1.1", "100.64.1.2")
addMPLSFlow(t, cfg, "FLOW_TWO", ateSrc.Name, ateDst.Name, "100.64.2.1", "100.64.2.2")
},
inCheckFn: func(t *testing.T, otgc *otg.OTG) {
for _, f := range []string{"FLOW_ONE", "FLOW_TWO"} {
checkFlow(t, otgc, f, toleranceFn)
}
},
}, {
desc: "failure - two flows, one that is not mirrored",
inFlowFn: func(t *testing.T, cfg gosnappi.Config) {
addMPLSFlow(t, cfg, "A->B", ateSrc.Name, ateDst.Name, "100.64.1.1", "100.64.1.2")
addMPLSFlow(t, cfg, "B->A", ateDst.Name, ateSrc.Name, "100.64.2.1", "100.64.2.2")
},
inCheckFn: func(t *testing.T, otgc *otg.OTG) {
checkFlow(t, otgc, "A->B", toleranceFn)
checkFlow(t, otgc, "B->A", func(got, want uint64) error {
if got != 0 {
return fmt.Errorf("got: %d packets, want: 0", got)
}
return nil
})
},
}, {
desc: "ten flows",
inFlowFn: func(t *testing.T, cfg gosnappi.Config) {
for i := 0; i < 10; i++ {
addMPLSFlow(t, cfg, fmt.Sprintf("flow%d", i), ateSrc.Name, ateDst.Name, fmt.Sprintf("100.64.%d.1", i), fmt.Sprintf("100.64.%d.2", i))
}
},
inCheckFn: func(t *testing.T, otgc *otg.OTG) {
for i := 0; i < 10; i++ {
checkFlow(t, otgc, fmt.Sprintf("flow%d", i), toleranceFn)
}
},
}}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startTwoPortMirror(t, client)
time.Sleep(1 * time.Second)
defer func() { stopTwoPortMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()

tt.inFlowFn(t, otgCfg)

otg.PushConfig(t, otgCfg)

t.Logf("Starting MPLS traffic...")
otg.StartTraffic(t)
t.Logf("Sleeping for %s...", *sleepTime)
time.Sleep(*sleepTime)
t.Logf("Stopping MPLS traffic...")
otg.StopTraffic(t)

time.Sleep(1 * time.Second)

tt.inCheckFn(t, otg)

})
}
}

// startThreePortMirror starts two mirroring sessions:
//
// - the first copies traffic from port1 -> port2, so any packet received on port1 is received by the device connected to port2.
// - the second copies traffic from port3 -> port2, so any packet received on port3 is received by the device connected to port2.
//
// This results in port2 receiving all the packets that are transmitted by magna on ports 1+3.
func startThreePortMirror(t *testing.T, client mpb.MirrorClient) {
t.Helper()
mirror := ondatra.DUT(t, "mirror")
startMirrors(t, client,
&mpb.StartRequest{
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
},
&mpb.StartRequest{
From: mirror.Port(t, "port3").Name(),
To: mirror.Port(t, "port2").Name(),
},
)
}

// stopThreePortMirror stops the mirroring of traffic between port1->port2 and port3->port2.
func stopThreePortMirror(t *testing.T, client mpb.MirrorClient) {
t.Helper()
mirror := ondatra.DUT(t, "mirror")
stopMirrors(t, client,
&mpb.StopRequest{
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
},
&mpb.StopRequest{
From: mirror.Port(t, "port3").Name(),
To: mirror.Port(t, "port2").Name(),
},
)
}

func TestMPLSFlowsThreePorts(t *testing.T) {
tests := []struct {
desc string
inFlowFn func(*testing.T, gosnappi.Config)
inCheckFn func(*testing.T, *otg.OTG)
}{{
desc: "one flow each",
inFlowFn: func(t *testing.T, cfg gosnappi.Config) {
addMPLSFlow(t, cfg, "port1->port2", "port1", "port2", "100.64.1.1", "100.64.1.2")
addMPLSFlow(t, cfg, "port3->port2", "port3", "port2", "100.64.2.1", "100.64.2.2")
},
inCheckFn: func(t *testing.T, otgc *otg.OTG) {
for _, f := range []string{"port1->port2", "port3->port2"} {
checkFlow(t, otgc, f, toleranceFn)
}
},
}, {
desc: "ten flows on each port",
inFlowFn: func(t *testing.T, cfg gosnappi.Config) {
for i := 0; i < 10; i++ {
addMPLSFlow(t, cfg, fmt.Sprintf("port1->port2_%d", i), "port1", "port2", fmt.Sprintf("100.64.%d.1", i), fmt.Sprintf("100.64.%d.2", i))
addMPLSFlow(t, cfg, fmt.Sprintf("port3->port2_%d", i), "port3", "port2", fmt.Sprintf("100.64.%d.1", i+20), fmt.Sprintf("100.64.%d.2", i+20))
}
},
inCheckFn: func(t *testing.T, otgc *otg.OTG) {
for i := 0; i < 10; i++ {
for _, f := range []string{"port1->port2", "port3->port2"} {
checkFlow(t, otgc, fmt.Sprintf("%s_%d", f, i), toleranceFn)
}
}
},
}}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startThreePortMirror(t, client)
time.Sleep(1 * time.Second)
defer func() { stopThreePortMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()

tt.inFlowFn(t, otgCfg)

otg.PushConfig(t, otgCfg)

t.Logf("Starting MPLS traffic...")
otg.StartTraffic(t)
t.Logf("Sleeping for %s...", *sleepTime)
time.Sleep(*sleepTime)
t.Logf("Stopping MPLS traffic...")
otg.StopTraffic(t)

time.Sleep(1 * time.Second)

tt.inCheckFn(t, otg)

})
}
}
Loading
Loading