forked from ubernostrum/webcolors
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfuzz_helpers.py
156 lines (130 loc) · 5.04 KB
/
fuzz_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# Atheris fuzzing utilities written by Bailey Capuano
import contextlib
import io
import tempfile
from typing import Any, Dict, Iterator, List, Set, Tuple, Union

import atheris
def _handle_type(fdp: atheris.FuzzedDataProvider, ty_queue: List[type]) -> Any:
    """
    Produce one fuzzed value for the type at the front of the queue.

    The front entry of ``ty_queue`` is removed (the caller's list is mutated)
    and decides what is drawn from ``fdp``. Container types pass the rest of
    the queue on to the matching ``build_fuzz_*`` function.

    :param fdp: FuzzedDataProvider object
    :param ty_queue: The current stack of types to be used for fuzzing
    :return: The fuzzed element, or None if the queue is empty or the type
        is not one of the supported ones
    """
    if not ty_queue:
        return None

    requested = ty_queue.pop(0)

    # Containers: the remaining queue entries describe their contents.
    if requested is dict:
        return build_fuzz_dict(fdp, ty_queue)
    if requested is list:
        return build_fuzz_list(fdp, ty_queue)
    if requested is set:
        return build_fuzz_set(fdp, ty_queue)
    if requested is tuple:
        return build_fuzz_tuple(fdp, ty_queue)

    # Sized scalars: a length in [0, 100] is drawn first, then the payload.
    if requested is bytes:
        size = fdp.ConsumeIntInRange(0, 100)
        return fdp.ConsumeBytes(size)
    if requested is bytearray:
        size = fdp.ConsumeIntInRange(0, 100)
        return bytearray(fdp.ConsumeBytes(size))
    if requested is str:
        size = fdp.ConsumeIntInRange(0, 100)
        return fdp.ConsumeUnicodeNoSurrogates(size)

    # Fixed-size scalars.
    if requested is float:
        return fdp.ConsumeRegularFloat()
    if requested is bool:
        return fdp.ConsumeBool()
    if requested is int:
        return fdp.ConsumeInt(4)

    # Anything else is unsupported; the entry has still been consumed.
    return None
def build_fuzz_list(fdp: atheris.FuzzedDataProvider, ty_queue: List[type]) -> List[Any]:
    """
    Builds a list with fuzzer-defined elements.

    Between 1 and 5 elements are requested. Every element is generated from
    its own copy of ``ty_queue``, so all elements share the same type
    description. Elements that come back as ``None`` are dropped, so the
    result can be shorter than requested (or empty).

    On return, the entries describing the element type have been removed from
    ``ty_queue`` in place. A nested element type such as ``tuple, int`` spans
    several entries, and all of them are removed, not just the first.

    :param fdp: FuzzedDataProvider object
    :param ty_queue: The current stack of types to be used for fuzzing
    :return: The list
    """
    if not ty_queue:
        return []

    elem_count = fdp.ConsumeIntInRange(1, 5)
    gen_list = []

    # What is left of the queue once one element's types have been consumed.
    # Starts as "drop one entry" and is refined by the working copies below.
    remaining = ty_queue[1:]
    for _ in range(elem_count):
        # Each element works on a private copy so that every element sees
        # the same type description.
        passed_queue = ty_queue.copy()
        elem = _handle_type(fdp, passed_queue)
        if elem is not None:
            gen_list.append(elem)
        # _handle_type popped every entry belonging to the element type
        # (more than one for nested containers); remember what is left.
        remaining = passed_queue

    # Update the caller's list in place so later consumers start at the
    # first entry after the whole element type.
    ty_queue[:] = remaining
    return gen_list
def build_fuzz_set(fdp: atheris.FuzzedDataProvider, ty_queue: List[type]) -> Set[Any]:
    """
    Create a set whose members are chosen by the fuzzer.

    The members are generated as a list first (by pushing ``list`` onto the
    front of ``ty_queue`` and dispatching through ``_handle_type``) and then
    converted, so duplicates collapse.

    :param fdp: FuzzedDataProvider object
    :param ty_queue: The current stack of types to be used for fuzzing
    :return: The set (empty when ``ty_queue`` is empty)
    """
    if ty_queue:
        # Reuse the list generation path for the members.
        ty_queue.insert(0, list)
        members = _handle_type(fdp, ty_queue)
        return set(members)
    return set()
def build_fuzz_tuple(fdp: atheris.FuzzedDataProvider, ty_queue: List[type]) -> Tuple[Any, ...]:
    """
    Builds a tuple with fuzzer-defined elements.

    The elements are generated as a list first (by pushing ``list`` onto the
    front of ``ty_queue`` and dispatching through ``_handle_type``) and then
    converted to a tuple, so the tuple's length is variable.

    :param fdp: FuzzedDataProvider object
    :param ty_queue: The current stack of types to be used for fuzzing
    :return: The tuple (empty when ``ty_queue`` is empty)
    """
    if not ty_queue:
        return ()

    # Reuse the list generation path for the elements.
    ty_queue.insert(0, list)
    fuzz_list = _handle_type(fdp, ty_queue)
    return tuple(fuzz_list)
def build_fuzz_dict(fdp: atheris.FuzzedDataProvider, ty_queue: List[type]) -> Dict[Any, Any]:
    """
    Builds a dictionary with fuzzer-defined keys and values.

    The key type is read from the front of ``ty_queue`` and the value type
    from whatever follows it. Keys and values are generated as two separate
    lists and then paired up positionally; surplus keys or values are
    discarded, and duplicate keys collapse to a single entry.

    :param fdp: FuzzedDataProvider object
    :param ty_queue: The current stack of types to be used for fuzzing
    :return: The dictionary (empty when ``ty_queue`` is empty)
    """
    if not ty_queue:
        return {}

    ty_queue.insert(0, list)  # generate the keys as a list
    key_list = _handle_type(fdp, ty_queue)
    ty_queue.insert(0, list)  # generate the values as a list
    val_list = _handle_type(fdp, ty_queue)

    # zip() stops at the shorter of the two lists, so no explicit trimming
    # is needed to make the lengths match.
    return dict(zip(key_list, val_list))
class EnhancedFuzzedDataProvider(atheris.FuzzedDataProvider):
    """
    FuzzedDataProvider with extra convenience consumers for byte strings,
    text, in-memory files and temporary files on disk.
    """

    def ConsumeRandomBytes(self) -> bytes:
        """
        Consume a fuzzer-chosen number of bytes.

        :return: Bytes whose requested length is drawn from 0 to ``remaining_bytes()``
        """
        return self.ConsumeBytes(self.ConsumeIntInRange(0, self.remaining_bytes()))

    def ConsumeRandomString(self) -> str:
        """
        Consume a fuzzer-chosen amount of text without surrogates.

        :return: A string whose requested length is drawn from 0 to ``remaining_bytes()``
        """
        return self.ConsumeUnicodeNoSurrogates(self.ConsumeIntInRange(0, self.remaining_bytes()))

    def ConsumeRemainingString(self) -> str:
        """
        Consume text without surrogates, requesting ``remaining_bytes()`` as the length.

        :return: The string
        """
        return self.ConsumeUnicodeNoSurrogates(self.remaining_bytes())

    def ConsumeRemainingBytes(self) -> bytes:
        """
        Consume bytes, requesting ``remaining_bytes()`` as the length.

        :return: The bytes
        """
        return self.ConsumeBytes(self.remaining_bytes())

    def _consume_file_data(self, all_data: bool, as_bytes: bool) -> Union[bytes, str]:
        """
        Pick the payload for a fuzzed file.

        :param all_data: Use the ``ConsumeRemaining*`` consumers instead of the ``ConsumeRandom*`` ones
        :param as_bytes: Produce ``bytes`` when true, ``str`` otherwise
        :return: The payload
        """
        if all_data:
            return self.ConsumeRemainingBytes() if as_bytes else self.ConsumeRemainingString()
        return self.ConsumeRandomBytes() if as_bytes else self.ConsumeRandomString()

    @contextlib.contextmanager
    def ConsumeMemoryFile(self, all_data: bool = False, as_bytes: bool = True) -> Iterator[Union[io.BytesIO, io.StringIO]]:
        """
        Context manager yielding an in-memory file filled with fuzzed data.

        The file is closed when the ``with`` block exits, including when the
        block raises.

        :param all_data: Use the ``ConsumeRemaining*`` consumers instead of the ``ConsumeRandom*`` ones
        :param as_bytes: Yield an ``io.BytesIO`` when true, an ``io.StringIO`` otherwise
        :return: Yields the in-memory file object
        """
        file_data = self._consume_file_data(all_data, as_bytes)
        file = io.BytesIO(file_data) if as_bytes else io.StringIO(file_data)
        # The file's own context manager guarantees close() even if the
        # caller's with-body raises.
        with file:
            yield file

    @contextlib.contextmanager
    def ConsumeTemporaryFile(self, suffix: str, all_data: bool = False, as_bytes: bool = True) -> Iterator[str]:
        """
        Context manager yielding the path of a temporary file filled with fuzzed data.

        The temporary file is closed when the ``with`` block exits, including
        when the block raises.

        :param suffix: Suffix passed to ``tempfile.NamedTemporaryFile``
        :param all_data: Use the ``ConsumeRemaining*`` consumers instead of the ``ConsumeRandom*`` ones
        :param as_bytes: Write ``bytes`` in binary mode when true, ``str`` in text mode otherwise
        :return: Yields the file's path (``tfile.name``), not the file object
        """
        file_data = self._consume_file_data(all_data, as_bytes)
        mode = 'w+b' if as_bytes else 'w+'
        # The with-statement closes the temporary file even if the caller's
        # with-body raises.
        with tempfile.NamedTemporaryFile(mode=mode, suffix=suffix) as tfile:
            tfile.write(file_data)
            tfile.seek(0)
            # Push buffered data to disk so code opening the path sees it.
            tfile.flush()
            yield tfile.name