diff --git a/clib/mininet_test_base_topo.py b/clib/mininet_test_base_topo.py index fec03e0112..c257170b6e 100644 --- a/clib/mininet_test_base_topo.py +++ b/clib/mininet_test_base_topo.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import os +import random import re import time import ipaddress @@ -28,6 +29,8 @@ class FaucetTopoTestBase(FaucetTestBase): NUM_HOSTS = 4 LINKS_PER_HOST = 1 + LACP_TIMEOUT = 10 + dpids = None port_maps = None n_vlans = 0 @@ -91,10 +94,10 @@ def vlan_vid(i): """VLAN VID value""" return (i+1) * 100 - def host_ping(self, src_host, dst_ip): + def host_ping(self, src_host, dst_ip, intf=None): """Default method to ping from one host to an IP address""" self.one_ipv4_ping( - src_host, dst_ip, require_host_learned=False, retries=5, timeout=1000) + src_host, dst_ip, require_host_learned=False, retries=5, timeout=1000, intf=intf) def set_host_ip(self, host, host_ip): """Default method for setting a hosts IP address""" @@ -105,7 +108,7 @@ def build_net(self, n_dps=1, n_vlans=1, vlan_options=None, dp_options=None, host_options=None, routers=None, stack_roots=None, include=None, include_optional=None, - hw_dpid=None, lacp=False): + hw_dpid=None, lacp_trunk=False): """ Use the TopologyGenerator to generate the YAML configuration and create the network Args: @@ -122,7 +125,7 @@ def build_net(self, n_dps=1, n_vlans=1, include: include_optional: hw_dpid: DPID of hardware switch - lacp: Use LACP trunk ports + lacp_trunk: Use LACP trunk ports """ if include is None: include = [] @@ -160,7 +163,7 @@ def build_net(self, n_dps=1, n_vlans=1, include_optional=include_optional, acls=self.acls(), acl_in_dp=self.acl_in_dp(), - lacp=lacp, + lacp_trunk=lacp_trunk, vlan_options=vlan_options, dp_options=dp_options, routers=routers, @@ -182,6 +185,9 @@ def start_net(self): host routes for routed hosts """ super(FaucetTopoTestBase, self).start_net() + # Create a dictionary of host information that might be used in a test later on. + # This makes it easier to retrieve certain information and consolidates it into one + # location. self.host_information = {} for host_id, host_name in self.topo.hosts_by_id.items(): host_obj = self.net.get(host_name) @@ -191,13 +197,74 @@ def start_net(self): self.host_information[host_id] = { 'host': host_obj, 'ip': ip_interface, - 'vlan': vlan + 'mac': host_obj.MAC(), + 'vlan': vlan, + 'bond': None, + 'ports': {} } - + # Add information of hosts chosen dpid, port map values + # TODO: This redoes logic from get_config() + for i, dpid in enumerate(self.dpids): + index = 1 + for host_id, links in self.host_links.items(): + if i in links: + n_links = links.count(i) + for _ in range(n_links): + port = self.port_maps[dpid]['port_%d' % index] + self.host_information[host_id]['ports'].setdefault(dpid, []) + self.host_information[host_id]['ports'][dpid].append(port) + index += 1 + # Store faucet vip interfaces self.faucet_vips = {} for vlan in range(self.n_vlans): self.faucet_vips[vlan] = ipaddress.ip_interface(self.faucet_vip(vlan)) - + # Setup the linux bonds for LACP connected hosts + self.setup_lacp_bonds() + # Add host routes to hosts for inter vlan routing + self.setup_intervlan_host_routes() + + def setup_lacp_bonds(self): + """Search through host options for lacp hosts and configure accordingly""" + if not self.host_options: + return + bond_index = 1 + for host_id, options in self.host_options.items(): + if 'lacp' in options: + host = self.host_information[host_id]['host'] + # LACP must be configured with host ports down + for dpid, ports in self.host_information[host_id]['ports'].items(): + for port in ports: + self.set_port_down(port, dpid) + orig_ip = host.IP() + lacp_switches = [self.net.switches[i] for i in self.host_links[host_id]] + bond_members = [ + pair[0].name for switch in lacp_switches for pair in host.connectionsTo(switch)] + bond_name = 'bond%u' % (bond_index) + self.host_information[host_id]['bond'] = bond_name + for bond_member in bond_members: + # Deconfigure bond members + self.quiet_commands(host, ( + 'ip link set %s down' % bond_member, + 'ip address flush dev %s' % bond_member)) + # Configure bond interface + self.quiet_commands(host, ( + ('ip link add %s address 0e:00:00:00:00:99 ' + 'type bond mode 802.3ad lacp_rate fast miimon 100 ' + 'xmit_hash_policy layer2+3') % (bond_name), + 'ip add add %s/%s dev %s' % (orig_ip, self.NETPREFIX, bond_name), + 'ip link set %s up' % bond_name)) + # Add bond members + for bond_member in bond_members: + self.quiet_commands(host, ( + 'ip link set dev %s master %s' % (bond_member, bond_name),)) + bond_index += 1 + # Return the ports to UP + for dpid, ports in self.host_information[host_id]['ports'].items(): + for port in ports: + self.set_port_up(port, dpid) + + def setup_intervlan_host_routes(self): + """Configure host routes between hosts that belong on routed VLANs""" if self.routers: for src in self.host_information: src_host = self.host_information[src]['host'] @@ -217,7 +284,7 @@ def start_net(self): def get_config(self, dpids=None, hw_dpid=None, hardware=None, ofchannel_log=None, n_vlans=1, host_links=None, host_vlans=None, stack_roots=None, include=None, include_optional=None, acls=None, acl_in_dp=None, - lacp=False, vlan_options=None, dp_options=None, + lacp_trunk=False, vlan_options=None, dp_options=None, routers=None, host_options=None): """ Args: @@ -232,7 +299,7 @@ def get_config(self, dpids=None, hw_dpid=None, hardware=None, ofchannel_log=None include: include_optional: hw_dpid: DPID of hardware switch - lacp: Use LACP trunk ports + lacp_trunk: Use LACP trunk ports vlan_options (dict): vlan_index to key, value dp options dp_options (dict): dp index to key, value dp options routers (dict): router index to list of vlan index @@ -297,7 +364,7 @@ def add_dp(i, dpid, hw_dpid, ofchannel_log, group_table, if stack_roots and i in stack_roots: dp_config['stack'] = {} - dp_config['stack']['priority'] = stack_roots[i] + dp_config['stack']['priority'] = stack_roots[i] # pytype: disable=unsupported-operands interfaces_config = {} # Generate host links @@ -336,7 +403,7 @@ def add_dp(i, dpid, hw_dpid, ofchannel_log, group_table, else: tagged_vlans = [self.vlan_name(vlan) for vlan in range(n_vlans)] interfaces_config[port].update({'tagged_vlans': tagged_vlans}) - if lacp: + if lacp_trunk: interfaces_config[port].update({ 'lacp': 1, 'lacp_active': True @@ -619,26 +686,25 @@ def validate_with_externals_down_fails(self, dp_name): self.assertTrue(asserted, 'Did not fail as expected for %s' % dp_name) def verify_intervlan_routing(self): - """Verify intervlan routing is possible""" + """Verify intervlan routing but for LAG host use bond interface""" for src in self.host_information: - src_host = self.host_information[src]['host'] - src_vlan = self.host_information[src]['vlan'] for dst in self.host_information: if dst > src: - dst_host = self.host_information[dst]['host'] - dst_vlan = self.host_information[dst]['vlan'] - dst_ip = self.host_information[dst]['ip'] - if self.is_routed_vlans(src_vlan, dst_vlan): - src_faucet_vip = self.faucet_vips[src_vlan] - dst_faucet_vip = self.faucet_vips[dst_vlan] - self.host_ping(src_host, src_faucet_vip.ip) - self.host_ping(dst_host, dst_faucet_vip.ip) - self.host_ping(src_host, dst_ip.ip) - self.assertEqual( - self._ip_neigh( - src_host, src_faucet_vip.ip, self.IPV), self.faucet_mac(src_vlan)) - elif src_vlan == dst_vlan: - self.host_ping(src_host, dst_ip.ip) + self.check_host_connectivity_by_id(src, dst) + + def check_host_connectivity_by_id(self, src_id, dst_id): + """Ping from src to dst with host_id parameters if they should be able to""" + src_host, src_ip, _, src_vlan, src_bond, _ = self.host_information[src_id].values() + dst_host, dst_ip, _, dst_vlan, dst_bond, _ = self.host_information[dst_id].values() + connectivity = src_vlan == dst_vlan or self.is_routed_vlans(src_vlan, dst_vlan) + if self.is_routed_vlans(src_vlan, dst_vlan): + src_vip = self.faucet_vips[src_vlan] + dst_vip = self.faucet_vips[dst_vlan] + self.host_ping(src_host, src_vip.ip, src_bond) # pytype: disable=attribute-error + self.host_ping(dst_host, dst_vip.ip, dst_bond) # pytype: disable=attribute-error + if connectivity: + self.host_ping(src_host, dst_ip.ip, src_bond) # pytype: disable=attribute-error + self.host_ping(dst_host, src_ip.ip, dst_bond) # pytype: disable=attribute-error def is_routed_vlans(self, vlan_a, vlan_b): """Return true if the two vlans share a router""" @@ -664,3 +730,136 @@ def bcast_dst_blocked_helper(self, port, first_host, second_host, success_re, re return True time.sleep(1) return False + + def get_expected_synced_states(self, host_id): + """Return the list of regex string for the expected sync state of a LACP LAG connection""" + synced_state_list = [] + oper_key = self.host_options[host_id]['lacp'] + lacp_ports = [ + port for ports in self.host_information[host_id]['ports'].values() for port in ports] + for port in lacp_ports: + synced_state_txt = r""" +Slave Interface: \S+ +MII Status: up +Speed: \d+ Mbps +Duplex: full +Link Failure Count: \d+ +Permanent HW addr: \S+ +Slave queue ID: 0 +Aggregator ID: \d+ +Actor Churn State: monitoring +Partner Churn State: monitoring +Actor Churned Count: 0 +Partner Churned Count: 0 +details actor lacp pdu: + system priority: 65535 + system mac address: 0e:00:00:00:00:99 + port key: \d+ + port priority: 255 + port number: \d+ + port state: 63 +details partner lacp pdu: + system priority: 65535 + system mac address: 0e:00:00:00:00:01 + oper key: %d + port priority: 255 + port number: %d + port state: 62 +""".strip() % (oper_key, port) + synced_state_list.append(synced_state_txt) + return synced_state_list + + def prom_lacp_up_ports(self, dpid): + """Get the number of up LAG ports according to Prometheus for a dpid""" + lacp_up_ports = 0 + for host_id, options in self.host_options.items(): + # Find LACP hosts + for key in options.keys(): + if key == 'lacp': + # Is LACP host + host_information = self.host_information[host_id] + if dpid in host_information['ports']: + # LACP host has links to dpid + lacp_ports = host_information['ports'][dpid] + for port in lacp_ports: + # Obtain up LACP ports for that dpid + port_labels = self.port_labels(port) + lacp_state = self.scrape_prometheus_var( + 'port_lacp_state', port_labels, default=0, dpid=dpid) + lacp_up_ports += 1 if lacp_state == 3 else 0 + return lacp_up_ports + + def verify_num_lag_up_ports(self, expected_up_ports, dpid): + """Checks to see if Prometheus has the expected number of up LAG ports on the specified DP""" + for _ in range(self.LACP_TIMEOUT*10): + if self.prom_lacp_up_ports(dpid) == expected_up_ports: + return + time.sleep(1) + self.assertEqual(self.prom_lacp_up_ports(dpid), expected_up_ports) + + def require_linux_bond_up(self, host_id): + """Checks to see if the host has properly formed into a bonded state""" + synced_state_list = self.get_expected_synced_states(host_id) + host = self.host_information[host_id]['host'] + bond_name = self.host_information[host_id]['bond'] + for _ in range(self.LACP_TIMEOUT*2): + result = host.cmd('cat /proc/net/bonding/%s|sed "s/[ \t]*$//g"' % bond_name) + result = '\n'.join([line.rstrip() for line in result.splitlines()]) + with open(os.path.join(self.tmpdir, 'bonding-state.txt'), 'w') as state_file: + state_file.write(result) + matched_all = True + for state_txt in synced_state_list: + if not re.search(state_txt, result): + matched_all = False + break + if matched_all: + return + time.sleep(1) + synced_state_txt = r"""""" + for state_txt in synced_state_list: + synced_state_txt += state_txt + "\n\n" + synced_state_txt.strip() + self.assertFalse( + re.search(synced_state_txt, result), + msg='LACP did not synchronize: %s\n\nexpected:\n\n%s' % (result, synced_state_txt)) + + def verify_lag_connectivity(self, host_id): + """Verify LAG connectivity""" + lacp_ports = self.host_information[host_id]['ports'] + # All ports down + for dpid, ports in lacp_ports.items(): + for port in ports: + self.set_port_down(port, dpid) + self.verify_num_lag_up_ports(0, dpid) + # Pick a port to set up + up_dpid = random.choice(list(lacp_ports.keys())) + up_port = random.choice(lacp_ports[up_dpid]) + self.set_port_up(up_port, up_dpid) + self.verify_num_lag_up_ports(1, up_dpid) + # Ensure connectivity with one port + self.verify_lag_host_connectivity() + # Set the other ports to UP + for dpid, ports in lacp_ports.items(): + for port in ports: + self.set_port_up(port, dpid) + self.verify_num_lag_up_ports(len(ports), dpid) + # Ensure connectivity with all ports + self.require_linux_bond_up(host_id) + self.verify_lag_host_connectivity() + # Tear down first port + self.set_port_down(up_port, up_dpid) + self.verify_num_lag_up_ports(len(lacp_ports[up_dpid])-1, up_dpid) + # Ensure connectivity with new ports only + self.verify_lag_host_connectivity() + + def verify_lag_host_connectivity(self): + """Verify LAG hosts can connect to any other host using the interface""" + # Find all LACP hosts + for lacp_id, host_options in self.host_options.items(): + if 'lacp' in host_options: + # Found LACP host + for dst_id in self.host_information: + if lacp_id == dst_id: + continue + # Test connectivity to any other host (might be another LAG host) + self.check_host_connectivity_by_id(lacp_id, dst_id) diff --git a/faucet/dp.py b/faucet/dp.py index 388549abed..4ed344dcd6 100644 --- a/faucet/dp.py +++ b/faucet/dp.py @@ -691,7 +691,7 @@ def lacp_ports(self): def lacp_up_ports(self): """Return ports that have LACP up.""" - return tuple([port for port in self.lacp_ports() if port.dyn_lacp_up]) + return tuple([port for port in self.lacp_ports() if port.is_actor_up()]) def lags(self): """Return dict of LAGs mapped to member ports.""" @@ -741,20 +741,6 @@ def add_port(self, port): if port.lacp and port.lacp_active: self.lacp_active_ports.append(port) - def lacp_forwarding(self, port): - """Return 1 if should signal forwarding on a LACP bundle on this DP.""" - # TODO: just handle stacks with multiple roots - add further useful combinations. - if port.loop_protect_external: - if self.stack_root_name and not self.is_stack_root(): - return 0 - return 1 - - def lacp_collect_and_distribute(self, port): - """Return 1 if LACP should advertise collect and distribute on this port.""" - if port.lacp_collect_and_distribute: - return 1 - return self.lacp_forwarding(port) - def lldp_beacon_send_ports(self, now): """Return list of ports to send LLDP packets; stacked ports always send LLDP.""" send_ports = [] diff --git a/faucet/port.py b/faucet/port.py index a26e4507a8..ef5d58da20 100644 --- a/faucet/port.py +++ b/faucet/port.py @@ -27,10 +27,34 @@ STACK_STATE_GONE = 4 STACK_STATE_NONE = -1 -LACP_STATE_NONE = 0 -LACP_STATE_INIT = 1 -LACP_STATE_UP = 3 -LACP_STATE_NOACT = 5 +# LACP not configured +LACP_STATE_NONE = -1 + +# Initial state, no packets received yet +LACP_ACTOR_INIT = 1 +# LACP connection is up and receiving packets +LACP_ACTOR_UP = 3 +# LACP is down +LACP_ACTOR_NOACT = 5 +LACP_ACTOR_DISPLAY_DICT = { + LACP_STATE_NONE: 'NONE', + LACP_ACTOR_INIT: 'INITIALIZING', + LACP_ACTOR_UP: 'UP', + LACP_ACTOR_NOACT: 'NO_ACTOR' +} + +# Port is not a LACP port on the nominated DP +LACP_PORT_UNSELECTED = 1 +# Port is a LACP port on the nominated DP, will send/receive +LACP_PORT_SELECTED = 2 +# Other cases: receive-only, etc. +LACP_PORT_STANDBY = 3 +LACP_PORT_DISPLAY_DICT = { + LACP_STATE_NONE: 'NONE', + LACP_PORT_UNSELECTED: 'UNSELECTED', + LACP_PORT_SELECTED: 'SELECTED', + LACP_PORT_STANDBY: 'STANDBY' +} class Port(Conf): """Stores state for ports, including the configuration.""" @@ -209,6 +233,8 @@ def __init__(self, _id, dp_id, conf=None): self.dyn_phys_up = False self.dyn_update_time = None self.dyn_stack_current_state = STACK_STATE_NONE + self.dyn_lacp_port_selected = LACP_STATE_NONE + self.dyn_lacp_actor_state = LACP_STATE_NONE self.dyn_stack_probe_info = {} self.tagged_vlans = [] @@ -392,24 +418,128 @@ def lldp_beacon_enabled(self): """Return True if LLDP beacon enabled on this port.""" return self.lldp_beacon and self.lldp_beacon.get('enable', False) - def lacp_update(self, lacp_up, now=None, lacp_pkt=None): - self.dyn_lacp_up = 1 if lacp_up else 0 - self.dyn_lacp_updated_time = now - self.dyn_last_lacp_pkt = lacp_pkt - - def lacp_state(self): - if not self.lacp: - return LACP_STATE_NONE - if not self.dyn_last_lacp_pkt: - return LACP_STATE_INIT - return LACP_STATE_UP if self.dyn_lacp_up else LACP_STATE_NOACT - def mirror_actions(self): """Return OF actions to mirror this port.""" if self.mirror is not None: return [valve_of.output_port(mirror_port) for mirror_port in self.mirror] return [] + def non_stack_forwarding(self): + """Returns True if port is not-stacking and, and able to forward packets.""" + if self.stack: + return False + if not self.dyn_phys_up: + return False + if self.lacp and not self.dyn_lacp_up: + return False + return True + + def lacp_update(self, lacp_up, now=None, lacp_pkt=None): + """ + Update the LACP state + Args: + lacp_up (bool): The intended LACP/port state + now: Current time + lacp_pkt: Received LACP packet + Returns: + dyn_lacp_actor_state, dyn_lacp_current_state + """ + self.dyn_lacp_up = 1 if lacp_up else 0 + self.dyn_lacp_updated_time = now + self.dyn_last_lacp_pkt = lacp_pkt + if not lacp_pkt: + # Intialize states + self.actor_init() + else: + # Packets received from actor + if lacp_up: + # Receiving packets & LACP is UP + self.actor_up() + else: + # Receiving packets but LACP is DOWN + self.actor_noact() + return self.actor_state() + + def get_lacp_flags(self): + """ + Get the LACP flags for the state the port is in + Return sync, collecting, distributing flag values + """ + if self.lacp_collect_and_distribute: + return 1, 1, 1 + if self.is_port_standby(): + return 1, 0, 0 + if self.is_port_selected(): + return 1, 1, 1 + return 0, 0, 0 + + # LACP ACTOR STATES: + def is_actor_up(self): + """Return true if the LACP actor state is UP""" + return self.dyn_lacp_actor_state == LACP_ACTOR_UP + + def is_actor_noact(self): + """Return true if the LACP actor state is NOACT""" + return self.dyn_lacp_actor_state == LACP_ACTOR_NOACT + + def is_actor_init(self): + """Return true if the LACP actor state is INIT""" + return self.dyn_lacp_actor_state == LACP_ACTOR_INIT + + def actor_state(self): + """Return the current LACP actor state""" + return self.dyn_lacp_actor_state + + def actor_init(self): + """Set the LACP actor state to INIT""" + self.dyn_lacp_actor_state = LACP_ACTOR_INIT + + def actor_up(self): + """Set the LACP actor state to UP""" + self.dyn_lacp_actor_state = LACP_ACTOR_UP + + def actor_noact(self): + """Set the LACP actor state to NOACT""" + self.dyn_lacp_actor_state = LACP_ACTOR_NOACT + + def actor_state_name(self, state): + """Return the string of the actor state""" + return LACP_ACTOR_DISPLAY_DICT[state] + + # LACP PORT ROLES: + def is_port_selected(self): + """Return true if the lacp is a SELECTED port""" + return self.dyn_lacp_port_selected == LACP_PORT_SELECTED + + def is_port_unselected(self): + """Return true if the lacp is an UNSELECTED port""" + return self.dyn_lacp_port_selected == LACP_PORT_UNSELECTED + + def is_port_standby(self): + """Return true if the lacp is a STANDBY port""" + return self.dyn_lacp_port_selected == LACP_PORT_STANDBY + + def lacp_port_state(self): + """Return the current LACP port state""" + return self.dyn_lacp_port_selected + + def select_port(self): + """SELECT the current LACP port""" + self.dyn_lacp_port_selected = LACP_PORT_SELECTED + + def deselect_port(self): + """UNSELECT the current LACP port""" + self.dyn_lacp_port_selected = LACP_PORT_UNSELECTED + + def standby_port(self): + """Set the LACP port to STANDBY""" + self.dyn_lacp_port_selected = LACP_PORT_STANDBY + + def port_role_name(self, state): + """Return the LACP port role state name""" + return LACP_PORT_DISPLAY_DICT[state] + + # STACK PORT ROLES: def is_stack_admin_down(self): """Return True if port is in ADMIN_DOWN state.""" return self.dyn_stack_current_state == STACK_STATE_ADMIN_DOWN @@ -453,13 +583,3 @@ def stack_up(self): def stack_gone(self): """Change the current stack state to GONE.""" self.dyn_stack_current_state = STACK_STATE_GONE - - def non_stack_forwarding(self): - """Returns True if port is not-stacking and, and able to forward packets.""" - if self.stack: - return False - if not self.dyn_phys_up: - return False - if self.lacp and not self.dyn_lacp_up: - return False - return True diff --git a/faucet/valve.py b/faucet/valve.py index b66b9c572c..9b3fae6612 100644 --- a/faucet/valve.py +++ b/faucet/valve.py @@ -34,7 +34,7 @@ from faucet import valve_pipeline from faucet.vlan import NullVLAN, OFVLAN -from faucet.port import LACP_STATE_INIT, LACP_STATE_UP, LACP_STATE_NOACT + class ValveLogger: """Logger for a Valve that adds DP ID.""" @@ -491,14 +491,14 @@ def _decode_port_status(reason): if new_port_status: ofmsgs = self.port_add(port_no) else: - ofmsgs = self.port_delete(port_no, keep_cache=True) + ofmsgs = self.port_delete(port_no, keep_cache=True, other_valves=_other_valves) else: self.logger.info('status did not change: %s' % state_description) if new_port_status: if blocked_down_state: self.logger.info( '%s state down or blocked despite status up, setting to status down' % port) - ofmsgs = self.port_delete(port_no, keep_cache=True) + ofmsgs = self.port_delete(port_no, keep_cache=True, other_valves=_other_valves) if not live_state: self.logger.info( '%s state OFPPS_LIVE reset, ignoring in expectation of port down' % port) @@ -558,7 +558,7 @@ def fast_advertise(self, now, _other_valves): ofmsgs = [] for port in self.dp.lacp_active_ports: if port.running(): - ofmsgs.extend(self._lacp_actions(port.dyn_last_lacp_pkt, port)) + ofmsgs.extend(self._lacp_actions(port.dyn_last_lacp_pkt, port, _other_valves)) ports = self.dp.lldp_beacon_send_ports(now) ofmsgs.extend([self._send_lldp_beacon_on_port(port, now) for port in ports]) @@ -877,7 +877,7 @@ def ports_add(self, port_nums, cold_start=False, log_msg='up'): max_len=128)) if port.lacp: - ofmsgs.extend(self.lacp_down(port, cold_start=cold_start)) + ofmsgs.extend(self.lacp_update(port, False, cold_start=cold_start)) if port.lacp_active: ofmsgs.extend(self._lacp_actions(port.dyn_last_lacp_pkt, port)) @@ -918,7 +918,7 @@ def port_add(self, port_num): """ return self.ports_add([port_num]) - def ports_delete(self, port_nums, log_msg='down', keep_cache=False): + def ports_delete(self, port_nums, log_msg='down', keep_cache=False, other_valves=None): """Handle the deletion of ports. Args: @@ -948,7 +948,7 @@ def ports_delete(self, port_nums, log_msg='down', keep_cache=False): self.dp.ports[self.dp.dot1x['nfv_sw_port']] )) if port.lacp: - ofmsgs.extend(self.lacp_down(port)) + ofmsgs.extend(self.lacp_update(port, False, other_valves=other_valves)) else: ofmsgs.extend(self._port_delete_flows_state(port, keep_cache=keep_cache)) @@ -957,25 +957,146 @@ def ports_delete(self, port_nums, log_msg='down', keep_cache=False): return ofmsgs - def port_delete(self, port_num, keep_cache=False): + def port_delete(self, port_num, keep_cache=False, other_valves=None): """Return flow messages that delete port from pipeline.""" - return self.ports_delete([port_num], keep_cache=keep_cache) + return self.ports_delete([port_num], keep_cache=keep_cache, other_valves=other_valves) def _reset_lacp_status(self, port): - lacp_state = port.lacp_state() + lacp_state = port.actor_state() self._set_var('port_lacp_state', lacp_state, labels=self.dp.port_labels(port.number)) self.notify( {'LAG_CHANGE': {'port_no': port.number, 'state': lacp_state}}) - def lacp_down(self, port, cold_start=False, lacp_pkt=None): - """Return OpenFlow messages when LACP is down on a port.""" + def get_lacp_dpid_nomination(self, lacp_id, other_valves): + """ + Chooses the DP for a given LAG + The DP will be nominated by the following conditions in order: + 1) Number of LAG ports + 2) Root DP + 3) Lowest DPID + Args: + lacp_id: The LACP LAG ID + other_valves (list): list of other valves + Returns nominated_dpid, reason + """ + if not other_valves: + return None, '' + stacked_other_valves = self._stacked_valves(other_valves) + all_stacked_valves = {self}.union(stacked_other_valves) + ports = {} + root_dpid = None + for valve in all_stacked_valves: + all_lags = valve.dp.lags_up() + if lacp_id in all_lags: + ports[valve.dp.dp_id] = len(all_lags[lacp_id]) + if valve.dp.is_stack_root(): + root_dpid = valve.dp.dp_id + # Order by number of ports + port_order = sorted(ports, key=ports.get, reverse=True) + if not port_order: + return None, '' + most_ports_dpid = port_order[0] + most_ports_dpids = [dpid for dpid, num in ports.items() if num == ports[most_ports_dpid]] + if len(most_ports_dpids) > 1: + # There are several dpids that have the same number of lags + if root_dpid in most_ports_dpids: + # root_dpid is the chosen DPID + return root_dpid, 'root dp' + # Order by lowest DPID + return sorted(most_ports_dpids), 'lowest dpid' + # Most_ports_dpid is the chosen DPID + return most_ports_dpid, 'most LAG ports' + + def lacp_update_port_selection_state(self, port, other_valves=None): + """ + Update the LACP port selection state + Args: + port (Port): LACP port + other_valves (list): List of other valves + Returns: + bool: True if port state changed + """ + nominated_dpid = self.dp.dp_id + if self.dp.stack: + nominated_dpid, _ = self.get_lacp_dpid_nomination(port.lacp, other_valves) + prev_state = port.lacp_port_state() + port.select_port() if self.dp.dp_id == nominated_dpid else port.deselect_port() + new_state = port.lacp_port_state() + if new_state != prev_state: + self.logger.info('LAG %u %s %s (previous state %s)' % ( + port.lacp, port, port.port_role_name(new_state), + port.port_role_name(prev_state))) + return new_state != prev_state + + def lacp_update_actor_state(self, port, lacp_up, now=None, lacp_pkt=None): + """ + Updates a LAG actor state + Args: + port: LACP port + lacp_up (bool): Whether LACP is going UP or DOWN + now (float): Current epoch time + lacp_pkt (PacketMeta): LACP packet + Returns: + bool: True if LACP state changed + """ + prev_actor_state = port.actor_state() + new_actor_state = port.lacp_update(lacp_up, now=now, lacp_pkt=lacp_pkt) + if prev_actor_state != new_actor_state: + self.logger.info('LAG %u %s actor state %s (previous state %s)' % ( + port.lacp, port, port.actor_state_name(new_actor_state), + port.actor_state_name(prev_actor_state))) + return prev_actor_state != new_actor_state + + def lacp_update(self, port, lacp_up, now=None, + lacp_pkt=None, other_valves=None, cold_start=False): + """ + Update the port's LACP states and enables/disables packets + from the link to be processed further through the pipeline + based on the state changes + Args: + port: The port the packet is being received on + lacp_up (bool): Whether the lacp actor is up + now (float): The current time + lacp_pkt (PacketMeta): The received LACP packet + other_valves (list): List of other valves (in the stack) + cold_state (bool): Whether Faucet is being cold started or not + Returns: + ofmsgs + """ + ofmsgs = [] + updated = self.lacp_update_actor_state(port, lacp_up, now, lacp_pkt) + select_updated = self.lacp_update_port_selection_state(port, other_valves) + if updated or select_updated: + if updated: + self._reset_lacp_status(port) + if port.is_port_selected() and port.is_actor_up(): + ofmsgs = self.lacp_enable_forwarding(port) + else: + ofmsgs = self.lacp_disable_forwarding(port) + return ofmsgs + + def lacp_enable_forwarding(self, port): + """ + Enables packets from the LAG link to progress through the pipeline + also recomputes the flooding & host rules + Should only be called when a LACP port is both ACTOR_UP & SELECTED + Return OpenFlow messages when LACP is UP and SELECTED on a port + """ + vlan_table = self.dp.tables['vlan'] + ofmsgs = [] + ofmsgs.append(vlan_table.flowdel( + match=vlan_table.match(in_port=port.number), + priority=self.dp.high_priority, strict=True)) + ofmsgs.extend(self.add_vlans(port.vlans())) + return ofmsgs + + def lacp_disable_forwarding(self, port, cold_start=False): + """ + Return OpenFlow messages for initial LACP states and + when packets from the link should not be sent through + the pipeline + """ ofmsgs = [] - prev_state = port.lacp_state() - new_state = LACP_STATE_NOACT if lacp_pkt else LACP_STATE_INIT - if prev_state != new_state: - self.logger.info('LAG %u %s state %d (previous state %d)' % ( - port.lacp, port, new_state, prev_state)) - port.lacp_update(False, lacp_pkt=lacp_pkt) if not cold_start: ofmsgs.extend(self.host_manager.del_port(port)) ofmsgs.extend(self.add_vlans(port.vlans())) @@ -990,29 +1111,17 @@ def lacp_down(self, port, cold_start=False, lacp_pkt=None): eth_dst=valve_packet.SLOW_PROTOCOL_MULTICAST), priority=self.dp.highest_priority, max_len=valve_packet.LACP_SIZE)) - self._reset_lacp_status(port) return ofmsgs - def lacp_up(self, port, now, lacp_pkt): - """Return OpenFlow messages when LACP is up on a port.""" - vlan_table = self.dp.tables['vlan'] - ofmsgs = [] - prev_state = port.lacp_state() - if prev_state != LACP_STATE_UP: - self.logger.info('LAG %u %s up (previous state %s)' % ( - port.lacp, port, prev_state)) - port.lacp_update(True, now=now, lacp_pkt=lacp_pkt) - # Only enable learning if this bundle is selected for forwarding. - # E.g. non stack or root of stack. - if self.dp.lacp_forwarding(port): - ofmsgs.append(vlan_table.flowdel( - match=vlan_table.match(in_port=port.number), - priority=self.dp.high_priority, strict=True)) - ofmsgs.extend(self.add_vlans(port.vlans())) - self._reset_lacp_status(port) - return ofmsgs - - def _lacp_actions(self, lacp_pkt, port): + def _lacp_actions(self, lacp_pkt, port, other_valves=None): + """ + Constructs a LACP req-reply packet + Args: + lacp_pkt (PacketMeta): LACP packet received + port: LACP port + other_valves (list): List of other valves + Return list packetout OpenFlow msgs + """ if port.lacp_passthrough: for peer_num in port.lacp_passthrough: lacp_peer = self.dp.ports.get(peer_num, None) @@ -1023,13 +1132,12 @@ def _lacp_actions(self, lacp_pkt, port): actor_state_activity = 0 if port.lacp_active: actor_state_activity = 1 - actor_state_collecting = self.dp.lacp_collect_and_distribute(port) - actor_state_distributing = actor_state_collecting + actor_state_sync, actor_state_col, actor_state_dist = port.get_lacp_flags() if lacp_pkt: pkt = valve_packet.lacp_reqreply( self.dp.faucet_dp_mac, self.dp.faucet_dp_mac, - port.lacp, port.number, 1, actor_state_activity, - actor_state_collecting, actor_state_distributing, + port.lacp, port.number, actor_state_sync, actor_state_activity, + actor_state_col, actor_state_dist, lacp_pkt.actor_system, lacp_pkt.actor_key, lacp_pkt.actor_port, lacp_pkt.actor_system_priority, lacp_pkt.actor_port_priority, lacp_pkt.actor_state_defaulted, @@ -1044,57 +1152,49 @@ def _lacp_actions(self, lacp_pkt, port): pkt = valve_packet.lacp_reqreply( self.dp.faucet_dp_mac, self.dp.faucet_dp_mac, port.lacp, port.number, + actor_state_synchronization=actor_state_sync, actor_state_activity=actor_state_activity, - actor_state_collecting=actor_state_collecting, - actor_state_distributing=actor_state_distributing) + actor_state_collecting=actor_state_col, + actor_state_distributing=actor_state_dist) self.logger.debug('Sending LACP %s on %s activity %s' % (pkt, port, actor_state_activity)) return [valve_of.packetout(port.number, pkt.data)] - def lacp_handler(self, now, pkt_meta): - """Handle a LACP packet. - - We are a currently a passive, non-aggregateable LACP partner. - + def lacp_handler(self, now, pkt_meta, other_valves): + """ + Handle receiving an LACP packet Args: - now (float): current epoch time. - pkt_meta (PacketMeta): packet for control plane. - Returns: - dict: OpenFlow messages, if any by Valve. + now (float): current epoch time + pkt_meta (PacketMeta): packet for control plane + Returns + dict: OpenFlow messages, if any by Valve """ - # TODO: ensure config consistent between LAG ports. ofmsgs_by_valve = defaultdict(list) if (pkt_meta.eth_dst == valve_packet.SLOW_PROTOCOL_MULTICAST and pkt_meta.eth_type == valve_of.ether.ETH_TYPE_SLOW and pkt_meta.port.lacp): + # LACP packet so reparse pkt_meta.data = pkt_meta.data[:valve_packet.LACP_SIZE] pkt_meta.reparse_all() lacp_pkt = valve_packet.parse_lacp_pkt(pkt_meta.pkt) if lacp_pkt: self.logger.debug('receive LACP %s on %s' % (lacp_pkt, pkt_meta.port)) + # Respond to new LACP packet or if we haven't sent anything in a while age = None if pkt_meta.port.dyn_lacp_last_resp_time: age = now - pkt_meta.port.dyn_lacp_last_resp_time - actor_up = valve_packet.lacp_actor_up(lacp_pkt) - prev_state = pkt_meta.port.lacp_state() - new_state = LACP_STATE_UP if actor_up else LACP_STATE_NOACT lacp_pkt_change = ( pkt_meta.port.dyn_last_lacp_pkt is None or str(lacp_pkt) != str(pkt_meta.port.dyn_last_lacp_pkt)) lacp_resp_interval = pkt_meta.port.lacp_resp_interval if lacp_pkt_change or (age is not None and age > lacp_resp_interval): - ofmsgs_by_valve[self].extend(self._lacp_actions(lacp_pkt, pkt_meta.port)) + ofmsgs_by_valve[self].extend( + self._lacp_actions(lacp_pkt, pkt_meta.port, other_valves)) pkt_meta.port.dyn_lacp_last_resp_time = now - if prev_state != new_state: - self.logger.info( - 'remote LACP state change from %s to %s from %s LAG %u (%s)' % ( - prev_state, new_state, lacp_pkt.actor_system, pkt_meta.port.lacp, - pkt_meta.log())) - if actor_up: - ofmsgs_by_valve[self].extend(self.lacp_up(pkt_meta.port, now, lacp_pkt)) - else: - ofmsgs_by_valve[self].extend(self.lacp_down(pkt_meta.port, lacp_pkt=lacp_pkt)) - else: - pkt_meta.port.lacp_update(actor_up, now=now, lacp_pkt=lacp_pkt) + # Update the LACP information + actor_up = lacp_pkt.actor_state_synchronization + ofmsgs_by_valve[self].extend(self.lacp_update( + pkt_meta.port, actor_up, now=now, lacp_pkt=lacp_pkt, other_valves=other_valves)) + # Determine if LACP ports with the same ID have met different actor systems other_lag_ports = [ port for port in self.dp.ports.values() if port.lacp == pkt_meta.port.lacp and port.dyn_last_lacp_pkt] @@ -1106,10 +1206,6 @@ def lacp_handler(self, now, pkt_meta): 'LACP actor system mismatch %s: %s, %s %s' % ( pkt_meta.port, actor_system, other_lag_port, other_actor_system)) - updated_state = pkt_meta.port.lacp_state() - assert updated_state is new_state, ( - 'Updated state %d not as expected new state %d' % (updated_state, new_state)) - return ofmsgs_by_valve def _verify_stack_lldp(self, port, now, other_valves, @@ -1450,7 +1546,7 @@ def _update_port(vlan, port): def _non_vlan_rcv_packet(self, now, other_valves, pkt_meta): self._inc_var('of_non_vlan_packet_ins') if pkt_meta.port.lacp: - lacp_ofmsgs_by_valve = self.lacp_handler(now, pkt_meta) + lacp_ofmsgs_by_valve = self.lacp_handler(now, pkt_meta, other_valves) if lacp_ofmsgs_by_valve: return lacp_ofmsgs_by_valve # TODO: verify LLDP message (e.g. org-specific authenticator TLV) @@ -1591,7 +1687,8 @@ def _lacp_state_expire(self, now, _other_valves): lacp_age = now - port.dyn_lacp_updated_time if lacp_age > self.dp.lacp_timeout: self.logger.info('LAG %s %s expired (age %u)' % (lag, port, lacp_age)) - ofmsgs_by_valve[self].extend(self.lacp_down(port)) + ofmsgs_by_valve[self].extend(self.lacp_update( + port, False, now=now, other_valves=_other_valves)) return ofmsgs_by_valve def state_expire(self, now, other_valves): diff --git a/faucet/valve_flood.py b/faucet/valve_flood.py index 340db911ad..0944c7a35c 100644 --- a/faucet/valve_flood.py +++ b/faucet/valve_flood.py @@ -99,7 +99,7 @@ def _build_flood_local_rule_actions(self, vlan, exclude_unicast, in_port, # pyl exclude_all_external, exclude_restricted_bcast_arpnd): """Return a list of flood actions to flood packets from a port.""" external_ports = self.canonical_port_order(vlan.loop_protect_external_ports_up()) - exclude_ports = vlan.exclude_same_lag_member_ports(in_port) + exclude_ports = vlan.excluded_lag_ports(in_port) exclude_ports.update(vlan.exclude_native_if_dot1x()) if exclude_all_external or (in_port is not None and in_port.loop_protect_external): exclude_ports.update(set(external_ports)) diff --git a/faucet/vlan.py b/faucet/vlan.py index 2f1d3c4fcc..1a15af67fe 100644 --- a/faucet/vlan.py +++ b/faucet/vlan.py @@ -470,9 +470,10 @@ def lacp_ports(self): """Return ports that have LACP on this VLAN.""" return tuple([port for port in self.get_ports() if port.lacp]) - def lacp_up_ports(self): - """Return ports that have LACP up on this VLAN.""" - return tuple([port for port in self.lacp_ports() if port.dyn_lacp_up]) + def lacp_up_selected_ports(self): + """Return LACP ports that have been SELECTED and are UP""" + return tuple([ + port for port in self.lacp_ports() if port.is_port_selected() and port.is_actor_up()]) def lags(self): """Return dict of LAGs mapped to member ports.""" @@ -481,30 +482,29 @@ def lags(self): lags[port.lacp].append(port) return lags - def lags_up(self): - """Return dict of LAGs mapped to member ports that have LACP up.""" + def selected_up_lags(self): + """Return dict of LAGs mapped to member ports that have been selected""" lags = collections.defaultdict(list) - for port in self.lacp_up_ports(): + for port in self.lacp_up_selected_ports(): lags[port.lacp].append(port) return lags - def exclude_same_lag_member_ports(self, in_port=None): - """Ensure output on only one member of a LAG.""" + def excluded_lag_ports(self, in_port=None): + """Ensure output to SELECTED LAG ports & only one LAG member""" exclude_ports = set() lags = self.lags() if lags: - lags_up = self.lags_up() + # Need lags that have actor UP & are SELECTED + selected_ports = self.selected_up_lags() if in_port is not None and in_port.lacp: - # Don't flood from one LACP bundle member, to another. + # Don't flood to same LAG exclude_ports.update(lags[in_port.lacp]) - # Pick one up bundle member to flood to. + # Pick a bundle member to flood to for lag, ports in lags.items(): - ports_up = lags_up[lag] - if ports_up: - ports.remove(ports_up[0]) - exclude_ports.update(ports) - else: - exclude_ports.update(ports) + selected_lag = selected_ports[lag] + if selected_lag: + ports.remove(selected_lag[0]) + exclude_ports.update(ports) return exclude_ports def exclude_native_if_dot1x(self): @@ -565,7 +565,7 @@ def flood_pkt(self, packet_builder, multi_out=True, *args): (None, self.untagged_flood_ports(False))): if ports: pkt = packet_builder(vid, *args) - exclude_ports = self.exclude_same_lag_member_ports() + exclude_ports = self.excluded_lag_ports() running_port_nos = [ port.number for port in ports if port.running() and port not in exclude_ports] if running_port_nos: diff --git a/tests/integration/mininet_multidp_tests.py b/tests/integration/mininet_multidp_tests.py index 610b0c12ab..7cd7da2d65 100644 --- a/tests/integration/mininet_multidp_tests.py +++ b/tests/integration/mininet_multidp_tests.py @@ -3,6 +3,8 @@ import os import networkx +from mininet.log import error + from clib.mininet_test_base import IPV4_ETH, IPV6_ETH from clib.mininet_test_topo_generator import FaucetTopoGenerator from clib.mininet_test_base_topo import FaucetTopoTestBase @@ -17,7 +19,7 @@ def setUp(self): def set_up(self, stack=False, n_dps=1, n_tagged=0, n_untagged=0, include=None, include_optional=None, switch_to_switch_links=1, hw_dpid=None, stack_ring=False, - lacp=False, use_external=False, + lacp_trunk=False, use_external=False, vlan_options=None, dp_options=None, routers=None): """Set up a network with the given parameters""" super(FaucetMultiDPTest, self).setUp() @@ -47,7 +49,7 @@ def set_up(self, stack=False, n_dps=1, n_tagged=0, n_untagged=0, stack_roots=stack_roots, vlan_options=vlan_options, dp_options=dp_options, routers=routers, include=include, include_optional=include_optional, hw_dpid=hw_dpid, - lacp=lacp, host_options=host_options) + lacp_trunk=lacp_trunk, host_options=host_options) self.start_net() @@ -120,7 +122,7 @@ def setUp(self): # pylint: disable=invalid-name n_untagged=self.NUM_HOSTS, switch_to_switch_links=2, hw_dpid=self.hw_dpid, - lacp=True) + lacp_trunk=True) def lacp_ports(self): """Return LACP ports""" @@ -139,19 +141,22 @@ def wait_for_lacp_state(self, port_no, wanted_state, dpid, dp_name, timeout=30): self.fail('wanted LACP state for %s to be %u' % (labels, wanted_state)) def wait_for_lacp_port_init(self, port_no, dpid, dp_name): + """Wait for LACP state INIT""" self.wait_for_lacp_state(port_no, 1, dpid, dp_name) def wait_for_lacp_port_up(self, port_no, dpid, dp_name): - + """Wait for LACP state UP""" self.wait_for_lacp_state(port_no, 3, dpid, dp_name) def wait_for_lacp_port_noact(self, port_no, dpid, dp_name): + """Wait for LACP state NOACT""" self.wait_for_lacp_state(port_no, 5, dpid, dp_name) # We sort non_host_links by port because FAUCET sorts its ports # and only floods out of the first active LACP port in that list def wait_for_all_lacp_up(self): + """Wait for all LACP ports to be up""" (first_lacp_port, second_lacp_port, remote_first_lacp_port, _) = self.lacp_ports() self.wait_for_lacp_port_up(first_lacp_port, self.dpid, self.DP_NAME) self.wait_for_lacp_port_up(second_lacp_port, self.dpid, self.DP_NAME) @@ -687,6 +692,7 @@ class FaucetTunnelSameDpTest(FaucetMultiDPTest): SWITCH_TO_SWITCH_LINKS = 2 def acls(self): + """Return ACL config""" return { 1: [ {'rule': { @@ -706,8 +712,8 @@ def acls(self): ] } - # DP-to-acl_in port mapping. def acl_in_dp(self): + """DP to acl port mapping""" port_1 = self.port_map['port_1'] return { 0: { @@ -734,6 +740,7 @@ class FaucetTunnelTest(FaucetMultiDPTest): SWITCH_TO_SWITCH_LINKS = 2 def acls(self): + """Return config ACL options""" dpid2 = self.dpids[1] port2_1 = self.port_maps[dpid2]['port_1'] return { @@ -755,8 +762,8 @@ def acls(self): ] } - # DP-to-acl_in port mapping. def acl_in_dp(self): + """DP-to-acl port mapping""" port_1 = self.port_map['port_1'] return { 0: { @@ -766,6 +773,7 @@ def acl_in_dp(self): } def setUp(self): # pylint: disable=invalid-name + """Start the network""" super(FaucetTunnelTest, self).set_up( stack=True, n_dps=self.NUM_DPS, @@ -800,9 +808,16 @@ class FaucetSingleUntaggedIPV4RoutingWithStackingTest(FaucetTopoTestBase): SOFTWARE_ONLY = True def setUp(self): + """Disabling allows for each test case to start the test""" pass def set_up(self, n_dps, host_links=None, host_vlans=None): + """ + Args: + n_dps: Number of DPs + host_links: How to connect each host to the DPs + host_vlans: The VLAN each host is on + """ super(FaucetSingleUntaggedIPV4RoutingWithStackingTest, self).setUp() n_vlans = 3 routed_vlans = 2 @@ -828,6 +843,7 @@ def set_up(self, n_dps, host_links=None, host_vlans=None): @staticmethod def get_dp_options(): + """Return DP config options""" return { 'drop_spoofed_faucet_mac': False, 'arp_neighbor_timeout': 2, @@ -883,6 +899,7 @@ class FaucetSingleUntaggedIPV6RoutingWithStackingTest(FaucetSingleUntaggedIPV4Ro ETH_TYPE = IPV6_ETH def get_dp_options(self): + """Return DP config options""" return { 'drop_spoofed_faucet_mac': False, 'nd_neighbor_timeout': 2, @@ -890,10 +907,12 @@ def get_dp_options(self): 'proactive_learn_v6': True } - def host_ping(self, src_host, dst_ip): + def host_ping(self, src_host, dst_ip, intf=None): + """Override to ping ipv6 addresses""" self.one_ipv6_ping(src_host, dst_ip, require_host_learned=False) def set_host_ip(self, host, host_ip): + """Override to setup host ipv6 ip address""" self.add_host_ipv6_address(host, host_ip) def faucet_vip(self, i): @@ -917,9 +936,11 @@ class FaucetSingleUntaggedVlanStackFloodTest(FaucetTopoTestBase): SOFTWARE_ONLY = True def setUp(self): + """Disabling allows for each test case to start the test""" pass def set_up(self): + """Start the network""" super(FaucetSingleUntaggedVlanStackFloodTest, self).setUp() stack_roots = {0: 1} dp_links = FaucetTopoGenerator.dp_links_networkx_graph(networkx.path_graph(self.NUM_DPS)) @@ -943,6 +964,7 @@ def set_up(self): @staticmethod def get_dp_options(): + """Return DP config options""" return { 'drop_spoofed_faucet_mac': False, 'arp_neighbor_timeout': 2, @@ -1016,3 +1038,252 @@ def test_hosts_connect_over_stack_transit(self): """Test to ensure that hosts can be connected over stack transit switches""" self.verify_stack_up() self.verify_intervlan_routing() + + +class FaucetSingleLAGTest(FaucetTopoTestBase): + """Test LACP LAG on Faucet stack topologies with a distributed LAG bundle""" + + NUM_DPS = 2 + NUM_HOSTS = 5 + NUM_VLANS = 2 + SOFTWARE_ONLY = True + + LACP_HOST = 2 + + @staticmethod + def get_dp_options(): + """Return DP config options""" + return { + 'drop_spoofed_faucet_mac': False, + 'arp_neighbor_timeout': 2, + 'max_resolve_backoff_time': 2, + 'proactive_learn_v4': True, + 'lacp_timeout': 10 + } + + def setUp(self): + """Disabling allows for each test case to start the test""" + pass + + def set_up(self, lacp_host_links, host_vlans=None): + """ + Args: + lacp_host_links: List of dpid indices the LACP host will be connected to + host_vlans: Default generate with one host on each VLAN, on each DP + plus one LAG host the same VLAN as hosts + """ + super(FaucetSingleLAGTest, self).setUp() + stack_roots = {0: 1} + dp_links = FaucetTopoGenerator.dp_links_networkx_graph(networkx.path_graph(self.NUM_DPS)) + host_links = {0: [0], 1: [0], self.LACP_HOST: lacp_host_links, 3: [1], 4: [1]} + if host_vlans is None: + host_vlans = {0: 0, 1: 1, 2: 1, 3: 0, 4: 1} + vlan_options = {} + for v in range(self.NUM_VLANS): + vlan_options[v] = { + 'faucet_mac': self.faucet_mac(v), + 'faucet_vips': [self.faucet_vip(v)], + 'targeted_gw_resolution': False + } + dp_options = {dp: self.get_dp_options() for dp in range(self.NUM_DPS)} + routers = {0: [v for v in range(self.NUM_VLANS)]} + host_options = {self.LACP_HOST: {'lacp': 1}} + self.build_net( + n_dps=self.NUM_DPS, n_vlans=self.NUM_VLANS, dp_links=dp_links, + host_links=host_links, host_vlans=host_vlans, + stack_roots=stack_roots, vlan_options=vlan_options, + dp_options=dp_options, host_options=host_options, routers=routers) + self.start_net() + + def test_lacp_lag(self): + """Test LACP LAG, where LAG bundle is connected to the same DP""" + lacp_host_links = [0, 0] + self.set_up(lacp_host_links) + self.verify_stack_up() + self.verify_lag_connectivity(self.LACP_HOST) + + def test_mclag_vip_connectivity(self): + """Test LACP MCLAG, where LAG bundle is connected to different DPs""" + lacp_host_links = [0, 1] + self.set_up(lacp_host_links) + self.verify_stack_up() + self.verify_lag_connectivity(self.LACP_HOST) + + +class FaucetSingleLAGOnUniqueVLANTest(FaucetSingleLAGTest): + """Test LACP LAG on Faucet stack topologies with a distributed LAG bundle on a unique VLAN""" + + NUM_VLANS = 3 + + def set_up(self, lacp_host_links, host_vlans=None): + """ + Generate tests but with the LAG host on a different VLAN + Args: + lacp_host_links: List of dpid indices the LACP host will be connected to + """ + host_vlans = {0: 0, 1: 1, self.LACP_HOST: 2, 3: 0, 4: 1} + super(FaucetSingleLAGOnUniqueVLANTest, self).set_up(lacp_host_links, host_vlans) + + +class FaucetSingleMCLAGComplexTest(FaucetTopoTestBase): + """Line topology on 3 nodes, MCLAG host with 2 connections to 2 different switches""" + + NUM_DPS = 3 + NUM_HOSTS = 4 + NUM_VLANS = 1 + SOFTWARE_ONLY = True + + LACP_HOST = 3 + + @staticmethod + def get_dp_options(): + return { + 'drop_spoofed_faucet_mac': False, + 'arp_neighbor_timeout': 2, + 'max_resolve_backoff_time': 2, + 'proactive_learn_v4': True, + 'lacp_timeout': 10 + } + + def setUp(self): + pass + + def set_up(self): + super(FaucetSingleMCLAGComplexTest, self).setUp() + stack_roots = {0: 1} + dp_links = FaucetTopoGenerator.dp_links_networkx_graph(networkx.path_graph(self.NUM_DPS)) + # LACP host doubly connected to sw0 & sw1 + host_links = {0: [0], 1: [1], 2: [2], 3: [0, 0, 2, 2]} + host_vlans = {host_id: 0 for host_id in range(self.NUM_HOSTS)} + dp_options = {dp: self.get_dp_options() for dp in range(self.NUM_DPS)} + host_options = {self.LACP_HOST: {'lacp': 1}} + self.build_net( + n_dps=self.NUM_DPS, n_vlans=self.NUM_VLANS, dp_links=dp_links, + host_links=host_links, host_vlans=host_vlans, stack_roots=stack_roots, + dp_options=dp_options, host_options=host_options) + self.start_net() + + def test_lag_connectivity(self): + """Test whether the LAG host can connect to any other host""" + self.set_up() + self.verify_stack_up() + self.require_linux_bond_up(self.LACP_HOST) + self.verify_lag_host_connectivity() + + def test_all_lacp_links(self): + """ + All of the LAG links should work, test by using the xmit_hash_policy + with different IP addresses to change the link used by the packet + """ + self.set_up() + self.verify_stack_up() + self.require_linux_bond_up(self.LACP_HOST) + lacp_host = self.host_information[self.LACP_HOST]['host'] + lacp_switches = {self.net.switches[i] for i in self.host_links[self.LACP_HOST]} + lacp_intfs = sorted({ + pair[0].name for switch in lacp_switches for pair in lacp_host.connectionsTo(switch)}) + dst_host_id = 1 + dst_host = self.host_information[dst_host_id]['host'] + tcpdump_filter = ( + 'ip and ether src 0e:00:00:00:00:99 ' + 'and src net %s and dst net %s' % (lacp_host.IP(), dst_host.IP())) + # Loop until all links have been used to prove that they can be used + link_used = [False for _ in range(len(lacp_intfs))] + max_iter = len(lacp_intfs) * 2 + iterations = 0 + while link_used.count(False) > 2 and iterations <= max_iter: + no_packets = True + for i, intf in enumerate(lacp_intfs): + funcs = [] + funcs.append(lambda: lacp_host.cmd('ping -c5 %s' % dst_host.IP())) + tcpdump_txt = self.tcpdump_helper( + lacp_host, tcpdump_filter, intf_name=intf, funcs=funcs) + no_packets = self.tcpdump_rx_packets(tcpdump_txt, packets=0) + if not no_packets: + # Packets detected on link so can stop testing and + # goto a new IP value for the remaining links + link_used[i] = True + error('%s via %s\n' % (dst_host.IP(), intf)) + break + # If no packets have been detected on any port then something + # has gone terribly wrong + self.assertFalse( + no_packets, 'Ping packets to host IP %s could not be found' % dst_host.IP()) + # Increment the host IP address to change the LACP hash value, + # potentially changing the link used + self.increment_host_ip(dst_host_id) + tcpdump_filter = ( + 'ip and ether src 0e:00:00:00:00:99 ' + 'and src net %s and dst net %s' % (lacp_host.IP(), dst_host.IP())) + iterations += 1 + not_used = [list(lacp_intfs)[i] for i, value in enumerate(link_used) if not value] + expected_links = [True, True, False, False] + self.assertEqual(link_used, expected_links, 'Links %s not used' % not_used) + + def increment_host_ip(self, host_id): + """Increases the host ip address""" + host = self.host_information[host_id]['host'] + self.host_information[host_id]['ip'] += 3 + self.set_host_ip(host, self.host_information[host_id]['ip']) + + def test_lacp_port_change(self): + """ + Test that communication to a host on a LAG is possible + after the original selected link goes DOWN + """ + self.set_up() + self.verify_stack_up() + self.require_linux_bond_up(self.LACP_HOST) + self.verify_lag_host_connectivity() + root_dpid = self.dpids[0] + lacp_ports = self.host_information[self.LACP_HOST]['ports'] + for port in lacp_ports[root_dpid]: + self.set_port_down(port, root_dpid) + self.verify_num_lag_up_ports(0, root_dpid) + self.verify_lag_host_connectivity() + + def test_broadcast_loop(self): + """ + LACP packets should be hashed using xmit_hash_policy layer2+3 + This means that IP & MAC & Packet type is used for hashing/choosing + the LAG link + When LAG host sends broadcast, the packet should only be visible on + one link (the sending link), if the broadcast packet is detected + on the other links, then the packet was returned to it (via the + Faucet network) + """ + self.set_up() + self.verify_stack_up() + self.require_linux_bond_up(self.LACP_HOST) + lacp_host = self.host_information[self.LACP_HOST]['host'] + lacp_switches = {self.net.switches[i] for i in self.host_links[self.LACP_HOST]} + lacp_intfs = { + pair[0].name for switch in lacp_switches for pair in lacp_host.connectionsTo(switch)} + dst_host = self.host_information[1]['host'] + # Detect initial broadcast ARP + tcpdump_filter = ('arp and ether src 0e:00:00:00:00:99 ' + 'and ether dst ff:ff:ff:ff:ff:ff') + # Count the number of links that contained the broadcast ARP packet + except_count = 0 + for intf in lacp_intfs: + funcs = [] + # Delete all ARP records of the lacp host + for host_id in self.host_information: + host = self.host_information[host_id]['host'] + funcs.append(lambda: host.cmd('arp -d %s' % lacp_host.IP())) + funcs.append(lambda: host.cmd('arp -d %s' % dst_host.IP())) + funcs.append(lambda: lacp_host.cmd('arp -d %s' % host.IP())) + # Ping to cause broadcast ARP request + funcs.append(lambda: lacp_host.cmd('ping -c5 %s' % dst_host.IP())) + # Start tcpdump looking for broadcast ARP packets + tcpdump_txt = self.tcpdump_helper( + lacp_host, tcpdump_filter, intf_name=intf, funcs=funcs) + try: + self.verify_no_packets(tcpdump_txt) + except AssertionError: + error('Broadcast detected on %s\n' % intf) + except_count += 1 + # Only the source LACP link should detect the packet + self.assertEqual( + except_count, 1, + 'Number of links detecting the broadcast ARP %s (!= 1)' % except_count) diff --git a/tests/integration/mininet_tests.py b/tests/integration/mininet_tests.py index 16abad5cb6..7ccd073410 100644 --- a/tests/integration/mininet_tests.py +++ b/tests/integration/mininet_tests.py @@ -4018,19 +4018,19 @@ def require_linux_bond_up(): require_lag_up_ports(1) # We have connectivity with only one port. self.one_ipv4_ping( - first_host, self.FAUCET_VIPV4.ip, require_host_learned=False, intf=bond) + first_host, self.FAUCET_VIPV4.ip, require_host_learned=False, intf=bond, retries=5) for port in lag_ports: self.set_port_up(self.port_map['port_%u' % port]) # We have connectivity with two ports. require_lag_up_ports(2) require_linux_bond_up() self.one_ipv4_ping( - first_host, self.FAUCET_VIPV4.ip, require_host_learned=False, intf=bond) + first_host, self.FAUCET_VIPV4.ip, require_host_learned=False, intf=bond, retries=5) # We have connectivity if that random port goes down. self.set_port_down(self.port_map['port_%u' % up_port]) require_lag_up_ports(1) self.one_ipv4_ping( - first_host, self.FAUCET_VIPV4.ip, require_host_learned=False, intf=bond) + first_host, self.FAUCET_VIPV4.ip, require_host_learned=False, intf=bond, retries=5) for port in lag_ports: self.set_port_up(self.port_map['port_%u' % port]) diff --git a/tests/unit/faucet/test_port.py b/tests/unit/faucet/test_port.py index d0b92d0e81..5b1505847f 100755 --- a/tests/unit/faucet/test_port.py +++ b/tests/unit/faucet/test_port.py @@ -3,15 +3,19 @@ import unittest from faucet.port import Port +from faucet.port import ( + LACP_STATE_NONE, LACP_ACTOR_INIT, LACP_ACTOR_UP, LACP_ACTOR_NOACT, + LACP_PORT_UNSELECTED, LACP_PORT_SELECTED, LACP_PORT_STANDBY) -class MockVLAN(object): # pylint: disable=too-few-public-methods + +class MockVLAN(object): # pylint: disable=too-few-public-methods """Mock class for VLAN so we can inject into Port""" def __init__(self, name): self.name = name -class FaucetPortMethodTest(unittest.TestCase): # pytype: disable=module-attr +class FaucetPortMethodTest(unittest.TestCase): # pytype: disable=module-attr """Test a range of methods on Port""" def test_vlans(self): @@ -33,5 +37,44 @@ def test_vlans(self): self.assertEqual(set(port.vlans()), set(tagged_vlans)) +class FaucetLACPPortFunctions(unittest.TestCase): # pytype: disable=module-attr + """Test port LACP state functions work as expected""" + + def test_lacp_update(self): + """Test updating port LACP information causes correct actor state changes""" + port = Port(1, 1, {}) + # Initial state: Not configured + self.assertEqual(port.dyn_lacp_port_selected, LACP_STATE_NONE) + self.assertEqual(port.dyn_lacp_actor_state, LACP_STATE_NONE) + # Initializing + port.lacp_update(True, None, None) + self.assertEqual(port.dyn_lacp_actor_state, LACP_ACTOR_INIT) + # Receiving first packets but no sync + port.lacp_update(False, 1, 1) + self.assertEqual(port.dyn_lacp_actor_state, LACP_ACTOR_NOACT) + # Receiving sync packets + port.lacp_update(True, 1, 1) + self.assertEqual(port.dyn_lacp_actor_state, LACP_ACTOR_UP) + + def test_lacp_flags(self): + """Test port LACP flags returns correct flags for current port states""" + port = Port(1, 1, {}) + # LACP config option to force flags on + port.lacp_collect_and_distribute = True + self.assertEqual(port.get_lacp_flags(), (1, 1, 1)) + port.lacp_collect_and_distribute = False + # Port is selected, so flags should be on + port.dyn_lacp_port_selected = LACP_PORT_SELECTED + self.assertEqual(port.get_lacp_flags(), (1, 1, 1)) + # Port in standby, only allow sync + port.dyn_lacp_port_selected = LACP_PORT_STANDBY + self.assertEqual(port.get_lacp_flags(), (1, 0, 0)) + # Port not in standby, or selected + port.dyn_lacp_port_selected = LACP_PORT_UNSELECTED + self.assertEqual(port.get_lacp_flags(), (0, 0, 0)) + port.dyn_lacp_port_selected = LACP_STATE_NONE + self.assertEqual(port.get_lacp_flags(), (0, 0, 0)) + + if __name__ == "__main__": unittest.main() # pytype: disable=module-attr diff --git a/tests/unit/faucet/test_valve_stack.py b/tests/unit/faucet/test_valve_stack.py index a121edf127..35901d0842 100755 --- a/tests/unit/faucet/test_valve_stack.py +++ b/tests/unit/faucet/test_valve_stack.py @@ -26,7 +26,9 @@ from faucet import valves_manager from faucet import valve_of -from faucet.port import STACK_STATE_INIT, STACK_STATE_UP +from faucet.port import ( + STACK_STATE_INIT, STACK_STATE_UP, + LACP_PORT_SELECTED, LACP_PORT_UNSELECTED) from fakeoftable import CONTROLLER_PORT @@ -34,6 +36,198 @@ BASE_DP1_CONFIG, CONFIG, STACK_CONFIG, STACK_LOOP_CONFIG, ValveTestBases) +class ValveStackMCLAGTestCase(ValveTestBases.ValveTestSmall): + """Test stacked MCLAG""" + + CONFIG = """ +dps: + s1: +%s + stack: + priority: 1 + interfaces: + 1: + description: p1 + stack: + dp: s2 + port: 1 + 2: + description: p2 + native_vlan: 100 + 3: + description: p3 + native_vlan: 100 + lacp: 1 + 4: + description: p4 + native_vlan: 100 + lacp: 1 + s2: + hardware: 'GenericTFM' + dp_id: 0x2 + interfaces: + 1: + description: p1 + stack: + dp: s1 + port: 1 + 2: + description: p2 + native_vlan: 100 + 3: + description: p3 + native_vlan: 100 + lacp: 1 + 4: + description: p4 + native_vlan: 100 + lacp: 1 +""" % BASE_DP1_CONFIG + + def setUp(self): + """Setup basic loop config""" + self.setup_valve(self.CONFIG) + + def test_dpid_nominations(self): + """Test dpids are nominated correctly""" + self.activate_all_ports() + lacp_ports = {} + for valve in self.valves_manager.valves.values(): + for port in valve.dp.ports.values(): + if port.lacp: + lacp_ports.setdefault(valve.dp.dp_id, []) + lacp_ports[valve.dp.dp_id].append(port) + port.actor_up() + valve = self.valves_manager.valves[0x1] + other_valves = self.valves_manager._other_running_valves(valve) + # Equal number of LAG ports, choose root DP + nominated_dpid = valve.get_lacp_dpid_nomination(1, other_valves)[0] + self.assertEqual( + nominated_dpid, 0x1, + 'Expected nominated DPID %s but found %s' % (0x1, nominated_dpid)) + # Choose DP with most UP LAG ports + lacp_ports[0x1][0].actor_noact() + nominated_dpid = valve.get_lacp_dpid_nomination(1, other_valves)[0] + self.assertEqual( + nominated_dpid, 0x2, + 'Expected nominated DPID %s but found %s' % (0x2, nominated_dpid)) + + def test_no_dpid_nominations(self): + """Test dpid nomination doesn't nominate when no LACP ports are up""" + self.activate_all_ports() + valve = self.valves_manager.valves[0x1] + other_valves = self.valves_manager._other_running_valves(valve) + # No actors UP so should return None + nominated_dpid = valve.get_lacp_dpid_nomination(1, other_valves)[0] + self.assertEqual( + nominated_dpid, None, + 'Did not expect to nominate DPID %s' % nominated_dpid) + # No other valves so should return None + for valve in self.valves_manager.valves.values(): + for port in valve.dp.ports.values(): + if port.lacp: + port.actor_up() + nominated_dpid = valve.get_lacp_dpid_nomination(1, None)[0] + self.assertEqual( + nominated_dpid, None, + 'Did not expect to nominate DPID %s' % nominated_dpid) + + def test_nominated_dpid_port_selection(self): + """Test a nominated port selection state is changed""" + self.activate_all_ports() + lacp_ports = {} + for valve in self.valves_manager.valves.values(): + for port in valve.dp.ports.values(): + if port.lacp: + lacp_ports.setdefault(valve, []) + lacp_ports[valve].append(port) + port.actor_up() + for valve, ports in lacp_ports.items(): + other_valves = self.valves_manager._other_running_valves(valve) + for port in ports: + self.assertTrue( + valve.lacp_update_port_selection_state(port, other_valves), + 'Port selection state not updated') + if valve.dp.dp_id == 0x1: + self.assertEqual( + port.lacp_port_state(), LACP_PORT_SELECTED, + 'Expected LACP port %s DP %s to be SELECTED' % (port, valve)) + else: + self.assertEqual( + port.lacp_port_state(), LACP_PORT_UNSELECTED, + 'Expected LACP port %s DP %s to be UNSELECTED' % (port, valve)) + + def test_lag_flood(self): + """Test flooding is allowed for UP & SELECTED LAG links only""" + self.activate_all_ports() + main_valve = self.valves_manager.valves[0x1] + main_other_valves = self.valves_manager._other_running_valves(main_valve) + # Start with all LAG links NOACT & UNSELECTED + self.validate_flood(2, 0, 3, False, 'Flooded out UNSELECTED & NOACT LAG port') + self.validate_flood(2, 0, 4, False, 'Flooded out UNSELECTED & NOACT LAG port') + # Set UP & SELECTED one s1 LAG link + port3 = main_valve.dp.ports[3] + port4 = main_valve.dp.ports[4] + self.apply_ofmsgs(main_valve.lacp_update(port4, True, 1, 1, main_other_valves)) + self.apply_ofmsgs(main_valve.lacp_update(port3, False, 1, 1, main_other_valves)) + self.validate_flood(2, 0, 3, False, 'Flooded out NOACT LAG port') + self.validate_flood(2, 0, 4, True, 'Did not flood out SELECTED LAG port') + # Set UP & SELECTED s2 LAG links + valve = self.valves_manager.valves[0x2] + other_valves = self.valves_manager._other_running_valves(valve) + for port in valve.dp.ports.values(): + if port.lacp: + valve.lacp_update(port, True, 1, 1, other_valves) + self.apply_ofmsgs(main_valve.lacp_update(port4, True, 1, 1, main_other_valves)) + self.apply_ofmsgs(main_valve.lacp_update(port3, False, 1, 1, main_other_valves)) + self.validate_flood(2, 0, 3, False, 'Flooded out UNSELECTED & NOACT LAG port') + self.validate_flood(2, 0, 4, False, 'Flooded out UNSELECTED LAG port') + # Set UP & SELECTED both s1 LAG links + self.apply_ofmsgs(main_valve.lacp_update(port3, True, 1, 1, main_other_valves)) + self.apply_ofmsgs(main_valve.lacp_update(port4, True, 1, 1, main_other_valves)) + self.validate_flood(2, 0, 3, True, 'Did not flood out SELECTED LAG port') + self.validate_flood(2, 0, 4, False, 'Flooded out multiple LAG ports') + + def test_lag_pipeline_accept(self): + """Test packets entering through UP & SELECTED LAG links""" + self.activate_all_ports() + main_valve = self.valves_manager.valves[0x1] + main_other_valves = self.valves_manager._other_running_valves(main_valve) + # Packet initially rejected + self.validate_flood( + 3, 0, None, False, 'Packet incoming through UNSELECTED & NOACT port was accepted') + self.validate_flood( + 4, 0, None, False, 'Packet incoming through UNSELECTED & NOACT port was accepted') + # Set one s1 LAG port 4 to SELECTED & UP + port3 = main_valve.dp.ports[3] + port4 = main_valve.dp.ports[4] + self.apply_ofmsgs(main_valve.lacp_update(port4, True, 1, 1, main_other_valves)) + self.apply_ofmsgs(main_valve.lacp_update(port3, False, 1, 1, main_other_valves)) + self.validate_flood( + 3, 0, None, False, 'Packet incoming through NOACT port was accepted') + self.validate_flood( + 4, 0, None, True, 'Packet incoming through SELECTED port was not accepted') + # Set UP & SELECTED s2 LAG links, set one s1 port down + valve = self.valves_manager.valves[0x2] + other_valves = self.valves_manager._other_running_valves(valve) + for port in valve.dp.ports.values(): + if port.lacp: + valve.lacp_update(port, True, 1, 1, other_valves) + self.apply_ofmsgs(main_valve.lacp_update(port4, True, 1, 1, main_other_valves)) + self.apply_ofmsgs(main_valve.lacp_update(port3, False, 1, 1, main_other_valves)) + self.validate_flood( + 3, 0, None, False, 'Packet incoming through UNSELECTED & NOACT port was accepted') + self.validate_flood( + 4, 0, None, False, 'Packet incoming through UNSELECTED port was accepted') + # Set UP & SELECTED both s1 LAG links + self.apply_ofmsgs(main_valve.lacp_update(port3, True, 1, 1, main_other_valves)) + self.apply_ofmsgs(main_valve.lacp_update(port4, True, 1, 1, main_other_valves)) + self.validate_flood( + 3, 0, None, True, 'Packet incoming through SELECTED port was not accepted') + self.validate_flood( + 4, 0, None, True, 'Packet incoming through SELECTED port was not accepted') + + class ValveStackRootExtLoopProtectTestCase(ValveTestBases.ValveTestSmall): """External loop protect test cases""" @@ -1083,75 +1277,6 @@ def test_topo(self): self.assertFalse(dp.is_stack_edge()) -class ValveStackCollectDistributeLACPTestCase(ValveTestBases.ValveTestSmall): - """Test stack topology with 3 electable roots and one LACP link each.""" - - CONFIG = """ -dps: - s1: - dp_id: 0x1 - hardware: 'GenericTFM' - stack: - priority: 1 - interfaces: - 1: - lacp: 1 - native_vlan: 100 - loop_protect_external: True - 2: - stack: - dp: s2 - port: 2 - s2: - dp_id: 0x2 - hardware: 'GenericTFM' - stack: - priority: 2 - interfaces: - 1: - lacp: 1 - native_vlan: 100 - loop_protect_external: True - 2: - stack: - dp: s1 - port: 2 - 3: - stack: - dp: s3 - port: 2 - s3: - dp_id: 0x3 - hardware: 'GenericTFM' - stack: - priority: 3 - interfaces: - 1: - lacp: 1 - native_vlan: 100 - loop_protect_external: True - lacp_collect_and_distribute: True - 2: - stack: - dp: s2 - port: 3 - """ - - def setUp(self): - self.setup_valve(self.CONFIG) - - def test_topo(self): - """Test topology functions.""" - dp_ids = [0x1, 0x2, 0x3] - port = 1 - want_collect_and_distribute = [1, 0, 1] - - valves = [self.valves_manager.valves[dp_id] for dp_id in dp_ids] - ports = [valve.dp.ports[port] for valve in valves] - c_and_d = [valves[i].dp.lacp_collect_and_distribute(ports[i]) for i in range(0, len(dp_ids))] - self.assertEqual(c_and_d, want_collect_and_distribute) - - class ValveTwoDpRootEdge(ValveTestBases.ValveTestSmall): """Test simple stack topology from edge."""