diff --git a/src/ax_interface/agent.py b/src/ax_interface/agent.py index 82766666a..b20fbc8eb 100644 --- a/src/ax_interface/agent.py +++ b/src/ax_interface/agent.py @@ -8,7 +8,7 @@ class Agent: - def __init__(self, mib_cls, update_frequency, loop): + def __init__(self, mib_cls, enable_dynamic_frequency, update_frequency, loop): if not type(mib_cls) is MIBMeta: raise ValueError("Expected a class with type: {}".format(MIBMeta)) @@ -20,7 +20,7 @@ def __init__(self, mib_cls, update_frequency, loop): self.stopped = asyncio.Event() # Initialize our MIB - self.mib_table = MIBTable(mib_cls, update_frequency) + self.mib_table = MIBTable(mib_cls, enable_dynamic_frequency, update_frequency) # containers self.socket_mgr = SocketManager(self.mib_table, self.run_enabled, self.loop) diff --git a/src/ax_interface/constants.py b/src/ax_interface/constants.py index 1c5b8a8ec..bccd6512e 100644 --- a/src/ax_interface/constants.py +++ b/src/ax_interface/constants.py @@ -90,3 +90,8 @@ class PduTypes(int, Enum): DEFAULT_PDU_TIMEOUT = 5 + +# MIBUpdater rate: Interval/Execution time +UPDATE_FREQUENCY_RATE = 10 +# MIBUpdater max update interval +MAX_UPDATE_INTERVAL = 60 diff --git a/src/ax_interface/mib.py b/src/ax_interface/mib.py index 404e2916a..4a8db2a3f 100644 --- a/src/ax_interface/mib.py +++ b/src/ax_interface/mib.py @@ -1,10 +1,12 @@ import asyncio import bisect import random +from datetime import datetime from . import logger, util from .constants import ValueType from .encodings import ValueRepresentation +from .util import get_next_update_interval """ Update interval between update runs (in seconds). 
@@ -16,6 +18,11 @@ """ DEFAULT_REINIT_RATE = 60 +""" +Disable dynamic frequency by default +""" +DEFAULT_ENABLE_DYNAMIC_FREQUENCY = False + class MIBUpdater: """ @@ -25,6 +32,7 @@ class MIBUpdater: def __init__(self): self.run_event = asyncio.Event() self.frequency = DEFAULT_UPDATE_FREQUENCY + self.enable_dynamic_frequency = DEFAULT_ENABLE_DYNAMIC_FREQUENCY self.reinit_rate = DEFAULT_REINIT_RATE // DEFAULT_UPDATE_FREQUENCY self.update_counter = self.reinit_rate + 1 # reinit_data when init @@ -32,6 +40,7 @@ async def start(self): # Run the update while we are allowed redis_exception_happen = False while self.run_event.is_set(): + start = datetime.now() try: # reinit internal structures if self.update_counter > self.reinit_rate: @@ -57,9 +66,36 @@ async def start(self): # Any unexpected exception or error, log it and keep running logger.exception("MIBUpdater.start() caught an unexpected exception during update_data()") + if self.enable_dynamic_frequency: + """ + On SONiC device with huge interfaces + for example RP card on ethernet chassis, including backend ports, 600+ interfaces + The update_data function could be very slow, especially when 100% CPU utilization. + for example ciscoSwitchQosMIB.QueueStatUpdater, uses 1-3 seconds on normal state. + uses 3-8 seconds on 100% CPU utilization state. + We use Asyncio/Coroutine as the basic framework, + the mib updaters share the same asyncio event loop with the SNMP agent client. + Hence during the updaters executing, the agent client can't receive/respond to new requests, + + The high frequency and the long execution time + causes the SNMP request to be timed out on High CPU utilization. + The stable frequency(generally with default value 5s) + doesn't works well on this huge interfaces situation. 
+ when the execution time is long, + wait for longer time to give back the control of asyncio event loop to SNMP agent + """ + execution_time = (datetime.now() - start).total_seconds() + next_frequency = get_next_update_interval(execution_time, self.frequency) + + if next_frequency > self.frequency: + logger.debug(f"MIBUpdater type[{type(self)}] slow update detected, " + f"update execution time[{execution_time}], next_frequency[{next_frequency}]") + else: + next_frequency = self.frequency + # wait based on our update frequency before executing again. # randomize to avoid concurrent update storms. - await asyncio.sleep(self.frequency + random.randint(-2, 2)) + await asyncio.sleep(next_frequency + random.randint(-2, 2)) def reinit_data(self): """ @@ -275,10 +311,13 @@ class MIBTable(dict): Simplistic LUT for Get/GetNext OID. Interprets iterables as keys and implements the same interfaces as dict's. """ - def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY): + def __init__(self, mib_cls, + enable_dynamic_frequency=DEFAULT_ENABLE_DYNAMIC_FREQUENCY, + update_frequency=DEFAULT_UPDATE_FREQUENCY): if type(mib_cls) is not MIBMeta: raise ValueError("Supplied object is not a MIB class instance.") super().__init__(getattr(mib_cls, MIBMeta.KEYSTORE)) + self.enable_dynamic_frequency = enable_dynamic_frequency self.update_frequency = update_frequency self.updater_instances = getattr(mib_cls, MIBMeta.UPDATERS) self.prefixes = getattr(mib_cls, MIBMeta.PREFIXES) @@ -296,6 +335,7 @@ def start_background_tasks(self, event): tasks = [] for updater in self.updater_instances: updater.frequency = self.update_frequency + updater.enable_dynamic_frequency = self.enable_dynamic_frequency updater.run_event = event fut = asyncio.ensure_future(updater.start()) fut.add_done_callback(MIBTable._done_background_task_callback) diff --git a/src/ax_interface/util.py b/src/ax_interface/util.py index afe369115..96cf0cc8c 100644 --- a/src/ax_interface/util.py +++ 
def get_next_update_interval(execution_time, static_frequency, *, rate=None, max_interval=None):
    """
    Return the number of seconds to wait before the next update run.

    >>> get_next_update_interval(0.4, 5)
    5
    >>> get_next_update_interval(0.87, 5)
    9
    >>> get_next_update_interval(18.88, 5)
    60

    :param execution_time: The execution time of the updater, in seconds
    :param static_frequency: Static frequency, generally use default value 5
    :param rate: minimum ratio of 'update interval'/'update execution time';
                 defaults to constants.UPDATE_FREQUENCY_RATE
    :param max_interval: upper bound of the execution-time based interval;
                         defaults to constants.MAX_UPDATE_INTERVAL
    :return: the interval before next update

    We expect the rate of 'update interval'/'update execution time' >= rate.
    Because we're using asyncio/Coroutines, the update execution blocks SNMP proxy service and other updaters.

    The result is max(static_frequency, min(ceil(execution_time * rate), max_interval)), so:
      - it is never smaller than static_frequency; a zero or negative execution_time
        therefore simply yields static_frequency
      - only the execution-time based part is capped by max_interval; a static_frequency
        larger than max_interval is returned unchanged

    Given static_frequency == 5, rate == 10 and max_interval == 60:
      - execution_time <= 0.5 keeps the static interval of 5s
      - execution_time == 0.87 gives ceil(8.7) = 9s
      - execution_time >= 6 is capped at 60s
    """
    # Resolve the defaults at call time so callers may override either limit independently.
    if rate is None:
        rate = constants.UPDATE_FREQUENCY_RATE
    if max_interval is None:
        max_interval = constants.MAX_UPDATE_INTERVAL

    # Round up: rounding down would break the 'interval >= rate * execution time' guarantee.
    frequency_based_on_execution_time = min(math.ceil(execution_time * rate), max_interval)

    return max(static_frequency, frequency_based_on_execution_time)
import mibs LOG_FORMAT = "snmp-subagent [%(name)s] %(levelname)s: %(message)s" @@ -70,7 +71,7 @@ def logging_level_to_swss_level(log_level): sys.exit(0) # import command line arguments - args = sonic_py_common.util.process_options("sonic_ax_impl") + args = process_options("sonic_ax_impl") # configure logging. If debug '-d' is specified, logs to stdout at designated level. syslog/INFO otherwise. log_level = log_level_sdk = args.get('log_level') @@ -110,4 +111,4 @@ def logging_level_to_swss_level(log_level): from .main import main - main(update_frequency=args.get('update_frequency')) + main(enable_dynamic_frequency=args.get('enable_dynamic_frequency'), update_frequency=args.get('update_frequency')) diff --git a/src/sonic_ax_impl/main.py b/src/sonic_ax_impl/main.py index 281ab601b..c798ab690 100644 --- a/src/sonic_ax_impl/main.py +++ b/src/sonic_ax_impl/main.py @@ -54,14 +54,14 @@ def shutdown(signame, agent): shutdown_task = event_loop.create_task(agent.shutdown()) -def main(update_frequency=None): +def main(enable_dynamic_frequency=False, update_frequency=None): global event_loop try: Namespace.init_sonic_db_config() # initialize handler and set update frequency (or use the default) - agent = ax_interface.Agent(SonicMIB, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop) + agent = ax_interface.Agent(SonicMIB, enable_dynamic_frequency, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop) # add "shutdown" signal handlers # https://docs.python.org/3.5/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm diff --git a/src/sonic_ax_impl/utils/__init__.py b/src/sonic_ax_impl/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/sonic_ax_impl/utils/arg_parser.py b/src/sonic_ax_impl/utils/arg_parser.py new file mode 100644 index 000000000..788c4b2f9 --- /dev/null +++ b/src/sonic_ax_impl/utils/arg_parser.py @@ -0,0 +1,39 @@ +from __future__ import print_function +import sys +from getopt import getopt + + +def 
def usage(script_name):
    """
    Print the command line usage of the script.

    :param script_name: script name shown in the usage line
    """
    print('Usage: python ', script_name,
          '-t [host] -p [port] -s [unix_socket_path] -d [logging_level] -f [update_frequency] -r [enable_dynamic_frequency] -h [help]')


def process_options(script_name):
    """
    Process command line options

    The options are read from sys.argv[1:]. Only options that were actually
    supplied appear in the result, under these keys:
        -d / --debug                      'log_level' (int)
        -t / --host                       'host' (str)
        -p / --port                       'port' (int)
        -s / --unix_socket_path           'unix_socket_path' (str)
        -f / --frequency                  'update_frequency' (int)
        -r / --enable_dynamic_frequency   'enable_dynamic_frequency' (True, flag without argument)

    -h / --help prints the usage and exits with status 0.
    An unknown option, a missing option argument or a non-integer value for an
    integer option prints an error message and exits with status 1.

    :param script_name: script name shown in the usage line
    :return: dict with the parsed options
    """
    # Local import: the module header only imports the getopt() function itself.
    from getopt import GetoptError

    try:
        options, _ = getopt(sys.argv[1:], "t:p:s:d:f:rh", ["host=", "port=", "unix_socket_path=", "debug=", "frequency=", "enable_dynamic_frequency", "help"])
    except GetoptError as e:
        # Unknown option or missing argument: report it like an invalid value
        # instead of letting the exception escape as a traceback.
        print('Invalid option: {}'.format(e))
        usage(script_name)
        sys.exit(1)

    args = {}
    for (opt, arg) in options:
        try:
            if opt in ('-d', '--debug'):
                args['log_level'] = int(arg)
            elif opt in ('-t', '--host'):
                args['host'] = arg
            elif opt in ('-p', '--port'):
                args['port'] = int(arg)
            elif opt in ('-s', '--unix_socket_path'):
                args['unix_socket_path'] = arg
            elif opt in ('-f', '--frequency'):
                args['update_frequency'] = int(arg)
            elif opt in ('-r', '--enable_dynamic_frequency'):
                args['enable_dynamic_frequency'] = True
            elif opt in ('-h', '--help'):
                usage(script_name)
                sys.exit(0)
        except ValueError as e:
            # Raised by int() when an integer option carries a non-numeric value.
            print('Invalid option for {}: {}'.format(opt, e))
            sys.exit(1)

    return args
class TestUtil(TestCase):
    """
    Tests for sonic_ax_impl.utils.arg_parser.process_options.

    Every test patches sys.argv, because process_options reads its input from there.
    Each test method needs a unique name: a second method with the same name
    replaces the first one in the class body, and the first test is never run.
    """

    # Given: Don't pass any parameter
    # When: Parse args
    # Then: Return empty dict
    @patch('sys.argv', ['sonic_ax_impl'])
    def test_valid_options_default_value_none(self):
        args = process_options("sonic_ax_impl")

        self.assertNotIn("log_level", args)
        self.assertNotIn("host", args)
        self.assertNotIn("port", args)
        self.assertNotIn("unix_socket_path", args)
        self.assertNotIn("update_frequency", args)
        self.assertNotIn("enable_dynamic_frequency", args)

    # Given: Pass --port=aaa
    # When: Parse args
    # Then: Print value error and exit with code 1
    @patch('builtins.print')
    @patch('sys.argv', ['sonic_ax_impl', '--port=aaa'])
    def test_valid_options_value_error(self, mock_print):
        with pytest.raises(SystemExit) as excinfo:
            process_options("sonic_ax_impl")
        assert excinfo.value.code == 1
        mock_print.assert_called_with("Invalid option for --port: invalid literal for int() with base 10: 'aaa'")

    # Given: Pass -h
    # When: Parse args
    # Then: Print help logs and exit with code 0
    @patch('builtins.print')
    @patch('sys.argv', ['sonic_ax_impl', '-h'])
    def test_valid_options_help(self, mock_print):
        with pytest.raises(SystemExit) as excinfo:
            process_options("sonic_ax_impl")
        assert excinfo.value.code == 0
        mock_print.assert_called_with('Usage: python ', 'sonic_ax_impl', '-t [host] -p [port] -s [unix_socket_path] -d [logging_level] -f [update_frequency] -r [enable_dynamic_frequency] -h [help]')

    # Given: Pass --help
    # When: Parse args
    # Then: Print help logs and exit with code 0
    @patch('builtins.print')
    @patch('sys.argv', ['sonic_ax_impl', '--help'])
    def test_valid_options_help_long(self, mock_print):
        with pytest.raises(SystemExit) as excinfo:
            process_options("sonic_ax_impl")
        assert excinfo.value.code == 0
        mock_print.assert_called_with('Usage: python ', 'sonic_ax_impl', '-t [host] -p [port] -s [unix_socket_path] -d [logging_level] -f [update_frequency] -r [enable_dynamic_frequency] -h [help]')

    # Given: Pass -r
    # When: Parse args
    # Then: Enable enable_dynamic_frequency
    @patch('sys.argv', ['sonic_ax_impl', '-r'])
    def test_valid_options_enable_dynamic_frequency(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["enable_dynamic_frequency"], True)

    # Given: Pass --enable_dynamic_frequency
    # When: Parse args
    # Then: Enable enable_dynamic_frequency
    @patch('sys.argv', ['sonic_ax_impl', '--enable_dynamic_frequency'])
    def test_valid_options_enable_dynamic_frequency_long(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["enable_dynamic_frequency"], True)

    # Given: Pass -f
    # When: Parse args
    # Then: Parse update_frequency
    @patch('sys.argv', ['sonic_ax_impl', '-f9'])
    def test_valid_options_update_frequency(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["update_frequency"], 9)

    # Given: Pass --frequency
    # When: Parse args
    # Then: Parse update_frequency
    @patch('sys.argv', ['sonic_ax_impl', '--frequency=9'])
    def test_valid_options_update_frequency_long(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["update_frequency"], 9)

    # Given: Pass -s
    # When: Parse args
    # Then: Parse socket
    @patch('sys.argv', ['sonic_ax_impl', '-s/unix/socket'])
    def test_valid_options_socket(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["unix_socket_path"], "/unix/socket")

    # Given: Pass --unix_socket_path
    # When: Parse args
    # Then: Parse socket
    @patch('sys.argv', ['sonic_ax_impl', '--unix_socket_path=/unix/socket'])
    def test_valid_options_socket_long(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["unix_socket_path"], "/unix/socket")

    # Given: Pass -p
    # When: Parse args
    # Then: Parse port
    @patch('sys.argv', ['sonic_ax_impl', '-p6666'])
    def test_valid_options_port(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["port"], 6666)

    # Given: Pass --port
    # When: Parse args
    # Then: Parse port
    @patch('sys.argv', ['sonic_ax_impl', '--port=6666'])
    def test_valid_options_port_long(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["port"], 6666)

    # Given: Pass -t
    # When: Parse args
    # Then: Parse host
    @patch('sys.argv', ['sonic_ax_impl', '-tsnmp.com'])
    def test_valid_options_host(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["host"], 'snmp.com')

    # Given: Pass --host
    # When: Parse args
    # Then: Parse host
    @patch('sys.argv', ['sonic_ax_impl', '--host=snmp.com'])
    def test_valid_options_host_long(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["host"], 'snmp.com')

    # Given: Pass -d
    # When: Parse args
    # Then: Parse log_level
    # (was a second `test_valid_options_host`, which silently replaced the -t test above)
    @patch('sys.argv', ['sonic_ax_impl', '-d9'])
    def test_valid_options_debug(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["log_level"], 9)

    # Given: Pass --debug
    # When: Parse args
    # Then: Parse log_level
    # (was a second `test_valid_options_host_long`, which silently replaced the --host test above)
    @patch('sys.argv', ['sonic_ax_impl', '--debug=9'])
    def test_valid_options_debug_long(self):
        args = process_options("sonic_ax_impl")
        self.assertEqual(args["log_level"], 9)
class TestUtil(TestCase):
    """Tests for ax_interface.util.get_next_update_interval."""

    def test_get_interval_quick_finish(self):
        # Given: a near-instant update (0.000001s) and a static interval of 5/10/15s
        # Then:  the static interval is returned unchanged
        execution_time = 0.000001
        for configured in (5, 10, 15):
            actual = get_next_update_interval(execution_time, configured)
            self.assertEqual(actual, configured)

    def test_get_interval_slow_finish(self):
        # Given: a slow update (0.766666s) and a static interval of 5s
        # Then:  ceil(0.766666 * 10) = 8 is returned
        actual = get_next_update_interval(0.766666, 5)
        self.assertEqual(actual, 8)

    def test_get_interval_slow_finish_default_long(self):
        # Given: a slow update (0.766666s) and a static interval of 10s
        # Then:  the static interval wins, 10 is returned
        actual = get_next_update_interval(0.766666, 10)
        self.assertEqual(actual, 10)

    def test_get_interval_very_slow(self):
        # Given: a very slow update (20.2324s) and a static interval of 10s
        # Then:  the max interval, 60, is returned
        actual = get_next_update_interval(20.2324, 10)
        self.assertEqual(actual, 60)

    def test_get_interval_zero(self):
        # Given: an execution time of 0 and a static interval of 5s
        # Then:  the static interval, 5, is returned
        actual = get_next_update_interval(0, 5)
        self.assertEqual(actual, 5)

    def test_get_interval_zero_long(self):
        # Given: an execution time of 0.000000 and a static interval of 5s
        # Then:  the static interval, 5, is returned
        actual = get_next_update_interval(0.000000, 5)
        self.assertEqual(actual, 5)

    def test_get_interval_negative(self):
        # Given: a wrongly negative execution time (-0.000001) and a static interval of 5s
        # Then:  the static interval, 5, is returned
        actual = get_next_update_interval(-0.000001, 5)
        self.assertEqual(actual, 5)

    def test_get_interval_negative_slow(self):
        # Given: a wrongly negative execution time (-10.000001) and a static interval of 5s
        # Then:  the static interval, 5, is returned
        actual = get_next_update_interval(-10.000001, 5)
        self.assertEqual(actual, 5)