-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmalpedia_client.py
222 lines (191 loc) · 8.41 KB
/
malpedia_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import requests
try:
from urllib.parse import urljoin
except ImportError:
# Python2 fallback
from urlparse import urljoin
from requests.auth import HTTPBasicAuth
import os
class Client(object):
    """
    Python interface for functionalities provided by the REST Api of the Malpedia project.
    https://malpedia.caad.fkie.fraunhofer.de

    Every public method maps to one API endpoint. Unless stated otherwise a
    method returns the decoded JSON body of the response and raises
    ``Exception`` when the server answers with a status code other than 200.
    """

    # Root URL every endpoint path is appended to. Kept as a class attribute
    # so a subclass (or a test) can point the client at another server.
    API_BASE_URL = "https://malpedia.caad.fkie.fraunhofer.de/api/"

    def __init__(self, username=None, password=None, apitoken=None, timeout=None):
        """
        Initialize a new instance of the MalpediaApiClient.
        Optionally, login data for your useraccount on Malpedia can be provided.
        This can either be your username/password combination or an apitoken.
        If both are given, both are stored and handed to requests.

        Args:
            username: Your Malpedia username (only used together with password).
            password: Your Malpedia password (only used together with username).
            apitoken: An active Apitoken that was generated on Malpedia.
            timeout: Optional value forwarded as the ``timeout`` argument of
                every HTTP request. The default ``None`` preserves the previous
                behaviour of not applying any timeout.
        """
        self.__credentials = None
        self.__headers = {}
        self.__timeout = timeout
        if apitoken:
            self.authenticate_by_token(apitoken)
        if username and password:
            self.authenticate(username, password)

    def authenticate_by_token(self, apitoken):
        """
        Set apitoken for authentication.
        The token is stored as an ``Authorization: APIToken <token>`` header,
        replacing any token set before.

        Args:
            apitoken: An active Apitoken that was generated on Malpedia.
        """
        self.__headers.update({'Authorization': 'APIToken {}'.format(apitoken)})

    def authenticate(self, username, password):
        """
        Set credentials used for HTTP basic authentication.

        Args:
            username: Your Malpedia username.
            password: Your Malpedia password.
        """
        self.__credentials = HTTPBasicAuth(username, password)

    def list_families(self):
        """
        List all family IDs. This is a helper command to enable follow-up commands involving family data.
        Access limitation: none
        """
        return self.__make_api_call('list/families')

    def list_actors(self):
        """
        List all actor IDs. This is a helper command to enable follow-up commands involving actor data.
        Access limitation: none
        """
        return self.__make_api_call('list/actors')

    def list_apiscout(self):
        """
        Provide a list of all non-zero ApiVector fingerprints that are currently on Malpedia.
        Access limitation: registration
        """
        return self.__make_api_call('list/apiscout')

    def list_apiscout_csv(self, destination):
        """
        Download all non-zero ApiVector fingerprints that are currently on Malpedia
        (in CSV format compatible with ApiScout) and write them to a file.
        Access limitation: registration

        Args:
            destination: Path of the file the raw response body is written to.
                An existing file is overwritten. Nothing is returned.
        """
        self.__download_to_file('list/apiscout/csv', destination)

    def get_yara_aggregated(self, tlp, destination="malpedia.yar"):
        """
        Download all YARA rules with given TLP and write them to a file.
        Access limitation: registration

        Args:
            tlp: TLP identifier, inserted verbatim into the request path.
            destination: Path of the file the raw response body is written to.
                An existing file is overwritten. Nothing is returned.
        """
        self.__download_to_file('get/yara/{}/raw'.format(tlp), destination)

    def list_samples(self, family_id):
        """
        Provide a list of all samples known for a family, including their packed status and version, if available.
        Access limitation: registration

        Args:
            family_id: ID of the family, inserted verbatim into the request path.
        """
        return self.__make_api_call('list/samples/{}'.format(family_id))

    def list_yara(self):
        """
        Provide a list of all YARA rules in malpedia for all families.
        Output may vary depending on access level (public = white, registration = green, amber).
        Access limitation: none (but result may vary for registered users)
        """
        return self.__make_api_call('list/yara')

    def get_family(self, family_id):
        """
        Provide meta data for a single family, as identified by <family_id>.
        Access limitation: none
        """
        return self.__make_api_call('get/family/{}'.format(family_id))

    def get_families(self):
        """
        Provide meta data for all families.
        Access limitation: none
        """
        return self.__make_api_call('get/families')

    def get_sample_raw(self, sha256):
        """
        Provide the sample alongside potentially existing unpacked or dumped files,
        as answered by the ``get/sample/<sha256>/raw`` endpoint (decoded JSON).
        Access limitation: registration
        """
        return self.__make_api_call('get/sample/{}/raw'.format(sha256))

    def get_sample_zip(self, sha256):
        """
        Provide the sample alongside potentially existing unpacked or dumped files,
        as answered by the ``get/sample/<sha256>/zip`` endpoint (decoded JSON).
        Access limitation: registration
        """
        return self.__make_api_call('get/sample/{}/zip'.format(sha256))

    def get_yara(self, family_id):
        """
        Provide the YARA rules for a given <family_id>.
        Output may vary depending on access level (public = white, registration = green, amber).
        Access limitation: none (but result may vary for registered users)
        """
        return self.__make_api_call('get/yara/{}'.format(family_id))

    def get_actor(self, actor_id):
        """
        Provide the meta information for a given <actor_id>.
        Access limitation: none
        """
        return self.__make_api_call('get/actor/{}'.format(actor_id))

    def get_misp(self):
        """
        A current view of Malpedia in the MISP galaxy cluster format.
        Access limitation: none
        """
        return self.__make_api_call('get/misp')

    def get_version(self):
        """
        Obtain the current version of Malpedia (commit number and date).
        Access limitation: none
        """
        return self.__make_api_call('get/version')

    def get_yara_after(self, date):
        """
        Provide all YARA rules with a version newer than a specific date. Intended for users intending regular automated updates.
        Output may vary depending on access level (public = white, registration = green, amber).
        Access limitation: none (but result may vary for registered users)

        Args:
            date: Date value, inserted verbatim into the request path.
                NOTE(review): the expected date format is defined by the API, not by this client.
        """
        return self.__make_api_call('get/yara/after/{}'.format(date))

    def find_family(self, needle):
        """
        Provide a list of all family names and associated synonyms where a part (<needle>) of the name is matched.
        Access limitation: none
        """
        return self.__make_api_call('find/family/{}'.format(needle))

    def find_actor(self, needle):
        """
        Provide a list of all actor names and associated synonyms where a part (<needle>) of the name is matched.
        Output is potentially subject to change and may include the responsible "name-creator" (e.g. "fireeye": "APT 28", "crowdstrike": "Fancy Bear", ...) in the future.
        Access limitation: none
        """
        return self.__make_api_call('find/actor/{}'.format(needle))

    def scan_binary(self, filepath):
        """
        Have a binary scanned against all YARA rules currently contained in Malpedia.
        The format of <yara_scan_report> is TBD.
        Access limitation: registration

        Args:
            filepath: Path to the file you wish to analyze. raw binary OR zip file (pwd:infected) containing one or more binaries.
        """
        # The request has to be issued while the file is still open, because
        # the open handle itself is passed on as the multipart upload.
        with open(filepath, "rb") as input_file:
            return self.__make_api_call('scan/binary', files={'input_file': input_file}, method='POST')

    def scan_yara(self, filepath, family_id=None):
        """
        Have a YARA rule used to scan against all samples (packed, unpacked, dumped) currently contained in Malpedia.
        The format of <yara_scan_report> is TBD.
        Access limitation: registration

        Args:
            filepath: Path to the yara-rule.
            family_id: Optional family ID; when given, the request goes to
                ``scan/yara/<family_id>`` instead of ``scan/yara``.
        """
        url = 'scan/yara/{}'.format(family_id) if family_id else 'scan/yara'
        with open(filepath, "rb") as input_file:
            return self.__make_api_call(url, data=input_file.read(), method='POST')

    def __download_to_file(self, path, destination):
        """
        Fetch the raw body of <path> and write it to <destination> in binary mode.
        The request is made before the file is opened, so a failing request
        does not create or truncate the destination file.
        """
        payload = self.__make_api_call(path, raw=True)
        with open(destination, "wb") as outfile:
            outfile.write(payload)

    def __make_api_call(self, path, method='GET', files=None, data=None, raw=False):
        """
        Perform a single HTTP request against the API and return its result.

        Args:
            path: Endpoint path relative to API_BASE_URL; leading slashes are ignored.
            method: HTTP method name.
            files: Optional multipart upload mapping, forwarded to requests.
            data: Optional request body, forwarded to requests.
            raw: If True return the response body as bytes, otherwise the decoded JSON.

        Raises:
            Exception: if the response status code is not 200.
        """
        apicall_path = self.API_BASE_URL + path.lstrip("/")
        response = requests.request(
            method,
            apicall_path,
            auth=self.__credentials,
            headers=self.__headers,
            files=files,
            data=data,
            timeout=self.__timeout,
        )
        status = response.status_code
        if status == 403:
            raise Exception("Not authorized. You need to be authenticated for this API call.")
        if status == 404:
            raise Exception("Not found. Either the resource you requested does not exist or you need to authenticate.")
        if status != 200:
            raise Exception("HTTP Status Code {:d}. Error while making request.".format(status))
        if raw:
            return response.content
        return response.json()