-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbase_parser.py
345 lines (278 loc) · 13.4 KB
/
base_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# This file contains base functionality that requires a proxy to be running.
# This file is the base class for custom parsers
# will be default in python 3.11.
# This is required for there to be no errors in the type hints.
from __future__ import annotations
import typing
# struct is used to decode bytes into primitive data types
# https://docs.python.org/3/library/struct.html
import os
from enum import Enum, auto
import time
# pylint: disable=redefined-builtin
from prompt_toolkit import print_formatted_text as print
# This is the base class for the base parser
import core_parser
from enum_socket_role import ESocketRole
from hexdump import Hexdump
# For type hints only
if typing.TYPE_CHECKING:
from proxy import Proxy
from application import Application
from core_parser import CommandDictType
from buffer_status import BufferStatus
###############################################################################
# Define which settings are available here.
class EBaseSettingKey(Enum):
HEXDUMP_ENABLED = auto()
HEXDUMP = auto()
PACKETNOTIFICATION_ENABLED = auto()
PACKET_NUMBER = auto()
def __eq__(self, other: typing.Any) -> bool:
if other is int:
return self.value == other
if other is str:
return self.name == other
if repr(type(self)) == repr(type(other)):
return self.value == other.value
return False
def __gt__(self, other: typing.Any) -> bool:
if other is int:
return self.value > other
if other is str:
return self.name > other
if repr(type(self)) == repr(type(other)):
return self.value > other.value
raise ValueError('Can not compare.')
def __hash__(self):
return self.value.__hash__()
class Parser(core_parser.Parser):
def __init__(self, application: Application, settings: dict[(Enum, typing.Any)]):
super().__init__(application, settings)
return
def __str__(self) -> str:
# The base parser doesn't forward packets, so using it would drop them all.
return 'BASE/DROP'
# Return a list of setting keys. Make sure to also include the base classes keys.
def getSettingKeys(self) -> list[Enum]:
settingKeys = super().getSettingKeys()
settingKeys.extend(list(EBaseSettingKey))
return settingKeys
# Define the defaults for each setting here.
def getDefaultSettings(self) -> dict[(Enum, typing.Any)]:
defaultSettings = {
EBaseSettingKey.HEXDUMP_ENABLED: True,
EBaseSettingKey.HEXDUMP: Hexdump(),
EBaseSettingKey.PACKETNOTIFICATION_ENABLED: True,
EBaseSettingKey.PACKET_NUMBER: -1
}
# Return the base class defaults as well
return super().getDefaultSettings() | defaultSettings
###############################################################################
# Packet parsing stuff goes here.
# Define what should happen when a packet arrives here
def parse(self, data: bytes, proxy: Proxy, origin: ESocketRole) -> list[str]:
output = []
# Update packet number
pktNr = self.getSetting(EBaseSettingKey.PACKET_NUMBER) + 1
self.setSetting(EBaseSettingKey.PACKET_NUMBER, pktNr)
# Output a packet notification if enabled.
if self.getSetting(EBaseSettingKey.PACKETNOTIFICATION_ENABLED):
# Print out the data in a nice format.
ts = time.time() - self.application.START_TIME
tsStr = f'{ts:>14.8f}'
proxyStr = f'{proxy.name} ({self.application.getParserByProxy(proxy)})'
pktNrStr = f'[PKT# {pktNr}]'
directionStr = '[C -> S]' if origin == ESocketRole.CLIENT else '[C <- S]'
dataLenStr = f'{len(data)} Byte{"s" if len(data) > 1 else ""}'
# Colorize output.
tsStr = self.application.escapeHTML(tsStr)
tsStr = f'<cyan>{tsStr}</cyan>'
proxyStr = self.application.escapeHTML(proxyStr)
proxyStr = f'<green><b>{proxyStr}</b></green>'
pktNrStr = self.application.escapeHTML(pktNrStr)
pktNrStr = f'<yellow>{pktNrStr}</yellow>'
directionStr = self.application.escapeHTML(directionStr)
if origin == ESocketRole.CLIENT:
directionStr = f'<style fg="white" bg="blue"><b>{directionStr}</b></style>'
else:
directionStr = f'<style fg="white" bg="magenta"><b>{directionStr}</b></style>'
dataLenStr = self.application.escapeHTML(dataLenStr)
dataLenStr = f'<green><b>{dataLenStr}</b></green>'
# Put it all together.
output.append(f'{tsStr} - {proxyStr} {pktNrStr} {directionStr} - {dataLenStr}')
# Output a hexdump if enabled.
if self.getSetting(EBaseSettingKey.HEXDUMP_ENABLED):
hexdumpObj = self.getSetting(EBaseSettingKey.HEXDUMP)
for line in hexdumpObj.hexdump(data):
output.append(line)
# Return the output.
return output
###############################################################################
# CLI stuff goes here.
# Define your custom commands here. Each command requires those arguments:
# 1. args: list[str]
# A list of command arguments. args[0] is always the command string itself.
# 2. proxy: Proxy
# This allows to make calls to the proxy API, for example to inject packets.
# The functions should return 0 if they succeeded. Otherwise their return gets printed by the CLI handler.
# Define which commands are available here and which function is called when it is entered by the user.
# Return a dictionary with the command as the key and a tuple of (function, str, completerArray) as the value.
# The function is called when the command is executed, the string is the help text for that command.
# The last completer in the completer array will be
# used for all words if the word index is higher than the index in the completer array.
# If you don't want to provide more completions, use None at the end.
def _buildCommandDict(self) -> CommandDictType:
ret = super()._buildCommandDict()
sendHexNote = 'Usage: {0} <HexData> \nExample: {0} 41424344\nNote: Spaces in hex data are allowed and ignored.'
sendStringNote = 'Usage: {0} <String>\n' \
'Example: {0} hello\\!\n\n' \
'Note: Leading spaces in the string are sent except for the space between the command and\n' \
'the first character of the string.\n' \
'Escape sequences are available.'
sendFileNote = 'Usage: {0} filename\nExample: {0} /home/user/.bashrc'
ret['hexdump'] = (
self._cmd_hexdump,
'Configure the hexdump or show current configuration.\n'
'Usage: {0} [yes|no] [<BytesPerLine>] [<BytesPerGroup>]',
[self._yesNoCompleter, None, ]
)
ret['notify'] = (
self._cmd_notify,
'Configure packet notifications.\n'
'Usage: {0} [yes|no]\n'
'If argument omitted, this will toggle the notifications.',
[self._yesNoCompleter, None]
)
ret['h2s'] = (self._cmd_h2s, f'Send arbitrary hex values to the server.\n{sendHexNote}', None)
ret['s2s'] = (self._cmd_s2s, f'Send arbitrary strings to the server.\n{sendStringNote}', None)
ret['f2s'] = (
self._cmd_f2s,
f'Send arbitrary files to the server.\n{sendFileNote}',
[self._fileCompleter, None]
)
ret['h2c'] = (self._cmd_h2c, f'Send arbitrary hex values to the client.\n{sendHexNote}', None)
ret['s2c'] = (self._cmd_s2c, f'Send arbitrary strings to the client.\n{sendStringNote}', None)
ret['f2c'] = (
self._cmd_f2c,
f'Send arbitrary files to the client.\n{sendFileNote}',
[self._fileCompleter, None]
)
return ret
def _cmd_notify(self, args: list[str], _) -> typing.Union[int, str]:
if not len(args) in [1, 2]:
print(self.getHelpText(args[0]))
return 'Syntax error.'
newState = not self.getSetting(EBaseSettingKey.PACKETNOTIFICATION_ENABLED)
if len(args) == 2:
if args[1] == 'yes':
newState = True
elif args[1] == 'no':
newState = False
else:
print(self.getHelpText(args[0]))
return 'Syntax error.'
self.setSetting(EBaseSettingKey.PACKETNOTIFICATION_ENABLED, newState)
print(f'Packet notifications are now {"enabled" if newState else "disabled"}.')
return 0
def _cmd_h2s(self, args: list[str], proxy: Proxy) -> typing.Union[int, str]:
return self._aux_cmd_send_hex(args, ESocketRole.SERVER, proxy)
def _cmd_h2c(self, args: list[str], proxy: Proxy) -> typing.Union[int, str]:
return self._aux_cmd_send_hex(args, ESocketRole.CLIENT, proxy)
def _aux_cmd_send_hex(self, args: list[str], target: ESocketRole, proxy: Proxy) -> typing.Union[int, str]:
if len(args) == 1:
print(self.getHelpText(args[0]))
return 'Syntax error.'
# Allow spaces in hex string, so join with empty string to remove them.
userInput = ''.join(args[1:])
pkt = bytes.fromhex(userInput)
if proxy.getIsConnected():
proxy.sendData(target, pkt)
return 0
return 'Not connected.'
def _cmd_s2s(self, args: list[str], proxy: Proxy) -> typing.Union[int, str]:
return self._aux_cmd_send_string(args, ESocketRole.SERVER, proxy)
def _cmd_s2c(self, args: list[str], proxy: Proxy) -> typing.Union[int, str]:
return self._aux_cmd_send_string(args, ESocketRole.CLIENT, proxy)
def _aux_cmd_send_string(self, args: list[str], target: ESocketRole, proxy: Proxy) -> typing.Union[int, str]:
if len(args) == 1:
print(self.getHelpText(args[0]))
return 'Syntax error.'
userInput = ' '.join(args[1:])
pkt = str.encode(userInput, 'utf-8')
pkt = self._escape(pkt)
if proxy.getIsConnected():
proxy.sendData(target, pkt)
return 0
return 'Not connected.'
def _cmd_f2s(self, args: list[str], proxy: Proxy) -> typing.Union[int, str]:
return self._aux_cmd_send_file(args, ESocketRole.SERVER, proxy)
def _cmd_f2c(self, args: list[str], proxy: Proxy) -> typing.Union[int, str]:
return self._aux_cmd_send_file(args, ESocketRole.CLIENT, proxy)
def _aux_cmd_send_file(self, args: list[str], target: ESocketRole, proxy: Proxy) -> typing.Union[int, str]:
if len(args) != 2:
print(self.getHelpText(args[0]))
return 'Syntax error.'
filePath = ' '.join(args[1:])
if not os.path.isfile(filePath):
return f'File "{filePath}" does not exist.'
byteArray = b''
try:
with open(filePath, 'rb') as file:
while byte := file.read(1):
byteArray += byte
# pylint: disable=broad-except
except Exception as e:
return f'Error reading file "{filePath}": {e}'
if proxy.getIsConnected():
proxy.sendData(target, byteArray)
return 0
return 'Not connected.'
def _cmd_hexdump(self, args: list[str], _) -> typing.Union[int, str]:
if len(args) > 4:
print(self.getHelpText(args[0]))
return 'Syntax error.'
enabled = self.getSetting(EBaseSettingKey.HEXDUMP_ENABLED)
hexdumpObj: Hexdump = self.getSetting(EBaseSettingKey.HEXDUMP)
bytesPerGroup = hexdumpObj.bytesPerGroup
bytesPerLine = hexdumpObj.bytesPerLine
if len(args) > 3:
try:
bytesPerGroup = self._strToInt(args[3])
except ValueError as e:
print(self.getHelpText(args[0]))
return f'Syntax error: {e}'
if len(args) > 2:
try:
bytesPerLine = self._strToInt(args[2])
if bytesPerLine < 1:
raise ValueError('Can\'t have less than 1 byte per line.')
except ValueError as e:
print(self.getHelpText(args[0]))
return f'Syntax error: {e}'
if len(args) > 1:
if args[1].lower() == 'yes':
enabled = True
elif args[1].lower() == 'no':
enabled = False
else:
print(self.getHelpText(args[0]))
return 'Syntax error: Must be "yes" or "no".'
# Write back settings
self.setSetting(EBaseSettingKey.HEXDUMP_ENABLED, enabled)
hexdumpObj.setBytesPerLine(bytesPerLine)
hexdumpObj.setBytesPerGroup(bytesPerGroup)
# Show status
if enabled:
print(f'Printing hexdumps: {hexdumpObj}')
else:
print('Not printing hexdumps.')
return 0
###############################################################################
# Completers go here.
def _yesNoCompleter(self, bufferStatus: BufferStatus) -> typing.NoReturn:
options = ['yes', 'no']
for option in options:
if option.startswith(bufferStatus.being_completed):
self.completer.candidates.append(option)
return