Skip to content

Commit

Permalink
[CMIS] Improve readability by using individual functions for setting …
Browse files Browse the repository at this point in the history
…host, media, input, and output loopback modes

Signed-off-by: xinyu <[email protected]>
  • Loading branch information
xinyulin committed Aug 26, 2024
1 parent 7105a22 commit 143c8ad
Show file tree
Hide file tree
Showing 2 changed files with 339 additions and 105 deletions.
259 changes: 195 additions & 64 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,92 +1115,223 @@ def get_loopback_capability(self):
loopback_capability['media_side_output_loopback_supported'] = bool((allowed_loopback_result >> 0) & 0x1)
return loopback_capability

def set_loopback_mode(self, loopback_mode, lane_mask = 0xff):
def set_host_input_loopback(self, lane_mask, enable):
'''
This function sets the module loopback mode.
loopback_mode: Loopback mode has to be one of the five:
1. "none"
2. "host-side-input-none"
3. "host-side-output-none",
4. "media-side-input-none"
5. "media-side-output-none"
6. "host-side-input"
7. "host-side-output"
8. "media-side-input"
9. "media-side-output"
lane_mask: A bitmask representing which lanes to apply the loopback mode to.
The default value of 0xFF indicates that the mode should be applied to all lanes.
The function will look at 13h:128 to check advertized loopback capabilities.
Return True if the provision succeeds, False if it fails
Sets the host-side input loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['host_side_input_loopback_supported'] is False:
txt = 'Host input loopback is not supported'
logger.error(txt)
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
txt = f'Per-lane host input loopback is not supported, lane_mask:{lane_mask:02x}'
logger.error(txt)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if media_input_val or media_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}'
logger.error(txt)
return False

host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
host_input_support = loopback_capability['host_side_input_loopback_supported']
host_output_support = loopback_capability['host_side_output_loopback_supported']
media_input_support = loopback_capability['media_side_input_loopback_supported']
media_output_support = loopback_capability['media_side_output_loopback_supported']

if lane_mask != 0xff:
if any([loopback_capability['per_lane_host_loopback_supported'] is False and 'host' in loopback_mode,
loopback_capability['per_lane_media_loopback_supported'] is False and 'media' in loopback_mode]):
txt = f'Per-lane {loopback_mode} loopback is not supported, lane_mask:{lane_mask:02x}\n'
if enable:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask)

def set_host_output_loopback(self, lane_mask, enable):
'''
Sets the host-side output loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_capability['host_side_output_loopback_supported'] is False:
txt = 'Host output loopback is not supported'
logger.error(txt)
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
txt = f'Per-lane host output loopback is not supported, lane_mask:{lane_mask:02x}'
logger.error(txt)
return False

if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if media_input_val or media_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}'
logger.error(txt)
return False

if 'none' in loopback_mode:
if loopback_mode == 'none':
status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0)
status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0)
status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0)
status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0)
return all([status_host_input, status_host_output, status_media_input, status_media_output])
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask)

def set_media_input_loopback(self, lane_mask, enable):
'''
Sets the media-side input loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
if loopback_mode == 'host-side-input-none':
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask)
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_mode == 'host-side-output-none':
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask)
if loopback_capability['media_side_input_loopback_supported'] is False:
txt = 'Media input loopback is not supported'
logger.error(txt)
return False

if loopback_mode == 'media-side-input-none':
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask)
if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
txt = f'Per-lane media input loopback is not supported, lane_mask:{lane_mask:02x}'
logger.error(txt)
return False

if loopback_mode == 'media-side-output-none':
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask)
if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if host_input_val or host_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}'
logger.error(txt)
return False

media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask)
else:
if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
if any(['host' in loopback_mode and (media_input_val or media_output_val),
'media' in loopback_mode and (host_input_val or host_output_val)]):
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}, '
txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}\n'
logger.error(txt)
return False
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask)

def set_media_output_loopback(self, lane_mask, enable):
'''
Sets the media-side output loopback mode for specified lanes.
Args:
lane_mask (int): A bitmask indicating which lanes to enable/disable loopback.
- 0xFF: Enable loopback on all lanes.
- Individual bits represent corresponding lanes.
enable (bool): True to enable loopback, False to disable.
Returns:
bool: True if the operation succeeds, False otherwise.
'''
loopback_capability = self.get_loopback_capability()
if loopback_capability is None:
logger.info('Failed to get loopback capabilities')
return False

if loopback_mode == 'host-side-input' and host_input_support:
return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask)
if loopback_capability['media_side_output_loopback_supported'] is False:
txt = 'Media output loopback is not supported'
logger.error(txt)
return False

if loopback_mode == 'host-side-output' and host_output_support:
return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask)
if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
txt = f'Per-lane media input loopback is not supported, lane_mask:{lane_mask:02x}'
logger.error(txt)
return False

if loopback_mode == 'media-side-input' and media_input_support:
return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask)
if loopback_capability['simultaneous_host_media_loopback_supported'] is False:
host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK)
host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK)
if host_input_val or host_output_val:
txt = 'Simultaneous host media loopback is not supported\n'
txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}'
logger.error(txt)
return False

if loopback_mode == 'media-side-output' and media_output_support:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask)
media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if enable:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask)
else:
return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask)

def set_loopback_mode(self, loopback_mode, lane_mask = 0xff):
'''
This function sets the module loopback mode.
Args:
- loopback_mode (str): Specifies the loopback mode. It must be one of the following:
1. "none"
2. "host-side-input-none"
3. "host-side-output-none"
4. "media-side-input-none"
5. "media-side-output-none"
6. "host-side-input"
7. "host-side-output"
8. "media-side-input"
9. "media-side-output"
- lane_mask (int): A bitmask representing the lanes to which the loopback mode should
be applied. Default 0xFF applies to all lanes.
Returns:
- bool: True if the operation succeeds, False otherwise.
'''
if loopback_mode == 'none':
return all([
self.set_host_input_loopback(0xff, False),
self.set_host_output_loopback(0xff, False),
self.set_media_input_loopback(0xff, False),
self.set_media_output_loopback(0xff, False)
])
elif loopback_mode == 'host-side-input-none':
return self.set_host_input_loopback(lane_mask, False)
elif loopback_mode == 'host-side-output-none':
return self.set_host_output_loopback(lane_mask, False)
elif loopback_mode == 'media-side-input-none':
return self.set_media_input_loopback(lane_mask, False)
elif loopback_mode == 'media-side-output-none':
return self.set_media_output_loopback(lane_mask, False)
elif loopback_mode == 'host-side-input':
return self.set_host_input_loopback(lane_mask, True)
elif loopback_mode == 'host-side-output':
return self.set_host_output_loopback(lane_mask, True)
elif loopback_mode == 'media-side-input':
return self.set_media_input_loopback(lane_mask, True)
elif loopback_mode == 'media-side-output':
return self.set_media_output_loopback(lane_mask, True)

txt = f'Failed to set {loopback_mode} loopback, lane_mask:{lane_mask:02x}\n'
txt += f'host_input_support:{host_input_support}, host_output_support:{host_output_support}, '
txt += f'media_input_support:{media_input_support}, media_output_support:{media_output_support}\n'
txt += f'host_input_val:{host_input_val:02x}, host_output_val:{host_output_val:02x}, '
txt += f'media_input_val:{media_input_val:02x}, media_output_val:{media_output_val:02x}\n'
logger.error(txt)
return False

Expand Down
Loading

0 comments on commit 143c8ad

Please sign in to comment.