forked from sonic-net/sonic-platform-common
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsfputilhelper.py
130 lines (99 loc) · 4.46 KB
/
sfputilhelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# sfputilbase.py
#
# Base class for creating platform-specific SFP transceiver interfaces for SONiC
#
from __future__ import print_function
try:
import os
import re
import sys
from collections import OrderedDict
from natsort import natsorted
from portconfig import get_port_config
from sonic_py_common import device_info, multi_asic
from sonic_py_common.interface import backplane_prefix, inband_prefix, recirc_prefix
except ImportError as e:
raise ImportError("%s - required module not found" % str(e))
# Global Variable
PLATFORM_JSON = 'platform.json'
PORT_CONFIG_INI = 'port_config.ini'
ASIC_NAME_PREFIX = 'asic'
class SfpUtilHelper(object):
# List to specify filter for sfp_ports
# Needed by platforms like dni-6448 which
# have only a subset of ports that support sfp
sfp_ports = []
# List of logical port names available on a system
""" ["swp1", "swp5", "swp6", "swp7", "swp8" ...] """
logical = []
# Mapping of logical port names available on a system to ASIC num
logical_to_asic = {}
# dicts for easier conversions between logical, physical and bcm ports
logical_to_physical = {}
physical_to_logical = {}
physical_to_phyaddrs = {}
def __init__(self):
pass
def read_porttab_mappings(self, porttabfile, asic_inst=0):
logical = []
logical_to_physical = {}
physical_to_logical = {}
fp_port_index = 1
(platform, hwsku) = device_info.get_platform_and_hwsku()
asic_name = None
if multi_asic.is_multi_asic():
asic_name = ASIC_NAME_PREFIX + str(asic_inst)
ports, _, _ = get_port_config(hwsku, platform, asic_name=asic_name)
if not ports:
ports, _, _ = get_port_config(hwsku, platform, porttabfile)
if not ports:
print('Failed to get port config', file=sys.stderr)
sys.exit(1)
logical_list = []
for intf in ports.keys():
logical_list.append(intf)
# Ignore if this is an internal backplane interface and Inband interface
logical = [name for name in logical_list
if not name.startswith((backplane_prefix(), inband_prefix(), recirc_prefix()))]
logical = natsorted(logical, key=lambda y: y.lower())
logical_to_physical, physical_to_logical = OrderedDict(), OrderedDict()
for intf_name in logical:
if 'index' in ports[intf_name].keys():
fp_port_index = int(ports[intf_name]['index'])
logical_to_physical[intf_name] = [fp_port_index]
if physical_to_logical.get(fp_port_index) is None:
physical_to_logical[fp_port_index] = [intf_name]
else:
physical_to_logical[fp_port_index].append(intf_name)
# Mapping of logical port names available on a system to ASIC instance
self.logical_to_asic[intf_name] = asic_inst
self.logical.extend(logical)
self.logical = list(OrderedDict.fromkeys(self.logical).keys())
self.logical_to_physical.update(logical_to_physical)
self.physical_to_logical.update(physical_to_logical)
return None
def read_all_porttab_mappings(self, platform_dir, num_asic_inst):
# In multi asic scenario, get all the port_config files for different asic
for inst in range(num_asic_inst):
port_map_dir = os.path.join(platform_dir, str(inst))
port_map_file = os.path.join(port_map_dir, PORT_CONFIG_INI)
if os.path.exists(port_map_file):
self.read_porttab_mappings(port_map_file, inst)
else:
port_json_file = os.path.join(platform_dir, PLATFORM_JSON)
if os.path.exists(port_json_file):
self.read_porttab_mappings(port_json_file, inst)
def get_physical_to_logical(self, port_num):
"""Returns list of logical ports for the given physical port"""
return self.physical_to_logical.get(port_num)
def get_logical_to_physical(self, logical_port):
"""Returns list of physical ports for the given logical port"""
return self.logical_to_physical[logical_port]
def is_logical_port(self, port):
if port in self.logical:
return 1
else:
return 0
def get_asic_id_for_logical_port(self, logical_port):
"""Returns the asic_id list of physical ports for the given logical port"""
return self.logical_to_asic.get(logical_port)