forked from malicialab/avclass
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathavclass_labeler.py
executable file
·428 lines (361 loc) · 14.1 KB
/
avclass_labeler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
#!/usr/bin/env python2
'''
AVClass labeler
'''
import sys
sys.path.insert(0, 'lib/')
import argparse
from avclass_common import AvLabels
from operator import itemgetter
import evaluate_clustering as ec
import json
import traceback
import os
# Default alias file
default_alias_file = "data/default.aliases"
# Default generic tokens file
default_gen_file = "data/default.generics"
# Default directory containing VT reports
default_vt_dir = ""
def guess_hash(h):
    '''Infer the hash type from the length of hash string h.

    Returns 'md5', 'sha1', or 'sha256'; None if the length matches
    none of the supported digest sizes.
    '''
    # Hex-digest length uniquely identifies each supported hash type.
    return {32: 'md5', 40: 'sha1', 64: 'sha256'}.get(len(h))
def main(args):
# Select hash used to identify sample, by default MD5
hash_type = args.hash if args.hash else 'md5'
# If ground truth provided, read it from file
gt_dict = {}
if args.gt:
with open(args.gt, 'r') as gt_fd:
for line in gt_fd:
gt_hash, family = map(str.lower, line.strip().split('\t', 1))
gt_dict[gt_hash] = family
# Guess type of hash in ground truth file
hash_type = guess_hash(gt_dict.keys()[0])
# Setting VT directory if present
if args.vt_dir: default_vt_dir = args.vt_dir
# Create AvLabels object
av_labels = AvLabels(args.gen, args.alias, args.av)
# Select input file with AV labels
if not default_vt_dir: ifile = args.vt if args.vt else args.lb
# If verbose, open log file
if args.verbose:
log_filename = os.path.basename(os.path.splitext(ifile)[0]) + \
'.verbose'
verb_fd = open(log_filename, 'w+')
# Process file or directory
vt_all = 0
vt_empty = 0
singletons = 0
if not default_vt_dir: fd = open(ifile, 'r')
fd = os.listdir(default_vt_dir)
first_token_dict = {}
token_count_map = {}
pair_count_map = {}
token_family_map = {}
fam_stats = {}
# Process each JSON file
for line in fd:
if not default_vt_dir:
# If blank line, skip
if line == '\n':
continue
# Debug info
if vt_all % 100 == 0:
sys.stderr.write('\r[-] %d JSON read' % vt_all)
sys.stderr.flush()
vt_all += 1
# Read JSON line or file and extract sample info (i.e., hashes and labels)
vt_rep = json.loads(line) if not default_vt_dir else json.load(open(os.path.join(default_vt_dir, line)))
sample_info = av_labels.get_sample_info(vt_rep, from_vt = True)
if sample_info is None:
try:
name = vt_rep['md5']
sys.stderr.write('\nNo AV labels for %s\n' % name)
except KeyError:
sys.stderr.write('\nCould not process: %s\n' % line)
sys.stderr.flush()
vt_empty += 1
continue
# Sample's name is selected hash type (md5 by default)
name = getattr(sample_info, hash_type)
# If the VT report has no AV labels, continue
if not sample_info[3]:
vt_empty += 1
sys.stderr.write('\nNo AV labels for %s\n' % name)
sys.stderr.flush()
continue
# Get the distinct tokens from all the av labels in the report
# And print them. If not verbose, print the first token.
# If verbose, print the whole list
try:
# Get distinct tokens from AV labels
tokens = av_labels.get_family_ranking(sample_info).items()
# If alias detection, populate maps
if args.aliasdetect:
prev_tokens = set([])
for entry in tokens:
curr_tok = entry[0]
curr_count = token_count_map.get(curr_tok)
if curr_count:
token_count_map[curr_tok] = curr_count + 1
else:
token_count_map[curr_tok] = 1
for prev_tok in prev_tokens:
if prev_tok < curr_tok:
pair = (prev_tok,curr_tok)
else:
pair = (curr_tok,prev_tok)
pair_count = pair_count_map.get(pair)
if pair_count:
pair_count_map[pair] = pair_count + 1
else:
pair_count_map[pair] = 1
prev_tokens.add(curr_tok)
# If generic token detection, populate map
if args.gendetect and args.gt:
for entry in tokens:
curr_tok = entry[0]
curr_fam_set = token_family_map.get(curr_tok)
family = gt_dict[name] if name in gt_dict else None
if curr_fam_set and family:
curr_fam_set.add(family)
elif family:
token_family_map[curr_tok] = set(family)
# Top candidate is most likely family name
if tokens:
family = tokens[0][0]
is_singleton = False
else:
family = "SINGLETON:" + name
is_singleton = True
singletons += 1
# Check if sample is PUP, if requested
if args.pup:
is_pup = av_labels.is_pup(sample_info[3])
if is_pup:
is_pup_str = "\t1"
else:
is_pup_str = "\t0"
else:
is_pup = None
is_pup_str = ""
# Build family map for precision, recall, computation
first_token_dict[name] = family
# Get ground truth family, if available
if args.gt:
gt_family = '\t' + gt_dict[name] if name in gt_dict else ""
else:
gt_family = ""
# Print family (and ground truth if available) to stdout
print '%s\t%s%s%s' % (name, family, gt_family, is_pup_str)
# If verbose, print tokens (and ground truth if available)
# to log file
if args.verbose:
verb_fd.write('%s\t%s%s%s\n' % (
name, tokens, gt_family, is_pup_str))
# Store family stats (if required)
if args.fam:
if is_singleton:
ff = 'SINGLETONS'
else:
ff = family
try:
numAll, numMal, numPup = fam_stats[ff]
except KeyError:
numAll = 0
numMal = 0
numPup = 0
numAll += 1
if args.pup:
if is_pup:
numPup += 1
else:
numMal += 1
fam_stats[ff] = (numAll, numMal, numPup)
except:
traceback.print_exc(file=sys.stderr)
continue
# Debug info
sys.stderr.write('\r[-] %d JSON read' % vt_all)
sys.stderr.flush()
sys.stderr.write('\n')
# Close VT file
if not default_vt_dir: fd.close()
# Print statistics
sys.stderr.write(
"[-] Samples: %d NoLabels: %d Singletons: %d "
"GroundTruth: %d\n" % (
vt_all, vt_empty, singletons, len(gt_dict)))
# If ground truth, print precision, recall, and F1-measure
if args.gt and args.eval:
precision, recall, fmeasure = \
ec.eval_precision_recall_fmeasure(gt_dict,
first_token_dict)
sys.stderr.write( \
"Precision: %.2f\tRecall: %.2f\tF1-Measure: %.2f\n" % \
(precision, recall, fmeasure))
# If generic token detection, print map
if args.gendetect:
# Open generic tokens file
gen_filename = os.path.basename(os.path.splitext(ifile)[0]) + \
'.gen'
gen_fd = open(gen_filename, 'w+')
# Output header line
gen_fd.write("Token\t#Families\n")
sorted_pairs = sorted(token_family_map.iteritems(),
key=lambda x: len(x[1]) if x[1] else 0,
reverse=True)
for (t,fset) in sorted_pairs:
gen_fd.write("%s\t%d\n" % (t, len(fset)))
# Close generic tokens file
gen_fd.close()
# If alias detection, print map
if args.aliasdetect:
# Open alias file
alias_filename = os.path.basename(os.path.splitext(ifile)[0]) + \
'.alias'
alias_fd = open(alias_filename, 'w+')
# Sort token pairs by number of times they appear together
sorted_pairs = sorted(
pair_count_map.items(), key=itemgetter(1))
# Output header line
alias_fd.write("# t1\tt2\t|t1|\t|t2|\t|t1^t2|\t|t1^t2|/|t1|\n")
# Compute token pair statistic and output to alias file
for (t1,t2),c in sorted_pairs:
n1 = token_count_map[t1]
n2 = token_count_map[t2]
if (n1 < n2):
x = t1
y = t2
xn = n1
yn = n2
else:
x = t2
y = t1
xn = n2
yn = n1
f = float(c) / float(xn)
alias_fd.write("%s\t%s\t%d\t%d\t%d\t%0.2f\n" % (
x,y,xn,yn,c,f))
# Close alias file
alias_fd.close()
# If family statistics, output to file
if args.fam:
# Open family file
fam_filename = os.path.basename(os.path.splitext(ifile)[0]) + \
'.families'
fam_fd = open(fam_filename, 'w+')
# Output header line
if args.pup:
fam_fd.write("# Family\tTotal\tMalware\tPUP\tFamType\n")
else:
fam_fd.write("# Family\tTotal\n")
# Sort map
sorted_pairs = sorted(fam_stats.items(), key=itemgetter(1),
reverse=True)
# Print map contents
for (f,fstat) in sorted_pairs:
if args.pup:
if fstat[1] > fstat[2]:
famType = "malware"
else:
famType = "pup"
fam_fd.write("%s\t%d\t%d\t%d\t%s\n" % (f, fstat[0], fstat[1],
fstat[2], famType))
else:
fam_fd.write("%s\t%d\n" % (f, fstat[0]))
# Close file
fam_fd.close()
# Close log file
if args.verbose:
sys.stderr.write('[-] Verbose output in %s\n' % (log_filename))
verb_fd.close()
if __name__=='__main__':
    # Build the command-line interface
    parser = argparse.ArgumentParser(prog='avclass_labeler',
        description='''Extracts the family of a set of samples.
            Also calculates precision and recall if ground truth available''')
    parser.add_argument('-vt',
        help='file with full VT reports '
             '(REQUIRED if -lb argument not present)')
    parser.add_argument('-lb',
        help='file with simplified JSON reports'
             '{md5,sha1,sha256,scan_date,av_labels} '
             '(REQUIRED if -vt not present)')
    parser.add_argument('-gt',
        help='file with ground truth')
    parser.add_argument('-eval',
        action='store_true',
        help='if used it evaluates clustering accuracy.'
             ' Prints precision, recall, F1-measure. Requires -gt parameter')
    parser.add_argument('-alias',
        help='file with aliases.',
        default = default_alias_file)
    parser.add_argument('-gen',
        help='file with generic tokens.',
        default = default_gen_file)
    parser.add_argument('-av',
        help='file with list of AVs to use')
    parser.add_argument('-pup',
        action='store_true',
        help='if used each sample is classified as PUP or not')
    parser.add_argument('-gendetect',
        action='store_true',
        help='if used produce generics file at end. Requires -gt parameter')
    parser.add_argument('-aliasdetect',
        action='store_true',
        help='if used produce aliases file at end')
    parser.add_argument('-v', '--verbose',
        action='store_true',
        help='output .verbose file with distinct tokens')
    parser.add_argument('-hash',
        help='hash used to name samples. Should match ground truth',
        choices=['md5', 'sha1', 'sha256'])
    parser.add_argument('-fam',
        action='store_true',
        help='if used produce families file with PUP/malware counts per family')
    parser.add_argument('-vt_dir',
        help='Specify an existing directory containing VT reports')
    args = parser.parse_args()

    # Validate argument combinations; first failing check wins
    if not args.vt and not args.lb and not args.vt_dir:
        sys.stderr.write('Argument -vt, -lb or -vt_dir is required\n')
        sys.exit(1)
    if args.vt and args.lb:
        sys.stderr.write('Use either -vt or -lb argument, not both.\n')
        sys.exit(1)
    if (args.vt or args.lb) and args.vt_dir :
        sys.stderr.write('Use either -vt/-lb or -vt_dir argument, not both.\n')
        sys.exit(1)
    if args.gendetect and not args.gt:
        sys.stderr.write('Generic token detection requires -gt param\n')
        sys.exit(1)
    if args.eval and not args.gt:
        sys.stderr.write('Evaluating clustering accuracy needs -gt param\n')
        sys.exit(1)

    # Report which alias file is in effect
    if args.alias:
        if args.alias == '/dev/null':
            sys.stderr.write('[-] Using no aliases\n')
        else:
            sys.stderr.write('[-] Using aliases in %s\n' % (args.alias))
    else:
        sys.stderr.write('[-] Using generic aliases in %s\n' % (
            default_alias_file))

    # Report which generic-tokens file is in effect
    if args.gen:
        if args.gen == '/dev/null':
            sys.stderr.write('[-] Using no generic tokens\n')
        else:
            sys.stderr.write('[-] Using generic tokens in %s\n' % (args.gen))
    else:
        sys.stderr.write('[-] Using default generic tokens in %s\n' % (
            default_gen_file))

    main(args)