Skip to content

Commit

Permalink
Adding Socket Statistics Script
Browse files Browse the repository at this point in the history
  • Loading branch information
bnerickson committed Sep 11, 2023
1 parent 2c6ae8b commit e2283a2
Showing 1 changed file with 225 additions and 0 deletions.
225 changes: 225 additions & 0 deletions snmp/ss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
#!/usr/bin/env python
#
# Name: Socket Statistics Script
# Author: bnerickson <[email protected]> w/SourceDoctor's certificate.py script forming
# the base of the vast majority of this one.
# Version: 1.0
# Description: This is a simple script to parse "ss" output for ingestion into
# LibreNMS via the ss application.
# Installation:
# 1. Copy this script to /etc/snmp/ and make it executable:
# chmod +x /etc/snmp/ss.py
# 2. Edit your snmpd.conf and include:
# extend ss /etc/snmp/ss.py
# 3. (Optional) Create a /etc/snmp/ss.json file and specify:
# a.) "ss_cmd" - String path to the ss binary: ["/sbin/ss"]
# b.) "socket_types" - A comma-delimited list of socket types to include.
# Specifying "all" includes all of the socket types.
# For example: to include only tcp and udp sockets,
# you would specify "tcp,udp": ["all"]
# ```
# {
# "ss_cmd": "/sbin/ss",
# "socket_types": "all"
# }
# ```
# 4. Restart snmpd and activate the app for desired host.

import json
import subprocess
import sys

CONFIG_FILE = "/etc/snmp/ss.json"
SOCKET_TYPES = {
"dccp": {"args": ["--dccp", "--all"], "netids_present": False},
"inet": {"args": ["--family", "inet", "--all"], "netids_present": True},
"inet6": {"args": ["--family", "inet6", "--all"], "netids_present": True},
"link": {"args": ["--family", "link", "--all"], "netids_present": True},
"mptcp": {"args": ["--mptcp", "--all"], "netids_present": False},
"netlink": {"args": ["--family", "netlink", "--all"], "netids_present": False},
"raw": {"args": ["--raw", "--all"], "netids_present": False},
"sctp": {"args": ["--sctp", "--all"], "netids_present": False},
"tcp": {"args": ["--tcp", "--all"], "netids_present": False},
"tipc": {"args": ["--family", "tipc", "--all"], "netids_present": True},
"udp": {"args": ["--udp", "--all"], "netids_present": False},
"unix": {"args": ["--family", "unix", "--all"], "netids_present": True},
"vsock": {"args": ["--family", "vsock", "--all"], "netids_present": True},
"xdp": {"args": ["--xdp", "--all"], "netids_present": False},
}
SOCKET_ALLOW_LIST = list(SOCKET_TYPES.keys())
SS_CMD = ["/sbin/ss"]


def error_handler(error_name, err):
"""
error_handler(): Common error handler for config/output parsing and
command execution.
Inputs:
error_name: String describing the error handled.
err: The error message in its entirety.
Outputs:
None
"""
output_data = {
"errorString": f"{error_name}: '{err}'",
"error": 1,
"version": 1,
"data": [],
}
print(json.dumps(output_data))
sys.exit(1)


def config_file_parser():
"""
config_file_parser(): Parses the config file (if it exists) and extracts the
necessary parameters.
Inputs:
None
Outputs:
ss_cmd: The full ss command to execute.
socket_allow_list: A list of the socket types to parse output for.
"""
ss_cmd = SS_CMD.copy()
socket_allow_list = SOCKET_ALLOW_LIST.copy()

# Load configuration file if it exists
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as json_file:
config_file = json.load(json_file)
ss_cmd = [config_file["ss_cmd"]]
lower_list = list(map(str.lower, config_file["socket_types"].split(",")))
if "all" not in lower_list:
socket_allow_list = lower_list
except FileNotFoundError:
pass
except (KeyError, PermissionError, OSError, json.decoder.JSONDecodeError) as err:
error_handler("Config File Error", err)

# Verify the socket types specified by the user are valid.
err = ""
for socket_type in socket_allow_list:
if socket_type not in SOCKET_TYPES:
if not err:
err = "Invalid socket types specified: "
err += socket_type + " "
if err:
error_handler("Configuration File Error", err.strip())

# Create and return full ss command.
return ss_cmd, socket_allow_list


def command_executor(ss_cmd, socket_type):
"""
command_executor(): Execute the ss command and return the output.
Inputs:
ss_cmd: The full ss command to execute.
socket_type: The type of socket to collect data for.
Outputs:
poutput: The stdout of the executed command (empty byte-string if error).
"""
ss_socket_cmd = ss_cmd.copy()
ss_socket_cmd.extend(SOCKET_TYPES[socket_type]["args"])

try:
# Execute ss command
poutput = subprocess.check_output(
ss_socket_cmd,
stdin=None,
stderr=subprocess.PIPE,
)
except (subprocess.CalledProcessError, OSError) as err:
error_handler("Command Execution Error", err)
return poutput


def socket_parser(line, socket_type, ss_data):
"""
socket_parser(): Parses a socket line for its current status.
That status type is added to the global ss_data
variable if it does not exist or incremented if
it does. The totals for the socket type are
incremented as well.
Inputs:
line: The sockets's status line from the ss stdout.
socket_type: The type of socket to parse data for.
ss_data: All of the socket data as a dictionary.
Outputs:
None
"""
line_parsed = line.strip().split()

netid = None
state = None

try:
if SOCKET_TYPES[socket_type]["netids_present"]:
netid = line_parsed[0]
state = line_parsed[1]
else:
state = line_parsed[0]
except IndexError as err:
error_handler("Command Output Parsing Error", err)

if SOCKET_TYPES[socket_type]["netids_present"]:
if netid == "???":
netid = "unknown"
if netid not in ss_data:
ss_data[netid] = {}
ss_data[netid][state] = (
1 if state not in ss_data[netid] else (ss_data[netid][state] + 1)
)
ss_data[netid]["TOTAL"] = (
1 if "TOTAL" not in ss_data[netid] else (ss_data[netid]["TOTAL"] + 1)
)
else:
ss_data[state] = 1 if state not in ss_data else (ss_data[state] + 1)
ss_data["TOTAL"] = 1 if "TOTAL" not in ss_data else (ss_data["TOTAL"] + 1)

return ss_data


def main():
"""
main(): main function that delegates config file parsing, command execution,
and socket stdout parsing. Then it prints out the expected json output
for the ss application.
Inputs:
None
Outputs:
None
"""
output_data = {"errorString": "", "error": 0, "version": 1, "data": {}}

# Parse configuration file.
ss_cmd, socket_allow_list = config_file_parser()

# Execute ss command for socket types.
for socket_type in SOCKET_ALLOW_LIST:
# Skip socket types disabled by the user.
if socket_type not in socket_allow_list:
continue

loop_first = True
for line in command_executor(ss_cmd, socket_type).decode("utf-8").split("\n"):
# Skip the first header line.
if loop_first:
output_data["data"][socket_type] = {}
loop_first = False
continue
if not line:
continue
output_data["data"][socket_type] = socket_parser(
line, socket_type, output_data["data"][socket_type]
)

print(json.dumps(output_data))


if __name__ == "__main__":
main()

0 comments on commit e2283a2

Please sign in to comment.