-
Notifications
You must be signed in to change notification settings - Fork 0
/
classify.py
123 lines (99 loc) · 3.94 KB
/
classify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import shutil
import sys
from shutil import copyfile
import re
import tensorflow as tf
import numpy as np
import keras
from keras.src.legacy.preprocessing.text import Tokenizer
from keras.src.utils import pad_sequences
import glob
import time
# Parameters
# NOTE(review): model_path is not referenced by the visible code; the script
# section at the bottom loads a hard-coded .tflite path instead.
model_path = 'saved_model/malware_detection_model_20k.keras' # Replace with the path to your saved model
# NOTE(review): this name is rebound by the `for file in glob.glob(...)` loop
# at the bottom of the file, so the value set here is not what gets classified.
file = 'virus_samples/Virus.Win32.Expiro.virus' # Replace with the path to the file you want to classify
# NOTE(review): the visible call sites pass the literal 40000 rather than this
# module-level name; the functions' own `max_length` parameters shadow it.
max_length = 40000 # The length to which you truncate or pad the file
# Matches runs of five or more printable ASCII bytes (space through tilde).
ascii_pattern = re.compile(b'[ -~]{5,}')


def extract_strings_from_binary(file_path, max_bytes=40000):
    """
    Extract printable ASCII strings from the start of a binary file.

    Only the first ``max_bytes`` bytes of the file are read. Every run of at
    least five printable ASCII bytes found in that prefix is collected, and
    the runs are concatenated in file order with no separator between them.

    :param file_path: Path to the binary file
    :param max_bytes: Number of leading bytes to scan (default 40000)
    :return: The concatenated runs as a single ``bytes`` object; ``b''`` if
        the file does not exist
    """
    try:
        with open(file_path, 'rb') as f:
            content = f.read(max_bytes)
    except FileNotFoundError:
        # Missing files are treated as containing no strings.
        return b''
    # The matches are already ASCII bytes, so they are joined directly rather
    # than being decoded to str and re-encoded.
    return b''.join(ascii_pattern.findall(content))
def load_executable_strings(file_path, max_length=20000):
    """
    Load a file's printable strings as a fixed-length float32 vector.

    The bytes returned by ``extract_strings_from_binary`` are interpreted as
    unsigned 8-bit values, then truncated or right-padded with zeros so the
    result always has exactly ``max_length`` elements.

    :param file_path: Path to the binary file
    :param max_length: Length of the returned vector (default 20000)
    :return: 1-D ``numpy.float32`` array of length ``max_length``
    """
    raw = extract_strings_from_binary(file_path)
    # Slicing is a no-op when the data is already short enough.
    data = np.frombuffer(raw, dtype=np.uint8)[:max_length]
    # Zero-pad on the right up to the requested length (pads by 0 when full).
    data = np.pad(data, (0, max_length - len(data)), mode='constant')
    return data.astype(np.float32)
def preprocess_file(file_path, max_length):
    """Preprocess the binary file to a fixed length.

    Reads at most ``max_length`` bytes from the start of the file, views them
    as unsigned 8-bit values and right-pads with zeros so the result always
    has exactly ``max_length`` elements.

    :param file_path: Path to the binary file
    :param max_length: Number of bytes to keep; shorter files are zero-padded
    :return: 1-D ``numpy.uint8`` array of length ``max_length`` (always a
        fresh, writable array)
    :raises ValueError: If ``max_length`` is negative
    :raises OSError: If the file cannot be opened or read
    """
    if max_length < 0:
        raise ValueError('max_length must be non-negative')
    # Only the first max_length bytes are ever used, so avoid loading the
    # remainder of a potentially large file into memory.
    with open(file_path, 'rb') as handle:
        raw = handle.read(max_length)
    data = np.frombuffer(raw, dtype=np.uint8)
    # np.pad copies, so the result is writable even when no padding is needed
    # (np.frombuffer over bytes yields a read-only view).
    return np.pad(data, (0, max_length - len(data)), mode='constant')
def preprocess_quantized(file_path, max_length):
    """Preprocess the binary file to a fixed-length float32 vector.

    Reads at most ``max_length`` bytes from the start of the file, views them
    as unsigned 8-bit values, right-pads with zeros to exactly ``max_length``
    elements and converts the result to ``float32``.

    :param file_path: Path to the binary file
    :param max_length: Number of bytes to keep; shorter files are zero-padded
    :return: 1-D ``numpy.float32`` array of length ``max_length``
    :raises ValueError: If ``max_length`` is negative
    :raises OSError: If the file cannot be opened or read
    """
    if max_length < 0:
        raise ValueError('max_length must be non-negative')
    # Only the first max_length bytes are ever used, so avoid loading the
    # remainder of a potentially large file into memory.
    with open(file_path, 'rb') as handle:
        raw = handle.read(max_length)
    data = np.frombuffer(raw, dtype=np.uint8)
    data = np.pad(data, (0, max_length - len(data)), mode='constant')
    return data.astype(np.float32)
def quantized_classify(file_data, interpreter):
    """Run a single inference on ``interpreter`` and return its first output value.

    :param file_data: Array holding one sample; it is reshaped to a batch of
        one row before being handed to the interpreter
    :param interpreter: Interpreter exposing ``get_input_details``,
        ``get_output_details``, ``set_tensor``, ``invoke`` and ``get_tensor``
        (the script passes a ``tf.lite.Interpreter``)
    :return: Element ``[0][0]`` of the first output tensor
        (presumably the model's score for the sample; confirm against the model)
    """
    # Look up the tensor descriptors for the model's inputs and outputs.
    inputs = interpreter.get_input_details()
    outputs = interpreter.get_output_details()

    # Feed the sample in as a one-row batch.
    input_index = inputs[0]['index']
    batch = file_data.reshape(1, -1)
    interpreter.set_tensor(input_index, batch)

    # Execute the model.
    interpreter.invoke()

    # Read back the first output tensor and unwrap its first element.
    result = interpreter.get_tensor(outputs[0]['index'])
    return result[0][0]
def classify_file(model: keras.Model, file_data):
    """Classify one preprocessed file with a Keras model.

    :param model: Model whose ``predict`` method is called on the sample
    :param file_data: Array holding one sample; it is reshaped to a batch of
        one row before prediction
    :return: Element ``[0][0]`` of the prediction output
    """
    # The model is given a (1, length) batch, so add the batch dimension here.
    batch = file_data.reshape(1, -1)
    predictions = model.predict(batch, verbose=0)
    return predictions[0][0]
# Load the trained model
#model = keras.models.load_model('saved_model/malware_detection_model_20k_strings.keras')
interpreter = tf.lite.Interpreter(model_path='saved_model/malware_detection_quantized_40k.tflite')
interpreter.allocate_tensors()
# string_interp = tf.lite.Interpreter(model_path='saved_model/malware_detection_quantized_20k_strings.tflite')
# string_interp.allocate_tensors()

# NOTE(review): start_time is recorded but never reported by the visible code.
start_time = time.time()
counter = 0  # number of files classified successfully

# NOTE(review): recursive=True only changes matching for patterns containing
# '**', so this lists the direct children of 'virus/' only.
for file in glob.glob('virus/*', recursive=True):
    try:
        # Every sample is truncated/padded to 40000 bytes before inference.
        quantized = quantized_classify(preprocess_quantized(file, 40000), interpreter)
    except Exception as exc:
        # Best-effort scan: skip entries that cannot be read or classified
        # (e.g. directories), but say so instead of failing silently.
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit stop the run.
        print(f'\rskipped {file}: {exc}', file=sys.stderr)
        continue
    counter += 1
    # Running progress counter, rewritten in place on the same line.
    print(f'\r{counter}', end='')
    # Report every file whose score falls below the 0.95 threshold.
    if quantized < 0.95:
        print('\r',quantized, file)
    # quantized_strings = quantized_classify(load_executable_strings(file), string_interp)
    # print(quantized, quantized_strings)
    # if quantized > 0.1:
    # # Output the result
    # shutil.copy(file, 'falsepositives')