-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcisco_snmp_changer.py
200 lines (167 loc) · 9.02 KB
/
cisco_snmp_changer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import concurrent.futures
from netmiko import ConnectHandler
import netmiko
import logging
import subprocess
#################################################################################################
# Configure logging to write to a file
logging.basicConfig(filename='script_log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
#################################################################################################
# Banner Intro
def banner():
print(
"""
_____ _ ____ _______ ________ _____ _ __________________
/ ___// | / / |/ / __ \ / ____/ / / / | / | / / ____/ ____/ __ \\
\__ \/ |/ / /|_/ / /_/ / / / / /_/ / /| | / |/ / / __/ __/ / /_/ /
___/ / /| / / / / ____/ / /___/ __ / ___ |/ /| / /_/ / /___/ _, _/
/____/_/ |_/_/ /_/_/ \____/_/ /_/_/ |_/_/ |_/\____/_____/_/ |_|
"""
)
#################################################################################################
def ping_device(ip):
"""Check if device is reachable."""
try:
ping_response = subprocess.run(['ping', '-n', '2', ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return ping_response.returncode == 0 # Return True if ping succeeds, False otherwise
except Exception as e:
logging.error(f"Error during ping check: {str(e)}")
return False
#################################################################################################
def check_reachability(ip, username, password, device_type):
# Check if the device is reachable using ping
if ping_device(ip):
logging.info(f"Device {ip} is reachable via ping.")
return True
# If ping fails, try SSH reachability
try:
logging.info(f"Ping failed. Attempting SSH reachability to {ip}")
ssh_device = {
'device_type': device_type,
'ip': ip,
'username': username,
'password': password,
}
with ConnectHandler(**ssh_device) as ssh_conn:
logging.info(f"SSH reachability to {ip} successful.")
return True
except Exception as e:
logging.error(f"SSH reachability to {ip} failed: {str(e)}")
return False
#################################################################################################
def get_device_info(device_line):
"""Extracts device info from a line in the Devices.txt file."""
# Check if the line contains the expected number of values
if ':' not in device_line or device_line.count(':') != 4:
logging.warning(f"Skipping invalid line in Devices.txt: {device_line}")
return None
try:
hostname, device_type, username, password, enable_password = device_line.strip().split(':')
return {
'device_type': device_type,
'ip': hostname,
'username': username,
'password': password,
'secret': enable_password
}
except ValueError:
logging.warning(f"Skipping line with unexpected format in Devices.txt: {device_line}")
return None
#################################################################################################
def modify_snmp_config(device_info):
try:
"""Modify SNMP community strings."""
# Check if the device is reachable
if not check_reachability(device_info['ip'], device_info['username'], device_info['password'], device_info['device_type']):
logging.error(f"Device {device_info['ip']} is not reachable.")
return f"Device {device_info['ip']} is not reachable."
print(f"Connecting to {device_info['ip']}")
connection = ConnectHandler(**device_info)
if device_info['device_type'] == 'cisco_ios':
# Enter enable mode
print(f"Entering Enable Mode in {device_info['ip']}")
connection.enable()
# Check if enable mode has been entered successfully
if not connection.check_enable_mode():
logging.error(f"Failed to enter enable mode on {device_info['ip']}")
return f"Failed to enter enable mode on {device_info['ip']}"
# Check for old_snmp_ro
snmp_output_old_snmp_ro = connection.send_command("sh run | inc old_snmp_ro")
logging.info(f"Searching for SNMP RO string' from {device_info['ip']}:{snmp_output_old_snmp_ro}")
# Check for old_snmp_rw
snmp_output_old_snmp_rw = connection.send_command("sh run | inc old_snmp_rw")
logging.info(f"Searching for SNMP RW string' from {device_info['ip']}:{snmp_output_old_snmp_rw}")
lines_to_remove = [line for line in snmp_output_old_snmp_ro.splitlines() if 'old_snmp_ro' in line] + \
[line for line in snmp_output_old_snmp_rw.splitlines() if 'old_snmp_rw' in line]
if lines_to_remove:
# Enter configuration mode
connection.config_mode()
# Remove old config lines
for line in lines_to_remove:
print(f"Sending command to {device_info['ip']}: no {line}")
connection.send_config_set(f"no {line}", exit_config_mode=False)
logging.info(f"Removing old SNMP string from {device_info['ip']}")
if device_info['device_type'] == 'cisco_ios':
# Save configuration after removing old config
print(f"Sending command to {device_info['ip']}: wr mem")
connection.send_command("wr mem")
logging.info(f"Saving configuration on {device_info['ip']}")
# Re-enter configuration mode for new commands
connection.config_mode()
elif device_info['device_type'] == 'cisco_xr':
# Commit changes after removing old config
print(f"Sending command to {device_info['ip']}: commit")
connection.send_command("commit")
logging.info(f"Saving configuration on {device_info['ip']}")
# Add new config lines
for line in lines_to_remove:
new_line = line.replace('old_snmp_ro', 'NEW1RO').replace('old_snmp_rw', 'NEW2RW')
print(f"Sending command to {device_info['ip']}: {new_line}")
connection.send_config_set(new_line, exit_config_mode=False)
logging.info(f"Configuring new SNMP strings on {device_info['ip']}:{new_line}")
# Save configuration after adding new config
if device_info['device_type'] == 'cisco_ios':
print(f"Sending command to {device_info['ip']}: wr mem")
connection.send_command("wr mem")
logging.info(f"Saving configuration on {device_info['ip']}")
elif device_info['device_type'] == 'cisco_xr':
print(f"Sending command to {device_info['ip']}: commit")
connection.send_command("commit")
logging.info(f"Saving configuration on {device_info['ip']}")
result = f"Configuration modified and saved for: {device_info['ip']}"
else:
result = f"No old SNMP strings found for: {device_info['ip']}"
connection.disconnect()
return result
except netmiko.NetmikoTimeoutException:
logging.error(f"Timeout connecting to {device_info['ip']}")
return f"Timeout connecting to {device_info['ip']}"
except netmiko.NetmikoAuthenticationException:
logging.error(f"Authentication failed for {device_info['ip']}")
return f"Authentication failed for {device_info['ip']}"
except Exception as e:
logging.error(f"Error processing {device_info['ip']}: {str(e)}")
return f"Error processing {device_info['ip']}: {str(e)}"
#################################################################################################
def main():
with open('Devices.txt', 'r') as f:
# Exclude commented-out lines and blank lines
device_lines = [line for line in f.readlines() if not line.strip().startswith('#') and line.strip()]
# Get the device information
devices = [get_device_info(line) for line in device_lines if get_device_info(line)]
if not devices:
logging.warning("No valid devices found in Devices.txt. Exiting script.")
return
# ThreadPoolExecutor is used to send the commands to devices concurrently
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=len(devices)) as executor:
results = list(executor.map(modify_snmp_config, devices))
for result in results:
print(result)
except Exception as e:
logging.error(f"Error in thread execution: {str(e)}")
print(f"Error in thread execution: {str(e)}")
#################################################################################################
if __name__ == '__main__':
banner()
main()