-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.py
313 lines (293 loc) · 10.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/python
"""
Python script to update an Atlassian Confluence page with the secrets found
based on the report generated by the custom shell script 'gitleaks.sh' which
is using Gitleaks and to send a notification on Slack.
"""
# required imports
import json
import os
import re
import sys
from datetime import datetime
import pytz
import requests
from atlassian import Confluence
print("Script Execution Started!")
# get time zone, repository name and branch name from the arguments passed to the script
if len(sys.argv) < 4 or len(sys.argv) > 5:
print("ERROR: Invalid arguments passed.")
print("Usage: python main.py TIME_ZONE REPOSITORY_NAME BRANCH_NAME [JSON_REPORT_URL]")
print("Example: python main.py Europe/Amsterdam my-projects/my-repo master")
sys.exit(1)
time_zone = sys.argv[1]
repo_name = sys.argv[2]
branch_name = sys.argv[3]
json_report_url = ""
if len(sys.argv) == 5:
json_report_url = sys.argv[4]
# get the current time in UTC and convert it into the desired time zone's time
time_now = datetime.now()
target_timezone = pytz.timezone(time_zone)
time_now = time_now.astimezone(target_timezone)
time_now = time_now.strftime('%Y-%m-%d %H:%M:%S %Z')
# get environment variables related to Confluence
confluence_enabled = os.getenv("CONFLUENCE_ENABLED")
if confluence_enabled is None:
print("ERROR: CONFLUENCE_ENABLED environment variable is not set.")
sys.exit(1)
elif confluence_enabled == "1":
confluence_site = os.getenv("CONFLUENCE_SITE")
confluence_user = os.getenv("CONFLUENCE_USER_EMAIL_ID")
confluence_pass = os.getenv("CONFLUENCE_USER_TOKEN")
page_title = os.getenv("CONFLUENCE_PAGE_TITLE")
page_space = os.getenv("CONFLUENCE_PAGE_SPACE")
if confluence_site is None:
print("ERROR: CONFLUENCE_SITE environment variable is not set.")
sys.exit(1)
if confluence_user is None:
print("ERROR: CONFLUENCE_USER_EMAIL_ID environment variable is not set.")
sys.exit(1)
if confluence_pass is None:
print("ERROR: CONFLUENCE_USER_TOKEN environment variable is not set.")
sys.exit(1)
if page_title is None:
print("ERROR: CONFLUENCE_PAGE_TITLE environment variable is not set.")
sys.exit(1)
if page_space is None:
print("ERROR: CONFLUENCE_PAGE_SPACE environment variable is not set.")
sys.exit(1)
# get environment variables related to Slack
slack_enabled = os.getenv("SLACK_ENABLED")
if slack_enabled is None:
print("ERROR: SLACK_ENABLED environment variable is not set.")
sys.exit(1)
elif slack_enabled == "1":
slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if slack_webhook_url is None:
print("ERROR: SLACK_WEBHOOK_URL environment variable is not set.")
sys.exit(1)
# define HTML page template
if confluence_enabled == "1":
html_template = """
<h2>Repository: {} - Branch: {}</h2>
<h4>Last Scan Time: {}</h4>
<h3>Secrets Found: {}</h3>
<table data-number-column="true" data-table-width="1400" data-layout="default">
<tbody>
<tr>
<th class="numberingColumn"/>
<th>
<p><strong>Description</strong></p>
</th>
<th>
<p><strong>File Reference</strong></p>
</th>
<th>
<p><strong>Secret Type</strong></p>
</th>
<th>
<p><strong>Commit ID</strong></p>
</th>
<th>
<p><strong>Commit Author</strong></p>
</th>
</tr>
{}
</tbody>
</table>
"""
# define HTML row template
if confluence_enabled == "1":
row_template = """
<tr>
<td class="numberingColumn">
{}
</td>
<td>
<p>{}</p>
</td>
<td>
<p>{}</p>
</td>
<td>
<p>{}</p>
</td>
<td>
<p>{}</p>
</td>
<td>
<p>{}</p>
</td>
</tr>
"""
# connect to Atlassian Confluence
if confluence_enabled == "1":
print("Connecting to Atlassian Confluence...")
confluence = Confluence(
url=confluence_site,
username=confluence_user,
password=confluence_pass
)
# resolve the page ID on Confluence
if confluence_enabled == "1":
print("Resolving the page ID on Confluence...")
page_id = confluence.get_page_id(
page_space,
page_title
)
# fetch current page content from the page on Confluence
if confluence_enabled == "1":
print("Fetching current page content from the page on Confluence...")
page = confluence.get_page_by_id(
page_id,
expand='body.storage'
)
page_content = page['body']['storage']['value']
# read JSON from the report generated by the custom shell script 'gitleaks.sh'
print("Reading JSON from the report generated by the custom shell script 'gitleaks.sh'...")
with open("./gitleaks-report.json", "r", encoding='UTF-8') as file:
data = json.load(file)
secrets_count = len(data)
# update HTML page template and find unique commit authors from the JSON data read
if confluence_enabled == "1" or slack_enabled == "1":
print("Updating HTML page template and finding unique commit authors from the JSON data read...")
authors = []
rows = ""
rows_count = 1
for entry in data:
author = entry["Author"]
if confluence_enabled == "1":
description = entry["Description"]
file_reference = f'<a href="{entry["Link"]}">{entry["File"]}:{entry["Line No."]}</a>'
secret_type = entry["Secret Type"]
commit = entry["Commit"]
rows += row_template.format(
rows_count,
description,
file_reference,
secret_type,
commit,
author
)
if slack_enabled == "1":
authors.append(author)
rows_count = rows_count + 1
if confluence_enabled == "1":
html_template = html_template.format(
repo_name,
branch_name,
time_now,
secrets_count,
rows
)
if slack_enabled == "1":
authors = list(set(authors))
authors.sort()
# define the pattern to replace the respective div using repository and branch name
if confluence_enabled == "1":
print("Defining the pattern to replace the respective div using repository and branch name...")
pattern = f'<h2>Repository: {repo_name} - Branch: {branch_name}.*?</table>'
# check if pattern is found or not and update the page content accordingly
if confluence_enabled == "1":
print("Checking if pattern is found or not and updating the page content accordingly...")
new_page_content = page_content
re_result = re.search(
pattern,
page_content,
flags=re.DOTALL
)
if re_result:
# pattern found; replace the existing content with the new HTML page template
new_page_content = re.sub(
pattern,
html_template,
page_content,
flags=re.DOTALL
)
else:
# pattern not found; add the new HTML page template at the end of the page
new_page_content = new_page_content + "\n" + html_template
# update page with new content
if confluence_enabled == "1":
try:
confluence.update_page(
page_id, page_title,
new_page_content,
type='page',
representation='storage',
minor_edit=False,
full_width=True
)
print("Confluence page is updated successfully.")
except Exception as err:
print("ERROR: Failed to update Confluence page.")
print(f'ERROR: {err}')
sys.exit(1)
# prepare a message to send it as a notification to Slack
if slack_enabled == "1":
print("Preparing a message to send it as a notification to Slack...")
message_color = "#02cc38" # green
message_title = f':tada: No Secrets found in the :file_folder: "{repo_name}" repository'
message_title += f' on the :git: "{branch_name}" branch.'
message = f'\n:clock1: *Last Scan Time:* `{time_now}`'
slack_message = {
"attachments": [
{
"pretext": "*Secrets Detection Notification*",
"title": message_title,
"text": message,
"color": message_color
}
]
}
message_links = ""
if secrets_count != 0:
message_color = "#cc0202" # red
message_title = f':warning: {secrets_count} Secrets found in the :file_folder:'
message_title += f' "{repo_name}" repository on the :git: "{branch_name}" branch.'
message += f'\n:technologist: *Commit Authors:* \n• *{'*\n• *'.join(authors)}*'
slack_message = {
"attachments": [
{
"pretext": "*Secrets Detection Notification*",
"title": message_title,
"text": message,
"color": message_color
}
]
}
if confluence_enabled == "1":
message_links += f'\n:link: More details can be found here: <{confluence_site}/wiki/'
message_links += f'spaces/{page_space}/pages/{page_id}/{page_title}|Confluence Page>'
if json_report_url != "":
message_links += f'\n:link: JSON report can be found here: <{json_report_url}'
message_links += '|JSON Report>'
if message_links != "":
slack_message_links = {
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": message_links
}
}
]
}
slack_message["attachments"].append(slack_message_links)
# send the prepared message as a notification to Slack
print("Sending the prepared message as a notification to Slack...")
try:
response = requests.post(
slack_webhook_url,
data=json.dumps(slack_message),
headers={'Content-Type': "application/json"},
timeout=10)
response.raise_for_status()
print("Notification sent to Slack successfully.")
except Exception as err:
print("ERROR: Failed to send notification to Slack.")
print(f'ERROR: {err}')
sys.exit(1)
print("Script Execution Completed!")