-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_manager.py
121 lines (86 loc) · 4.52 KB
/
db_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# module qui va contenir le code "métier" relatif à la bdd
from db_context_managers import OpenReaderCursor, OpenWriterCursor, OpenMultiWriterCursor
from collections import defaultdict
_SQL_get_api_permissions = """ SELECT permission
FROM api_permission_mapping
WHERE api = :api """
_SQL_is_suspicious_methode_description = """ SELECT 1
FROM suspicious_apis
WHERE api = :api"""
_SQL_insert_apk = """ INSERT INTO apks(name, dataset ,malignity)
VALUES (:name, :dataset,:malignity)"""
_SQL_insert_features = """ INSERT INTO features(apk_id, name, type)
VALUES (:apk_id, :name, :type)"""
_SQL_get_apk_features = """SELECT printf("%s::%s", features.type, features.name) AS feature
FROM apks
JOIN
features ON apks.id = features.apk_id
WHERE apks.id =:apk_id"""
_SQL_get_apk_malignity = """SELECT malignity
FROM apks
WHERE name = :apk_name"""
_SQL_get_apks = """SELECT id, malignity
FROM apks"""
_SQL_get_datasets_apks = """SELECT id, malignity
FROM apks
WHERE dataset IN (%s)"""
_SQL_get_apk_record = """SELECT id, name, malignity, dataset
FROM apks
WHERE id = :apk_id"""
_SQL_get_api_permission_mapping = """SELECT api, permission
FROM api_permission_mapping"""
_SQL_get_suspicious_apis = """SELECT api
FROM suspicious_apis"""
_SQL_is_apk_in_db = """SELECT 1
FROM apks
WHERE name = :name AND dataset = :dataset"""
def get_apk_record(apk_id: int) -> dict:
with OpenReaderCursor(_SQL_get_apk_record, {'apk_id': apk_id}) as cursor:
apk = cursor.fetchone()
return {'id': apk[0],
'name': apk[1],
'malignity': apk[2],
'dataset': apk[3]}
def get_api_permission_mapping() -> dict:
api_permission_mapping_dict = defaultdict(lambda: set())
with OpenReaderCursor(_SQL_get_api_permission_mapping) as cursor:
for api, permission in cursor.fetchall():
api_permission_mapping_dict[api].add(permission)
return api_permission_mapping_dict
def get_suspicious_api() -> dict:
suspicious_api_dict = defaultdict(lambda: False)
with OpenReaderCursor(_SQL_get_suspicious_apis) as cursor:
for api in cursor.fetchall():
suspicious_api_dict[api[0]] = True
return suspicious_api_dict
def get_datasets_apks(datasets: list) -> list:
sql_request = _SQL_get_datasets_apks % ('?,' * len(datasets))[:-1]
with OpenReaderCursor(sql_request, datasets) as cursor:
return [apk for apk in cursor.fetchall()]
def get_apks() -> list:
with OpenReaderCursor(_SQL_get_apks) as cursor:
return [apk for apk in cursor.fetchall()]
def get_apk_features(apk_id: int) -> list:
with OpenReaderCursor(_SQL_get_apk_features, {'apk_id': apk_id}) as cursor:
return [feature[0] for feature in cursor.fetchall()]
def get_apk_malignity(apk_name: str) -> int:
with OpenReaderCursor(_SQL_get_apk_malignity, {'apk_name': apk_name}) as cursor:
return cursor.fetchone()[0]
def get_api_permissions(api: str) -> list:
with OpenReaderCursor(_SQL_get_api_permissions, {'api': api}) as cursor:
return [permission[0] for permission in cursor.fetchall()]
def is_suspicious(method_description: str) -> bool:
with OpenReaderCursor(_SQL_is_suspicious_methode_description, {'api': method_description}) as cursor:
return len(cursor.fetchall()) != 0
def is_apk_in_db(apk_name: str, apk_dataset: str) -> bool:
with OpenReaderCursor(_SQL_is_apk_in_db, {'name': apk_name, 'dataset': apk_dataset}) as cursor:
return len(cursor.fetchall()) != 0
def insert_apk(apk: 'APK') -> int:
with OpenWriterCursor(_SQL_insert_apk, {'name': apk.get_name(),
'malignity': apk.get_malignity(),
'dataset': apk.get_dataset()}) as cursor:
return cursor.lastrowid
def insert_features(features: list):
""" list est une liste de dicotionnaires de la forme {'apk_id': ..., 'name': ..., 'malignity': ...}"""
with OpenMultiWriterCursor(_SQL_insert_features, features):
pass