Solution Review: Spanning Tree Protocol
In this lesson, we'll look at a solution to the spanning tree protocol programming assignment.
We'll cover the following...
Solution
Press + to interact
main.py
topology_reader.py
ports.py
simulator.py
bridge.py
from ports import portsdef is_better(BPDU1, BPDU2):# If root is greater than BPDU1 is betterif(BPDU1[0] < BPDU2[0]):return 1elif(BPDU1[0] == BPDU2[0] and BPDU1[1] < BPDU2[1]):return 1elif(BPDU1[0] == BPDU2[0] and BPDU1[1] == BPDU2[1] and BPDU1[2] < BPDU2[2]):return 1elif(BPDU1[0] == BPDU2[0] and BPDU1[1] == BPDU2[1] and BPDU1[2] == BPDU2[2] and BPDU1[3] < BPDU2[3]):return 1else:return 0class bridge:def __init__(self, bridge_ID, port_list):self.bridge_ID = bridge_IDself.port_list = port_list # port_list[0] is the port with port number 0self.config_BPDU = [bridge_ID, 0, bridge_ID, None] # Root ID, Cost, Transmitting Bridge ID, Transmitting Mac Addressself.receive_queue = {}def initialize_recv_queue(self, bridges_dict):for b in bridges_dict:self.receive_queue[b] = []def set_bridge(self, bridge_ID, num_ports, mac_addresses):self.bridge_ID = bridge_IDself.num_ports = num_portsself.port_list = port_listdef get_port(self, bridge_number, bridges_dict):for i in range(len(self.port_list)):if(bridge_number in self.port_list[i].get_reachable_bridge_ID(bridges_dict, self.bridge_ID)):return idef find_best_BPDUs_received(self, bridges_dict):# Select best BPDU at each bridgebest_BPDUs_recvd = {} # Bridge Number : BPDUbest_bpdu = 0for sending_brg_id in self.receive_queue:if(len(self.receive_queue[sending_brg_id]) > 0):best_bpdu = self.receive_queue[sending_brg_id][0]for bpdu in self.receive_queue[sending_brg_id]:if is_better(bpdu, best_bpdu):best_bpdu = bpdubest_BPDUs_recvd[sending_brg_id] = best_bpduself.initialize_recv_queue(bridges_dict)return best_BPDUs_recvddef update_ports(self, bridges_dict, best_BPDUs_recvd):for sending_brg_id in best_BPDUs_recvd:if sending_brg_id is not self.config_BPDU[0]:if self.port_list[self.get_port(sending_brg_id, bridges_dict)].port_type != 2:pn = self.get_port(sending_brg_id, bridges_dict)if(is_better(best_BPDUs_recvd[sending_brg_id], self.get_config_BPDU_at_port(pn))):self.port_list[self.get_port(sending_brg_id, bridges_dict)].port_type = 0else:self.port_list[self.get_port(sending_brg_id, bridges_dict)].port_type = 1def elect_root(self, bridges_dict, best_BPDUs_recvd):for sending_brg_id in best_BPDUs_recvd:if(best_BPDUs_recvd[sending_brg_id][0] < self.config_BPDU[0]):# New root bridgeself.config_BPDU[0] = best_BPDUs_recvd[sending_brg_id][0]self.config_BPDU[1] = best_BPDUs_recvd[sending_brg_id][1] + 1self.config_BPDU[2] = self.bridge_IDself.config_BPDU[3] = self.get_port(sending_brg_id, bridges_dict)if(self.get_root_port_id() is not None):self.port_list[self.get_root_port_id()].port_type = 1self.port_list[self.get_port(sending_brg_id, bridges_dict)].port_type = 2def send_BPDUs(self, bridges_dict):# Find all neighborsfor p in self.port_list:if(p.get_port_state() > 0): # If sending port# Send to all bridges on that segmentfor b in p.get_reachable_bridge_ID(bridges_dict, self.bridge_ID):pn = self.get_port(b, bridges_dict)bridges_dict[b].receive_queue[self.bridge_ID].append(self.get_config_BPDU_at_port(pn))return(bridges_dict)def receive_BPDUs(self, bridges_dict):# Update the bridge's state according to the best BPDUs received# Find best BPDU received from each bridgebest_BPDUs_recvd = self.find_best_BPDUs_received(bridges_dict)# Compare BPDUs with those received at non-root portsself.update_ports(bridges_dict, best_BPDUs_recvd)# Find root bridgeself.elect_root(bridges_dict, best_BPDUs_recvd)# Update dictionary and returnbridges_dict[self.bridge_ID] = selfreturn bridges_dictdef get_root_port_id(self):for p in range(len(self.port_list)):if self.port_list[p].port_type == 2:return preturn Nonedef get_config_BPDU_at_port(self, port_number):BPDU_at_port = self.config_BPDUBPDU_at_port[3] = self.port_list[port_number].mac_addressreturn(BPDU_at_port)def print_bridge(self):print("~~~~~~Bridge ID: " + str(self.bridge_ID) + " Root ID: " + str(self.config_BPDU[0]) + "~~~~~~" )print("BPDU:")print(self.config_BPDU)print("MAC address | Port Type | Segment Number")for port in self.port_list:port.print_port()
Explanation
send_BPDUs()
This function called on every bridge in the topology in a round-robin fashion. This function takes the bridges dictionary as input, makes some updates, and returns it.
It first iterates over all of non-blocking ports to find its neighbors. The neighbors are found by calling the ...