From 9e2c50ad77f8ba65e48f993631a48f4d666aeec9 Mon Sep 17 00:00:00 2001 From: Jianquan Ye Date: Tue, 14 Jan 2025 10:36:37 +1000 Subject: [PATCH] Fix snmp agent not-responding issue when high CPU utilization (#345) Fix sonic-net/sonic-buildimage#21314 The SNMP agent and MIB updaters are based on Asyncio/Coroutine, and the MIB updaters share the same Asyncio event loop with the SNMP agent client. Hence while the updaters are executing, the agent client can't receive/respond to new requests. When the CPU utilization is high (in some stress tests we drive the CPU to 100% utilization), the updates are slow, and this causes the snmpd request to time out because the agent is suspended during updating. - What I did Decrease the MIB update frequency when the update execution is slow. pros: The snmp request can succeed even at 100% CPU utilization. The snmpd request seldom fails due to timeout; combined with "Update snmpd.conf.j2 prolong agentXTimeout to avoid timeout failure in high CPU utilization scenario" (sonic-buildimage#21316), we have a 4*5 = 20s time window for the SNMP agent to wait for the MIB updates to finish and respond to the snmpd request. It relieves the CPU cost when CPU utilization is high, which avoids making the CPU even more crowded. cons: Tested on pizzabox (4600c), the updaters are very fast, generally finishing within 0.001~0.02s, so the change won't actually affect the frequency and interval. On Cisco chassis, the update of SNMP data could be delayed by 10~20s (at most 60s in extreme situations). Per my observation, most of the updaters finish within 0.5s. But for 1.a ciscoSwitchQosMIB.QueueStatUpdater, generally finishes in 0.5-2s, expected to be delayed to 5-20s 1.b PhysicalTableMIBUpdater, generally finishes in 0.5-1.5s, expected to be delayed to 5-15s 1.c ciscoPfcExtMIB.PfcPrioUpdater, generally finishes in 0.5-3s, expected to be delayed to 5-30s - How I did it In get_next_update_interval, we compute the interval based on current execution time. 
Roughly, we make the 'update interval'/'update execution time' >= UPDATE_FREQUENCY_RATE(10) More specifically, if the execution time is 2.000001s, we sleep 21s before next update round. And the max interval won't be longer than MAX_UPDATE_INTERVAL(60s). - How to verify it Test on Cisco chassis, test_snmp_cpu.py which triggers 100% CPU utilization test whether snmp requests work well. co-authorized by: jianquanye@microsoft.com --- src/ax_interface/agent.py | 4 +- src/ax_interface/constants.py | 5 + src/ax_interface/mib.py | 44 +++++++- src/ax_interface/util.py | 29 +++++ src/sonic_ax_impl/__main__.py | 7 +- src/sonic_ax_impl/main.py | 4 +- src/sonic_ax_impl/utils/__init__.py | 0 src/sonic_ax_impl/utils/arg_parser.py | 39 +++++++ tests/test_agent.py | 2 +- tests/test_arg_parser.py | 152 ++++++++++++++++++++++++++ tests/test_utils.py | 54 +++++++++ 11 files changed, 330 insertions(+), 10 deletions(-) create mode 100644 src/sonic_ax_impl/utils/__init__.py create mode 100644 src/sonic_ax_impl/utils/arg_parser.py create mode 100644 tests/test_arg_parser.py create mode 100644 tests/test_utils.py diff --git a/src/ax_interface/agent.py b/src/ax_interface/agent.py index 82766666a..b20fbc8eb 100644 --- a/src/ax_interface/agent.py +++ b/src/ax_interface/agent.py @@ -8,7 +8,7 @@ class Agent: - def __init__(self, mib_cls, update_frequency, loop): + def __init__(self, mib_cls, enable_dynamic_frequency, update_frequency, loop): if not type(mib_cls) is MIBMeta: raise ValueError("Expected a class with type: {}".format(MIBMeta)) @@ -20,7 +20,7 @@ def __init__(self, mib_cls, update_frequency, loop): self.stopped = asyncio.Event() # Initialize our MIB - self.mib_table = MIBTable(mib_cls, update_frequency) + self.mib_table = MIBTable(mib_cls, enable_dynamic_frequency, update_frequency) # containers self.socket_mgr = SocketManager(self.mib_table, self.run_enabled, self.loop) diff --git a/src/ax_interface/constants.py b/src/ax_interface/constants.py index 1c5b8a8ec..bccd6512e 100644 --- 
a/src/ax_interface/constants.py +++ b/src/ax_interface/constants.py @@ -90,3 +90,8 @@ class PduTypes(int, Enum): DEFAULT_PDU_TIMEOUT = 5 + +# MIBUpdater rate: Interval/Execution time +UPDATE_FREQUENCY_RATE = 10 +# MIBUpdater max update interval +MAX_UPDATE_INTERVAL = 60 diff --git a/src/ax_interface/mib.py b/src/ax_interface/mib.py index 43bf9b478..5cfd39e2c 100644 --- a/src/ax_interface/mib.py +++ b/src/ax_interface/mib.py @@ -1,10 +1,12 @@ import asyncio import bisect import random +from datetime import datetime from . import logger, util from .constants import ValueType from .encodings import ValueRepresentation +from .util import get_next_update_interval """ Update interval between update runs (in seconds). @@ -16,6 +18,11 @@ """ DEFAULT_REINIT_RATE = 60 +""" +Disable dynamic frequency by default +""" +DEFAULT_ENABLE_DYNAMIC_FREQUENCY = False + class MIBUpdater: """ @@ -25,6 +32,7 @@ class MIBUpdater: def __init__(self): self.run_event = asyncio.Event() self.frequency = DEFAULT_UPDATE_FREQUENCY + self.enable_dynamic_frequency = DEFAULT_ENABLE_DYNAMIC_FREQUENCY self.reinit_rate = DEFAULT_REINIT_RATE // DEFAULT_UPDATE_FREQUENCY self.update_counter = self.reinit_rate + 1 # reinit_data when init @@ -32,6 +40,7 @@ async def start(self): # Run the update while we are allowed redis_exception_happen = False while self.run_event.is_set(): + start = datetime.now() try: # reinit internal structures if self.update_counter > self.reinit_rate: @@ -57,9 +66,36 @@ async def start(self): # Any unexpected exception or error, log it and keep running logger.exception("MIBUpdater.start() caught an unexpected exception during update_data()") + if self.enable_dynamic_frequency: + """ + On SONiC device with huge interfaces + for example RP card on ethernet chassis, including backend ports, 600+ interfaces + The update_data function could be very slow, especially when 100% CPU utilization. + for example ciscoSwitchQosMIB.QueueStatUpdater, uses 1-3 seconds on normal state. 
+ uses 3-8 seconds on 100% CPU utilization state. + We use Asyncio/Coroutine as the basic framework, + the mib updaters share the same asyncio event loop with the SNMP agent client. + Hence during the updaters executing, the agent client can't receive/respond to new requests, + + The high frequency and the long execution time + causes the SNMP request to be timed out on High CPU utilization. + The stable frequency(generally with default value 5s) + doesn't works well on this huge interfaces situation. + when the execution time is long, + wait for longer time to give back the control of asyncio event loop to SNMP agent + """ + execution_time = (datetime.now() - start).total_seconds() + next_frequency = get_next_update_interval(execution_time, self.frequency) + + if next_frequency > self.frequency: + logger.debug(f"MIBUpdater type[{type(self)}] slow update detected, " + f"update execution time[{execution_time}], next_frequency[{next_frequency}]") + else: + next_frequency = self.frequency + # wait based on our update frequency before executing again. # randomize to avoid concurrent update storms. - await asyncio.sleep(self.frequency + random.randint(-2, 2)) + await asyncio.sleep(next_frequency + random.randint(-2, 2)) def reinit_data(self): """ @@ -275,10 +311,13 @@ class MIBTable(dict): Simplistic LUT for Get/GetNext OID. Interprets iterables as keys and implements the same interfaces as dict's. 
""" - def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY): + def __init__(self, mib_cls, + enable_dynamic_frequency=DEFAULT_ENABLE_DYNAMIC_FREQUENCY, + update_frequency=DEFAULT_UPDATE_FREQUENCY): if type(mib_cls) is not MIBMeta: raise ValueError("Supplied object is not a MIB class instance.") super().__init__(getattr(mib_cls, MIBMeta.KEYSTORE)) + self.enable_dynamic_frequency = enable_dynamic_frequency self.update_frequency = update_frequency self.updater_instances = getattr(mib_cls, MIBMeta.UPDATERS) self.prefixes = getattr(mib_cls, MIBMeta.PREFIXES) @@ -296,6 +335,7 @@ def start_background_tasks(self, event): tasks = [] for updater in self.updater_instances: updater.frequency = self.update_frequency + updater.enable_dynamic_frequency = self.enable_dynamic_frequency updater.run_event = event fut = asyncio.ensure_future(updater.start()) fut.add_done_callback(MIBTable._done_background_task_callback) diff --git a/src/ax_interface/util.py b/src/ax_interface/util.py index afe369115..96cf0cc8c 100644 --- a/src/ax_interface/util.py +++ b/src/ax_interface/util.py @@ -1,4 +1,5 @@ import ipaddress +import math import re from ax_interface import constants @@ -108,3 +109,31 @@ def ip2byte_tuple(ip): """ return tuple(i for i in ipaddress.ip_address(ip).packed) + +def get_next_update_interval(execution_time, static_frequency): + """ + >>> get_next_update_interval(0.4, 5) + 5 + >>> get_next_update_interval(0.87, 5) + 9 + >>> get_next_update_interval(18.88, 5) + 60 + + + :param static_frequency: Static frequency, generally use default value 5 + :param execution_time: The execution time of the updater + :return: the interval before next update + + We expect the rate of 'update interval'/'update execution time' >= UPDATE_FREQUENCY_RATE(10) + Because we're using asyncio/Coroutines, the update execution blocks SNMP proxy service and other updaters. 
+ Generally we expect the update to be quick and the execution time/interval time < 0.25 + Given the static_frequency == 5, + if the execution_time <= 0.5, the update interval stays at 5s; + if the execution time is (for example) 1.05s, + the next interval is ceil(1.05 * 10) = 11s before running the next update + + """ + frequency_based_on_execution_time = math.ceil(execution_time * constants.UPDATE_FREQUENCY_RATE) + frequency_based_on_execution_time = min(frequency_based_on_execution_time, constants.MAX_UPDATE_INTERVAL) + + return max(static_frequency, frequency_based_on_execution_time) diff --git a/src/sonic_ax_impl/__main__.py b/src/sonic_ax_impl/__main__.py index d9f117db0..a4b24bab4 100644 --- a/src/sonic_ax_impl/__main__.py +++ b/src/sonic_ax_impl/__main__.py @@ -5,11 +5,12 @@ import shutil import sys -import sonic_py_common.util from swsscommon import swsscommon import ax_interface import sonic_ax_impl + +from .utils.arg_parser import process_options from . import mibs LOG_FORMAT = "snmp-subagent [%(name)s] %(levelname)s: %(message)s" @@ -70,7 +71,7 @@ def logging_level_to_swss_level(log_level): sys.exit(0) # import command line arguments - args = sonic_py_common.util.process_options("sonic_ax_impl") + args = process_options("sonic_ax_impl") # configure logging. If debug '-d' is specified, logs to stdout at designated level. syslog/INFO otherwise. 
log_level = log_level_sdk = args.get('log_level') @@ -110,4 +111,4 @@ def logging_level_to_swss_level(log_level): from .main import main - main(update_frequency=args.get('update_frequency')) + main(enable_dynamic_frequency=args.get('enable_dynamic_frequency'), update_frequency=args.get('update_frequency')) diff --git a/src/sonic_ax_impl/main.py b/src/sonic_ax_impl/main.py index 281ab601b..c798ab690 100644 --- a/src/sonic_ax_impl/main.py +++ b/src/sonic_ax_impl/main.py @@ -54,14 +54,14 @@ def shutdown(signame, agent): shutdown_task = event_loop.create_task(agent.shutdown()) -def main(update_frequency=None): +def main(enable_dynamic_frequency=False, update_frequency=None): global event_loop try: Namespace.init_sonic_db_config() # initialize handler and set update frequency (or use the default) - agent = ax_interface.Agent(SonicMIB, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop) + agent = ax_interface.Agent(SonicMIB, enable_dynamic_frequency, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop) # add "shutdown" signal handlers # https://docs.python.org/3.5/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm diff --git a/src/sonic_ax_impl/utils/__init__.py b/src/sonic_ax_impl/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/sonic_ax_impl/utils/arg_parser.py b/src/sonic_ax_impl/utils/arg_parser.py new file mode 100644 index 000000000..788c4b2f9 --- /dev/null +++ b/src/sonic_ax_impl/utils/arg_parser.py @@ -0,0 +1,39 @@ +from __future__ import print_function +import sys +from getopt import getopt + + +def usage(script_name): + print('Usage: python ', script_name, + '-t [host] -p [port] -s [unix_socket_path] -d [logging_level] -f [update_frequency] -r [enable_dynamic_frequency] -h [help]') + + +def process_options(script_name): + """ + Process command line options + """ + options, remainders = getopt(sys.argv[1:], "t:p:s:d:f:rh", ["host=", "port=", "unix_socket_path=", "debug=", "frequency=", 
"enable_dynamic_frequency", "help"]) + + args = {} + for (opt, arg) in options: + try: + if opt in ('-d', '--debug'): + args['log_level'] = int(arg) + elif opt in ('-t', '--host'): + args['host'] = arg + elif opt in ('-p', '--port'): + args['port'] = int(arg) + elif opt in ('-s', '--unix_socket_path'): + args['unix_socket_path'] = arg + elif opt in ('-f', '--frequency'): + args['update_frequency'] = int(arg) + elif opt in ('-r', '--enable_dynamic_frequency'): + args['enable_dynamic_frequency'] = True + elif opt in ('-h', '--help'): + usage(script_name) + sys.exit(0) + except ValueError as e: + print('Invalid option for {}: {}'.format(opt, e)) + sys.exit(1) + + return args diff --git a/tests/test_agent.py b/tests/test_agent.py index 6b16e0cd1..92e5ff7b7 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -22,6 +22,6 @@ async def delayed_shutdown(self, agent): def test_agent_loop(self): event_loop = asyncio.get_event_loop() - agent = ax_interface.Agent(SonicMIB, 5, event_loop) + agent = ax_interface.Agent(SonicMIB, False, 5, event_loop) event_loop.create_task(self.delayed_shutdown(agent)) event_loop.run_until_complete(agent.run_in_event_loop()) diff --git a/tests/test_arg_parser.py b/tests/test_arg_parser.py new file mode 100644 index 000000000..fc2f9033f --- /dev/null +++ b/tests/test_arg_parser.py @@ -0,0 +1,152 @@ +from unittest import TestCase +from unittest.mock import patch + +import pytest +from sonic_ax_impl.utils.arg_parser import process_options + + +class TestUtil(TestCase): + + # Given: Don't pass any parameter + # When: Parse args + # Then: Return empty dict + @patch('sys.argv', ['sonic_ax_impl']) + def test_valid_options_default_value_none(self): + args = process_options("sonic_ax_impl") + + self.assertNotIn("log_level", args) + self.assertNotIn("host", args) + self.assertNotIn("port", args) + self.assertNotIn("unix_socket_path", args) + self.assertNotIn("update_frequency", args) + self.assertNotIn("enable_dynamic_frequency", args) + + # 
Given: Pass --port=aaa + # When: Parse args + # Then: Print valure error + @patch('builtins.print') + @patch('sys.argv', ['sonic_ax_impl', '--port=aaa']) + def test_valid_options_value_error(self, mock_print): + with pytest.raises(SystemExit) as excinfo: + process_options("sonic_ax_impl") + assert excinfo.value.code == 1 + mock_print.assert_called_with("Invalid option for --port: invalid literal for int() with base 10: 'aaa'") + + # Given: Pass -h + # When: Parse args + # Then: Print help logs + @patch('builtins.print') + @patch('sys.argv', ['sonic_ax_impl', '-h']) + def test_valid_options_help(self, mock_print): + with pytest.raises(SystemExit) as excinfo: + process_options("sonic_ax_impl") + assert excinfo.value.code == 0 + mock_print.assert_called_with('Usage: python ', 'sonic_ax_impl', '-t [host] -p [port] -s [unix_socket_path] -d [logging_level] -f [update_frequency] -r [enable_dynamic_frequency] -h [help]') + + # Given: Pass help + # When: Parse args + # Then: Print help logs + @patch('builtins.print') + @patch('sys.argv', ['sonic_ax_impl', '--help']) + def test_valid_options_help_long(self, mock_print): + with pytest.raises(SystemExit) as excinfo: + process_options("sonic_ax_impl") + assert excinfo.value.code == 0 + mock_print.assert_called_with('Usage: python ', 'sonic_ax_impl', '-t [host] -p [port] -s [unix_socket_path] -d [logging_level] -f [update_frequency] -r [enable_dynamic_frequency] -h [help]') + + # Given: Pass -r + # When: Parse args + # Then: Enable enable_dynamic_frequency + @patch('sys.argv', ['sonic_ax_impl', '-r']) + def test_valid_options_enable_dynamic_frequency(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["enable_dynamic_frequency"], True) + + # Given: Pass --enable_dynamic_frequency + # When: Parse args + # Then: Enable enable_dynamic_frequency + @patch('sys.argv', ['sonic_ax_impl', '--enable_dynamic_frequency']) + def test_valid_options_enable_dynamic_frequency_long(self): + args = 
process_options("sonic_ax_impl") + self.assertEqual(args["enable_dynamic_frequency"], True) + + # Given: Pass -f + # When: Parse args + # Then: Parse update_frequency + @patch('sys.argv', ['sonic_ax_impl', '-f9']) + def test_valid_options_update_frequency(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["update_frequency"], 9) + + # Given: Pass --frequency + # When: Parse args + # Then: Parse update_frequency + @patch('sys.argv', ['sonic_ax_impl', '--frequency=9']) + def test_valid_options_update_frequency_long(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["update_frequency"], 9) + + # Given: Pass -s + # When: Parse args + # Then: Parse socket + @patch('sys.argv', ['sonic_ax_impl', '-s/unix/socket']) + def test_valid_options_socket(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["unix_socket_path"], "/unix/socket") + + # Given: Pass --unix_socket_path + # When: Parse args + # Then: Parse socket + @patch('sys.argv', ['sonic_ax_impl', '--unix_socket_path=/unix/socket']) + def test_valid_options_socket_long(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["unix_socket_path"], "/unix/socket") + + # Given: Pass -p + # When: Parse args + # Then: Parse port + @patch('sys.argv', ['sonic_ax_impl', '-p6666']) + def test_valid_options_port(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["port"], 6666) + + # Given: Pass --port + # When: Parse args + # Then: Parse port + @patch('sys.argv', ['sonic_ax_impl', '--port=6666']) + def test_valid_options_port_long(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["port"], 6666) + + # Given: Pass -t + # When: Parse args + # Then: Parse host + @patch('sys.argv', ['sonic_ax_impl', '-tsnmp.com']) + def test_valid_options_host(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["host"], 'snmp.com') + + # Given: Pass --host + # When: Parse args + # Then: 
Parse host + @patch('sys.argv', ['sonic_ax_impl', '--host=snmp.com']) + def test_valid_options_host_long(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["host"], 'snmp.com') + + # Given: Pass -d + # When: Parse args + # Then: Parse log_level + @patch('sys.argv', ['sonic_ax_impl', '-d9']) + def test_valid_options_debug(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["log_level"], 9) + + # Given: Pass --debug + # When: Parse args + # Then: Parse log_level + @patch('sys.argv', ['sonic_ax_impl', '--debug=9']) + def test_valid_options_debug_long(self): + args = process_options("sonic_ax_impl") + self.assertEqual(args["log_level"], 9) + diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 000000000..851b2a98f --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,54 @@ +from unittest import TestCase + +from ax_interface.util import get_next_update_interval + + +class TestUtil(TestCase): + # Given: Update is quick, execution time is 0.000001, static interval is 5/10/15s + # When: get next interval + # Then: Return default interval, 5/10/15s + def test_get_interval_quick_finish(self): + for static_interval in [5, 10, 15]: + self.assertEqual(get_next_update_interval(0.000001, static_interval), static_interval) + + # Given: Update is slow, execution time is 0.7666666, static interval is 5 + # When: get next interval + # Then: Return the ceil(0.766666 * 10) = 8 + def test_get_interval_slow_finish(self): + self.assertEqual(get_next_update_interval(0.766666, 5), 8) + + # Given: Update is slow, execution time is 0.766666, static interval is 10 + # When: get next interval + # Then: Return default interval, 10 + def test_get_interval_slow_finish_default_long(self): + self.assertEqual(get_next_update_interval(0.766666, 10), 10) + + # Given: Update is very slow, execution time is 20.2324, static interval is 10 + # When: get next interval + # Then: Return max interval, 60 + def test_get_interval_very_slow(self): 
+ self.assertEqual(get_next_update_interval(20.2324, 10), 60) + + # Given: Get a 0 as the execution time, static interval is 5 + # When: get next interval + # Then: Return default interval, 5 + def test_get_interval_zero(self): + self.assertEqual(get_next_update_interval(0, 5), 5) + + # Given: Get a 0.000000 as the execution time, static interval is 5 + # When: get next interval + # Then: Return default interval, 5 + def test_get_interval_zero_long(self): + self.assertEqual(get_next_update_interval(0.000000, 5), 5) + + # Given: Wrongly get a negative number(-0.000001) as the execution time, static interval is 5 + # When: get next interval + # Then: Return default interval, 5 + def test_get_interval_negative(self): + self.assertEqual(get_next_update_interval(-0.000001, 5), 5) + + # Given: Wrongly get a negative number(-10.000001) as the execution time, static interval is 5 + # When: get next interval + # Then: Return default interval, 5 + def test_get_interval_negative_slow(self): + self.assertEqual(get_next_update_interval(-10.000001, 5), 5)