From 11818f9feb702b5209f3a23da71ea468513c3650 Mon Sep 17 00:00:00 2001 From: Takahiro Yoshimura Date: Sat, 6 Jan 2024 23:13:17 +0900 Subject: [PATCH] Refactoring --- pyproject.toml | 2 +- trueseeing/app/_dummy.py | 31 - trueseeing/app/exploit.py | 21 +- trueseeing/app/inspect.py | 627 ++++++++++---------- trueseeing/app/patch.py | 12 +- trueseeing/app/scan.py | 72 +-- trueseeing/app/shell.py | 3 + trueseeing/core/asm.py | 14 +- trueseeing/core/code/parse.py | 3 +- trueseeing/core/context.py | 43 +- trueseeing/core/literalquery.py | 101 +++- trueseeing/core/patch.py | 40 +- trueseeing/core/report.py | 20 +- trueseeing/core/sign.py | 6 +- trueseeing/core/store.py | 8 +- trueseeing/core/tools.py | 18 +- trueseeing/libs/files.0.sql | 2 + trueseeing/signature/crypto.py | 210 +++---- trueseeing/signature/fingerprint.py | 58 +- trueseeing/signature/privacy.py | 114 ++-- trueseeing/signature/security.py | 852 ++++++++++++++-------------- 21 files changed, 1149 insertions(+), 1108 deletions(-) delete mode 100644 trueseeing/app/_dummy.py create mode 100644 trueseeing/libs/files.0.sql diff --git a/pyproject.toml b/pyproject.toml index d23c410..d40dc2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ dev = [ Source = "https://github.com/alterakey/trueseeing" [project.scripts] -trueseeing = "trueseeing.app._dummy:invoke" +trueseeing = "trueseeing.app.shell:entry" [tool.mypy] strict = true diff --git a/trueseeing/app/_dummy.py b/trueseeing/app/_dummy.py deleted file mode 100644 index b5f5c9e..0000000 --- a/trueseeing/app/_dummy.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -# Trueseeing: Non-decompiling Android application vulnerability scanner -# Copyright (C) 2017-23 Takahiro Yoshimura -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -This file must be kept in a python2 and python3 compatible syntax. -""" - -from __future__ import print_function # this is here for the version check to work on Python 2. - -def invoke() -> None: - import sys - if sys.version_info < (3, 7): - print("fatal: requires Python 3.7 or later", file=sys.stderr) - sys.exit(2) - else: - import trueseeing.app.shell - trueseeing.app.shell.Shell().invoke() diff --git a/trueseeing/app/exploit.py b/trueseeing/app/exploit.py index 37349a1..91d1ad5 100644 --- a/trueseeing/app/exploit.py +++ b/trueseeing/app/exploit.py @@ -82,8 +82,7 @@ def apply(self, context: Context) -> None: manifest = context.parsed_manifest() for e in manifest.xpath('.//application'): e.attrib['{http://schemas.android.com/apk/res/android}debuggable'] = "true" - with context.store().db as c: - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest))) + context.store().query().patch_put(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest)) class ExploitEnableBackup: _patcher: Patcher @@ -98,8 +97,7 @@ def apply(self, context: Context) -> None: manifest = context.parsed_manifest() for e in manifest.xpath('.//application'): e.attrib['{http://schemas.android.com/apk/res/android}allowBackup'] = "true" - with context.store().db as c: - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest))) + context.store().query().patch_put(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest)) class ExploitDisablePinning: _patcher: Patcher @@ -117,8 +115,11 @@ def apply(self, context: Context) -> None: e.attrib['{http://schemas.android.com/apk/res/android}networkSecurityConfig'] = "@xml/network_security_config" with context.store().db as c: - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest))) - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path='resources/package_1/res/xml/network_security_config.xml', blob=b'''\ + from trueseeing.core.literalquery import Query + query = Query(c=c) + + query.patch_put(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest)) + query.patch_put(path='resources/package_1/res/xml/network_security_config.xml', blob=b'''\ @@ -128,9 +129,9 @@ def apply(self, context: Context) -> None: -''')) - for r, in c.execute('select blob from files where path=:path', dict(path='resources/package_1/res/values/public.xml')): - root = ET.fromstring(r, parser=ET.XMLParser(recover=True)) +''') + root = query.file_get_xml('resources/package_1/res/values/public.xml') + assert root is not None if root.xpath('./public[@type="xml"]'): maxid = max(int(e.attrib["id"], 16) for e in root.xpath('./public[@type="xml"]')) n = ET.SubElement(root, 'public') @@ -143,4 +144,4 @@ def apply(self, context: Context) -> None: n.attrib['id'] = f'0x{maxid+0x10000:08x}' n.attrib['type'] = 'xml' n.attrib['name'] = 'network_security_config' - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path='resources/package_1/res/values/public.xml', blob=ET.tostring(root))) + query.patch_put(path='resources/package_1/res/values/public.xml', blob=ET.tostring(root)) diff --git a/trueseeing/app/inspect.py b/trueseeing/app/inspect.py index 04c272a..5570fa8 100644 --- a/trueseeing/app/inspect.py +++ b/trueseeing/app/inspect.py @@ -293,6 +293,15 @@ def _require_target(self, msg: Optional[str] = None) -> None: if self._target is None: ui.fatal(msg if msg else 'need target') + def _get_context(self, path: str) -> Context: + from trueseeing.core.context import Context + return Context(path, []) + + async def _get_context_analyzed(self, path: str) -> Context: + c = self._get_context(path) + await c.analyze() + return c + @contextmanager def _apply_graph_size_limit(self, l: Optional[int]) -> Iterator[None]: from trueseeing.core.flow.data import DataFlows @@ -313,13 +322,12 @@ async def _analyze(self, args: deque[str]) -> None: ui.info(f"analyzing {apk}") - from trueseeing.core.context import Context - with Context(apk, []) as context: - if cmd.endswith('!'): - context.remove() - await context.analyze() - with context.store().db as db: - db.execute('delete from analysis_issues') + context = self._get_context(apk) + if cmd.endswith('!'): + context.remove() + await context.analyze() + with context.store().db as db: + db.execute('delete from analysis_issues') async def _analyze2(self, args: deque[str]) -> None: await self._analyze(args) @@ -339,7 +347,6 @@ async def _scan(self, args: deque[str]) -> None: import time from trueseeing.app.scan import ScanMode - from trueseeing.core.context import Context with self._apply_graph_size_limit(limit): at = time.time() @@ -352,10 +359,8 @@ async def _scan(self, args: deque[str]) -> None: update_cache_mode=False, from_inspect_mode=True, ) - with Context(apk, []) as context: - with context.store().db as db: - for nr, in db.execute('select count(1) from analysis_issues'): - ui.success("done, found {nr} issues ({t:.02f} sec.)".format(nr=nr, t=(time.time() - at))) + nr = self._get_context(apk).store().query().issue_count() + ui.success("done, found {nr} issues ({t:.02f} sec.)".format(nr=nr, t=(time.time() - at))) async def _show_file(self, args: deque[str]) -> None: outfn: Optional[str] = None @@ -377,15 +382,16 @@ async def _show_file(self, args: deque[str]) -> None: ui.fatal('outfile exists; force (!) to overwrite') from binascii import hexlify - from trueseeing.core.context import Context - with Context(self._target, []) as context: - with context.store().db as db: - for d, in db.execute('select blob from files where path like :path', dict(path=path)): - if outfn is None: - sys.stdout.buffer.write(d if 'x' not in cmd else hexlify(d)) - else: - with open(outfn, 'wb') as f: - f.write(d if 'x' not in cmd else hexlify(d)) + + context = self._get_context(self._target) + d = context.store().query().file_get(path) + if d is None: + ui.fatal('file not found') + if outfn is None: + sys.stdout.buffer.write(d if 'x' not in cmd else hexlify(d)) + else: + with open(outfn, 'wb') as f: + f.write(d if 'x' not in cmd else hexlify(d)) async def _show_disasm(self, args: deque[str]) -> None: outfn: Optional[str] = None @@ -407,17 +413,14 @@ async def _show_disasm(self, args: deque[str]) -> None: if os.path.exists(outfn) and not cmd.endswith('!'): ui.fatal('outfile exists; force (!) to overwrite') - from trueseeing.core.context import Context - with Context(self._target, []) as context: - store = context.store() - path = '{}.smali'.format(os.path.join(*(class_.split('.')))) - with store.db as db: - for d, in db.execute('select blob from files where path like :path', dict(path=f'smali%/{path}')): - if outfn is None: - sys.stdout.buffer.write(d) - else: - with open(outfn, 'wb') as f: - f.write(d) + context = self._get_context(self._target) + path = '{}.smali'.format(os.path.join(*(class_.split('.')))) + for _, d in context.store().query().file_enum(f'smali%/{path}'): + if outfn is None: + sys.stdout.buffer.write(d) + else: + with open(outfn, 'wb') as f: + f.write(d) async def _show_solved_constant(self, args: deque[str]) -> None: self._require_target() @@ -434,25 +437,23 @@ async def _show_solved_constant(self, args: deque[str]) -> None: limit = self._get_graph_size_limit(self._get_modifiers(args)) - from trueseeing.core.context import Context from trueseeing.core.flow.data import DataFlows with self._apply_graph_size_limit(limit): - with Context(apk, []) as context: - await context.analyze() - store = context.store() - op = store.op_get(opn) - if op is not None: - if cmd.endswith('!'): - vs = DataFlows.solved_possible_constant_data_in_invocation(store, op, idx) - ui.info(repr(vs)) - else: - try: - v = DataFlows.solved_constant_data_in_invocation(store, op, idx) - ui.info(repr(v)) - except DataFlows.NoSuchValueError as e: - ui.error(str(e)) + context = await self._get_context_analyzed(apk) + store = context.store() + op = store.op_get(opn) + if op is not None: + if cmd.endswith('!'): + vs = DataFlows.solved_possible_constant_data_in_invocation(store, op, idx) + ui.info(repr(vs)) else: - ui.error('op #{} not found'.format(opn)) + try: + v = DataFlows.solved_constant_data_in_invocation(store, op, idx) + ui.info(repr(v)) + except DataFlows.NoSuchValueError as e: + ui.error(str(e)) + else: + ui.error('op #{} not found'.format(opn)) async def _show_solved_typeset(self, args: deque[str]) -> None: self._require_target() @@ -469,18 +470,16 @@ async def _show_solved_typeset(self, args: deque[str]) -> None: limit = self._get_graph_size_limit(self._get_modifiers(args)) - from trueseeing.core.context import Context from trueseeing.core.flow.data import DataFlows with self._apply_graph_size_limit(limit): - with Context(apk, []) as context: - await context.analyze() - store = context.store() - op = store.op_get(opn) - if op is not None: - vs = DataFlows.solved_typeset_in_invocation(store, op, idx) - ui.info(repr(vs)) - else: - ui.error('op #{} not found'.format(opn)) + context = await self._get_context_analyzed(apk) + store = context.store() + op = store.op_get(opn) + if op is not None: + vs = DataFlows.solved_typeset_in_invocation(store, op, idx) + ui.info(repr(vs)) + else: + ui.error('op #{} not found'.format(opn)) async def _report_html(self, args: deque[str]) -> None: outfn: Optional[str] = None @@ -496,18 +495,17 @@ async def _report_html(self, args: deque[str]) -> None: if os.path.exists(outfn) and not cmd.endswith('!'): ui.fatal('outfile exists; force (!) to overwrite') - from trueseeing.core.context import Context from trueseeing.core.report import HTMLReportGenerator - with Context(self._target, []) as context: - gen = HTMLReportGenerator(context) - if outfn is None: - from io import StringIO - f0 = StringIO() - gen.generate(f0) - ui.stdout(f0.getvalue()) - else: - with open(outfn, 'w') as f1: - gen.generate(f1) + context = self._get_context(self._target) + gen = HTMLReportGenerator(context) + if outfn is None: + from io import StringIO + f0 = StringIO() + gen.generate(f0) + ui.stdout(f0.getvalue()) + else: + with open(outfn, 'w') as f1: + gen.generate(f1) async def _report_json(self, args: deque[str]) -> None: outfn: Optional[str] = None @@ -523,18 +521,17 @@ async def _report_json(self, args: deque[str]) -> None: if os.path.exists(outfn) and not cmd.endswith('!'): ui.fatal('outfile exists; force (!) to overwrite') - from trueseeing.core.context import Context from trueseeing.core.report import JSONReportGenerator - with Context(self._target, []) as context: - gen = JSONReportGenerator(context) - if outfn is None: - from io import StringIO - f0 = StringIO() - gen.generate(f0) - ui.stdout(f0.getvalue()) - else: - with open(outfn, 'w') as f1: - gen.generate(f1) + context = self._get_context(self._target) + gen = JSONReportGenerator(context) + if outfn is None: + from io import StringIO + f0 = StringIO() + gen.generate(f0) + ui.stdout(f0.getvalue()) + else: + with open(outfn, 'w') as f1: + gen.generate(f1) async def _report_text(self, args: deque[str]) -> None: outfn: Optional[str] = None @@ -550,18 +547,17 @@ async def _report_text(self, args: deque[str]) -> None: if os.path.exists(outfn) and not cmd.endswith('!'): ui.fatal('outfile exists; force (!) to overwrite') - from trueseeing.core.context import Context from trueseeing.core.report import CIReportGenerator - with Context(self._target, []) as context: - gen = CIReportGenerator(context) - if outfn is None: - from io import StringIO - f0 = StringIO() - gen.generate(f0) - ui.stdout(f0.getvalue()) - else: - with open(outfn, 'w') as f1: - gen.generate(f1) + context = self._get_context(self._target) + gen = CIReportGenerator(context) + if outfn is None: + from io import StringIO + f0 = StringIO() + gen.generate(f0) + ui.stdout(f0.getvalue()) + else: + with open(outfn, 'w') as f1: + gen.generate(f1) async def _search_file(self, args: deque[str]) -> None: self._require_target() @@ -575,12 +571,9 @@ async def _search_file(self, args: deque[str]) -> None: else: pat = '.' - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for path, in store.db.execute('select path from files where path regexp :pat', dict(pat=pat)): - ui.info(f'{path}') + context = await self._get_context_analyzed(apk) + for path, in context.store().db.execute('select path from files where path regexp :pat', dict(pat=pat)): + ui.info(f'{path}') async def _search_string(self, args: deque[str]) -> None: self._require_target() @@ -596,12 +589,9 @@ async def _search_string(self, args: deque[str]) -> None: ui.info('searching in files: {pat}'.format(pat=pat)) - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for path, in store.db.execute('select path from files where blob regexp :pat', dict(pat=pat.encode('latin1'))): - ui.info(f'{path}') + context = await self._get_context_analyzed(apk) + for path, in context.store().db.execute('select path from files where blob regexp :pat', dict(pat=pat.encode('latin1'))): + ui.info(f'{path}') async def _search_call(self, args: deque[str]) -> None: self._require_target() @@ -615,14 +605,12 @@ async def _search_call(self, args: deque[str]) -> None: pat = args.popleft() - from trueseeing.core.context import Context from trueseeing.core.code.model import InvocationPattern - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for op in store.query().invocations(InvocationPattern('invoke-', pat)): - qn = store.query().qualname_of(op) - ui.info(f'{qn}: {op}') + context = await self._get_context_analyzed(apk) + q = context.store().query() + for op in q.invocations(InvocationPattern('invoke-', pat)): + qn = q.qualname_of(op) + ui.info(f'{qn}: {op}') async def _search_const(self, args: deque[str]) -> None: self._require_target() @@ -640,14 +628,12 @@ async def _search_const(self, args: deque[str]) -> None: else: pat = '.' - from trueseeing.core.context import Context from trueseeing.core.code.model import InvocationPattern - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for op in store.query().consts(InvocationPattern(insn, pat)): - qn = store.query().qualname_of(op) - ui.info(f'{qn}: {op}') + context = await self._get_context_analyzed(apk) + q = context.store().query() + for op in q.consts(InvocationPattern(insn, pat)): + qn = q.qualname_of(op) + ui.info(f'{qn}: {op}') async def _search_put(self, args: deque[str]) -> None: self._require_target() @@ -661,18 +647,16 @@ async def _search_put(self, args: deque[str]) -> None: else: pat = '.' - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - store = context.store() - if not cmd.endswith('i'): - fun = store.query().sputs - else: - fun = store.query().iputs + context = await self._get_context_analyzed(apk) + q = context.store().query() + if not cmd.endswith('i'): + fun = q.sputs + else: + fun = q.iputs - for op in fun(pat): - qn = store.query().qualname_of(op) - ui.info(f'{qn}: {op}') + for op in fun(pat): + qn = q.qualname_of(op) + ui.info(f'{qn}: {op}') async def _search_defined_package(self, args: deque[str]) -> None: self._require_target() @@ -687,16 +671,14 @@ async def _search_defined_package(self, args: deque[str]) -> None: pat = '.' import os - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - packages = set() - for fn in (context.source_name_of_disassembled_class(r) for r in context.disassembled_classes()): - if fn.endswith('.smali'): - packages.add(os.path.dirname(fn)) - for pkg in sorted(packages): - if re.match(pat, pkg): - ui.info(pkg) + context = await self._get_context_analyzed(apk) + packages = set() + for fn in (context.source_name_of_disassembled_class(r) for r in context.disassembled_classes()): + if fn.endswith('.smali'): + packages.add(os.path.dirname(fn)) + for pkg in sorted(packages): + if re.match(pat, pkg): + ui.info(pkg) async def _search_defined_class(self, args: deque[str]) -> None: self._require_target() @@ -710,13 +692,11 @@ async def _search_defined_class(self, args: deque[str]) -> None: pat = args.popleft() - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for op in store.query().classes_in_package_named(pat): - cn = store.query().class_name_of(op) - ui.info(f'{cn}: {op}') + context = await self._get_context_analyzed(apk) + q = context.store().query() + for op in q.classes_in_package_named(pat): + cn = q.class_name_of(op) + ui.info(f'{cn}: {op}') async def _search_derived_class(self, args: deque[str]) -> None: self._require_target() @@ -734,13 +714,11 @@ async def _search_derived_class(self, args: deque[str]) -> None: else: pat = '.' - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for op in store.query().classes_extends_has_method_named(base, pat): - cn = store.query().class_name_of(op) - ui.info(f'{cn}: {op}') + context = await self._get_context_analyzed(apk) + q = context.store().query() + for op in q.classes_extends_has_method_named(base, pat): + cn = q.class_name_of(op) + ui.info(f'{cn}: {op}') async def _search_implementing_class(self, args: deque[str]) -> None: self._require_target() @@ -758,13 +736,11 @@ async def _search_implementing_class(self, args: deque[str]) -> None: else: pat = '.' - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for op in store.query().classes_implements_has_method_named(interface, pat): - cn = store.query().class_name_of(op) - ui.info(f'{cn}: {op}') + context = await self._get_context_analyzed(apk) + q = context.store().query() + for op in q.classes_implements_has_method_named(interface, pat): + cn = q.class_name_of(op) + ui.info(f'{cn}: {op}') async def _search_defined_method(self, args: deque[str]) -> None: self._require_target() @@ -778,13 +754,11 @@ async def _search_defined_method(self, args: deque[str]) -> None: pat = args.popleft() - from trueseeing.core.context import Context - with Context(apk, []) as context: - await context.analyze() - store = context.store() - for op in store.query().classes_has_method_named(pat): - qn = store.query().qualname_of(op) - ui.info(f'{qn}: {op}') + context = await self._get_context_analyzed(apk) + q = context.store().query() + for op in q.classes_has_method_named(pat): + qn = q.qualname_of(op) + ui.info(f'{qn}: {op}') async def _export_context(self, args: deque[str]) -> None: self._require_target() @@ -800,20 +774,19 @@ async def _export_context(self, args: deque[str]) -> None: import os import time - from trueseeing.core.context import Context at = time.time() extracted = 0 - with Context(self._target, []) as context: - with context.store().db as c: - for path,blob in c.execute('select path,blob from files'): - target = os.path.join(root, *path.split('/')) - if extracted % 10000 == 0: - ui.info(' .. {nr} files'.format(nr=extracted)) - os.makedirs(os.path.dirname(target), exist_ok=True) - with open(target, 'wb') as f: - f.write(blob) - extracted += 1 + context = self._get_context(self._target) + q = context.store().query() + for path,blob in q.file_enum(pat=None): + target = os.path.join(root, *path.split('/')) + if extracted % 10000 == 0: + ui.info(' .. {nr} files'.format(nr=extracted)) + os.makedirs(os.path.dirname(target), exist_ok=True) + with open(target, 'wb') as f: + f.write(blob) + extracted += 1 ui.success('done: {nr} files ({t:.02f} sec.)'.format(nr=extracted, t=(time.time() - at))) async def _use_framework(self, args: deque[str]) -> None: @@ -836,20 +809,18 @@ async def _use_framework(self, args: deque[str]) -> None: async def _assemble_apk_from_path(self, wd: str, path: str) -> Tuple[str, str]: import os - from importlib.resources import as_file, files from trueseeing.core.sign import SigningKey - from trueseeing.core.tools import invoke_passthru + from trueseeing.core.tools import invoke_passthru, toolchains - with as_file(files('trueseeing.libs').joinpath('apkeditor.jar')) as apkeditorpath: - with as_file(files('trueseeing.libs').joinpath('apksigner.jar')) as apksignerpath: - await invoke_passthru( - '(java -jar {apkeditor} b -i {path} -o {wd}/output.apk && java -jar {apksigner} sign --ks {keystore} --ks-pass pass:android {wd}/output.apk)'.format( - wd=wd, path=path, - apkeditor=apkeditorpath, - apksigner=apksignerpath, - keystore=await SigningKey().key(), - ) + with toolchains() as tc: + await invoke_passthru( + '(java -jar {apkeditor} b -i {path} -o {wd}/output.apk && java -jar {apksigner} sign --ks {keystore} --ks-pass pass:android {wd}/output.apk)'.format( + wd=wd, path=path, + apkeditor=tc['apkeditor'], + apksigner=tc['apksigner'], + keystore=await SigningKey().key(), ) + ) return os.path.join(wd, 'output.apk'), os.path.join(wd, 'output.apk.idsig') @@ -918,8 +889,7 @@ async def _disassemble(self, args: deque[str]) -> None: import time import shutil from tempfile import TemporaryDirectory - from trueseeing.core.tools import invoke_passthru - from importlib.resources import as_file, files + from trueseeing.core.tools import invoke_passthru, toolchains path = args.popleft() apk = self._target @@ -932,12 +902,12 @@ async def _disassemble(self, args: deque[str]) -> None: at = time.time() with TemporaryDirectory() as td: - with as_file(files(__name__).joinpath('libs').joinpath('apkeditor.jar')) as apkeditorpath: + with toolchains() as tc: await invoke_passthru( '(java -jar {apkeditor} d -o {td}/f -i {apk} {s})'.format( td=td, apk=apk, s='-dex' if 's' in cmd else '', - apkeditor=apkeditorpath, + apkeditor=tc['apkeditor'], ) ) @@ -957,18 +927,17 @@ async def _exploit_discard(self, args: deque[str]) -> None: import os.path import shutil import time - from trueseeing.core.context import Context at = time.time() - with Context(self._target, []) as context: - path = os.path.join(context.wd, 'p') - if not os.path.exists(path): - ui.fatal('nothing to discard') + context = self._get_context(self._target) + path = os.path.join(context.wd, 'p') + if not os.path.exists(path): + ui.fatal('nothing to discard') - ui.info('discarding patches to {apk}'.format(apk=apk)) - shutil.rmtree(path) - ui.success('done ({t:.02f} sec.)'.format(t=(time.time() - at))) + ui.info('discarding patches to {apk}'.format(apk=apk)) + shutil.rmtree(path) + ui.success('done ({t:.02f} sec.)'.format(t=(time.time() - at))) async def _exploit_apply(self, args: deque[str]) -> None: self._require_target() @@ -979,7 +948,6 @@ async def _exploit_apply(self, args: deque[str]) -> None: import os import time from tempfile import TemporaryDirectory - from trueseeing.core.context import Context apk = self._target origapk = apk.replace('.apk', '.apk.orig') @@ -989,37 +957,36 @@ async def _exploit_apply(self, args: deque[str]) -> None: at = time.time() - with Context(self._target, []) as context: - path = os.path.join(context.wd, 'p') - if not os.path.exists(path): - ui.fatal('nothing to apply') + context = self._get_context(self._target) + path = os.path.join(context.wd, 'p') + if not os.path.exists(path): + ui.fatal('nothing to apply') - with TemporaryDirectory() as td: - ui.info('applying patches to {apk}'.format(apk=apk)) - outapk, outsig = await self._assemble_apk_from_path(td, path) + with TemporaryDirectory() as td: + ui.info('applying patches to {apk}'.format(apk=apk)) + outapk, outsig = await self._assemble_apk_from_path(td, path) - if os.path.exists(apk): - self._move_apk(apk, origapk) + if os.path.exists(apk): + self._move_apk(apk, origapk) - self._move_apk(outapk, apk) + self._move_apk(outapk, apk) ui.success('done ({t:.02f} sec.)'.format(t=(time.time() - at))) async def _prep_exploit(self, ctx: Context) -> None: import os.path - from importlib.resources import as_file, files - from trueseeing.core.tools import invoke_passthru + from trueseeing.core.tools import invoke_passthru, toolchains ctx.create(exist_ok=True) apk = os.path.join(ctx.wd, 'target.apk') path = os.path.join(ctx.wd, 'p') if not os.path.exists(path): - with as_file(files('trueseeing.libs').joinpath('apkeditor.jar')) as apkeditorpath: + with toolchains() as tc: await invoke_passthru( '(java -jar {apkeditor} d -o {path} -i {apk} -dex)'.format( apk=apk, path=path, - apkeditor=apkeditorpath, + apkeditor=tc['apkeditor'], ) ) @@ -1034,49 +1001,48 @@ async def _exploit_disable_pinning(self, args: deque[str]) -> None: import shutil import random from importlib.resources import as_file, files - from trueseeing.core.context import Context ui.info('disabling declarative TLS pinning {apk}'.format(apk=self._target)) at = time.time() + context = self._get_context(self._target) - with Context(self._target, []) as context: - await self._prep_exploit(context) - path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') - key = 'nsc{:04x}'.format(random.randint(0, 2**16)) + await self._prep_exploit(context) + path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') + key = 'nsc{:04x}'.format(random.randint(0, 2**16)) - manif = self._parsed_manifest(path) - for e in manif.xpath('.//application'): - e.attrib['{http://schemas.android.com/apk/res/android}usesCleartextTraffic'] = "true" - e.attrib['{http://schemas.android.com/apk/res/android}networkSecurityConfig'] = f'@xml/{key}' - with open(path, 'wb') as f: - f.write(self._manifest_as_xml(manif)) - - # XXX - path = os.path.join(context.wd, 'p', 'resources', 'package_1', 'res', 'xml', f'{key}.xml') - with as_file(files('trueseeing.libs').joinpath('nsc.xml')) as nscpath: - shutil.copy(nscpath, path) - - # XXX - import lxml.etree as ET - path = os.path.join(context.wd, 'p', 'resources', 'package_1', 'res', 'values', 'public.xml') - with open(path, 'rb') as f: - root = ET.fromstring(f.read(), parser=ET.XMLParser(recover=True)) - if root.xpath('./public[@type="xml"]'): - maxid = max(int(e.attrib["id"], 16) for e in root.xpath('./public[@type="xml"]')) - n = ET.SubElement(root, 'public') - n.attrib['id'] = f'0x{maxid+1:08x}' - n.attrib['type'] = 'xml' - n.attrib['name'] = key - else: - maxid = (max(int(e.attrib["id"], 16) for e in root.xpath('./public')) & 0xffff0000) - n = ET.SubElement(root, 'public') - n.attrib['id'] = f'0x{maxid+0x10000:08x}' - n.attrib['type'] = 'xml' - n.attrib['name'] = key + manif = self._parsed_manifest(path) + for e in manif.xpath('.//application'): + e.attrib['{http://schemas.android.com/apk/res/android}usesCleartextTraffic'] = "true" + e.attrib['{http://schemas.android.com/apk/res/android}networkSecurityConfig'] = f'@xml/{key}' + with open(path, 'wb') as f: + f.write(self._manifest_as_xml(manif)) - with open(path, 'wb') as f1: - f1.write(ET.tostring(root)) + # XXX + path = os.path.join(context.wd, 'p', 'resources', 'package_1', 'res', 'xml', f'{key}.xml') + with as_file(files('trueseeing.libs').joinpath('nsc.xml')) as nscpath: + shutil.copy(nscpath, path) + + # XXX + import lxml.etree as ET + path = os.path.join(context.wd, 'p', 'resources', 'package_1', 'res', 'values', 'public.xml') + with open(path, 'rb') as f: + root = ET.fromstring(f.read(), parser=ET.XMLParser(recover=True)) + if root.xpath('./public[@type="xml"]'): + maxid = max(int(e.attrib["id"], 16) for e in root.xpath('./public[@type="xml"]')) + n = ET.SubElement(root, 'public') + n.attrib['id'] = f'0x{maxid+1:08x}' + n.attrib['type'] = 'xml' + n.attrib['name'] = key + else: + maxid = (max(int(e.attrib["id"], 16) for e in root.xpath('./public')) & 0xffff0000) + n = ET.SubElement(root, 'public') + n.attrib['id'] = f'0x{maxid+0x10000:08x}' + n.attrib['type'] = 'xml' + n.attrib['name'] = key + + with open(path, 'wb') as f1: + f1.write(ET.tostring(root)) ui.success('done ({t:.02f} sec.)'.format(t=(time.time() - at))) @@ -1088,20 +1054,19 @@ async def _exploit_enable_debug(self, args: deque[str]) -> None: import os.path import time - from trueseeing.core.context import Context ui.info('enabling debug {apk}'.format(apk=self._target)) at = time.time() + context = self._get_context(self._target) + await self._prep_exploit(context) - with Context(self._target, []) as context: - await self._prep_exploit(context) - path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') - manif = self._parsed_manifest(path) - for e in manif.xpath('.//application'): - e.attrib['{http://schemas.android.com/apk/res/android}debuggable'] = "true" - with open(path, 'wb') as f: - f.write(self._manifest_as_xml(manif)) + path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') + manif = self._parsed_manifest(path) + for e in manif.xpath('.//application'): + e.attrib['{http://schemas.android.com/apk/res/android}debuggable'] = "true" + with open(path, 'wb') as f: + f.write(self._manifest_as_xml(manif)) ui.success('done ({t:.02f} sec.)'.format(t=(time.time() - at))) @@ -1113,22 +1078,21 @@ async def _exploit_enable_backup(self, args: deque[str]) -> None: import os.path import time - from trueseeing.core.context import Context ui.info('enabling full backup {apk}'.format(apk=self._target)) at = time.time() - - with Context(self._target, []) as context: - await self._prep_exploit(context) - path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') - manif = self._parsed_manifest(path) - for e in manif.xpath('.//application'): - e.attrib['{http://schemas.android.com/apk/res/android}allowBackup'] = "true" - if '{http://schemas.android.com/apk/res/android}fullBackupContent' in e.attrib: - del e.attrib['{http://schemas.android.com/apk/res/android}fullBackupContent'] - with open(path, 'wb') as f: - f.write(self._manifest_as_xml(manif)) + context = self._get_context(self._target) + await self._prep_exploit(context) + + path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') + manif = self._parsed_manifest(path) + for e in manif.xpath('.//application'): + e.attrib['{http://schemas.android.com/apk/res/android}allowBackup'] = "true" + if '{http://schemas.android.com/apk/res/android}fullBackupContent' in e.attrib: + del e.attrib['{http://schemas.android.com/apk/res/android}fullBackupContent'] + with open(path, 'wb') as f: + f.write(self._manifest_as_xml(manif)) ui.success('done ({t:.02f} sec.)'.format(t=(time.time() - at))) @@ -1145,27 +1109,27 @@ async def _exploit_patch_target_api_level(self, args: deque[str]) -> None: import os.path import time - from trueseeing.core.context import Context ui.info('retargetting API level {level} {apk}'.format(level=level, apk=self._target)) at = time.time() - with Context(self._target, []) as context: - await self._prep_exploit(context) - path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') - manif = self._parsed_manifest(path) - for e in manif.xpath('.//uses-sdk'): - e.attrib['{http://schemas.android.com/apk/res/android}targetSdkVersion'] = str(level) - minLevel = int(e.attrib.get('{http://schemas.android.com/apk/res/android}minSdkVersion', '1')) - if level < minLevel: - if not cmd.endswith('!'): - ui.fatal('cannot target API level below requirement ({minlv}); force (!) to downgrade altogether'.format(minlv=minLevel)) - else: - ui.warn('downgrading the requirement') - e.attrib['{http://schemas.android.com/apk/res/android}minSdkVersion'] = str(level) - with open(path, 'wb') as f: - f.write(self._manifest_as_xml(manif)) + context = self._get_context(self._target) + await self._prep_exploit(context) + + path = os.path.join(context.wd, 'p', 'AndroidManifest.xml') + manif = self._parsed_manifest(path) + for e in manif.xpath('.//uses-sdk'): + e.attrib['{http://schemas.android.com/apk/res/android}targetSdkVersion'] = str(level) + minLevel = int(e.attrib.get('{http://schemas.android.com/apk/res/android}minSdkVersion', '1')) + if level < minLevel: + if not cmd.endswith('!'): + ui.fatal('cannot target API level below requirement ({minlv}); force (!) to downgrade altogether'.format(minlv=minLevel)) + else: + ui.warn('downgrading the requirement') + e.attrib['{http://schemas.android.com/apk/res/android}minSdkVersion'] = str(level) + with open(path, 'wb') as f: + f.write(self._manifest_as_xml(manif)) ui.success('done ({t:.02f} sec.)'.format(t=(time.time() - at))) @@ -1253,55 +1217,54 @@ async def _info(self, args: deque[str]) -> None: apk = self._target import os - from trueseeing.core.context import Context boolmap = {True:'yes',False:'no','true':'yes','false':'no',1:'yes',0:'no'} ui.info(f'info on {apk}') - with Context(self._target, []) as context: - analyzed = os.path.exists(os.path.join(context.wd, '.done')) - - ui.info('path {}'.format(apk)) - ui.info('size {}'.format(os.stat(apk).st_size)) - ui.info('fp {}'.format(context.fingerprint_of())) - ui.info('ctx {}'.format(context.wd)) - ui.info('has patch? {}'.format(boolmap[os.path.exists(os.path.join(context.wd, 'p', 'AndroidManifest.xml'))])) - ui.info('analyzed? {}'.format(boolmap[analyzed])) - if analyzed: - store = context.store() - manif = context.parsed_manifest() - ui.info('pkg {}'.format(manif.attrib['package'])) - ui.info('perms {}'.format(len(list(context.permissions_declared())))) - ui.info('activs {}'.format(len(list(manif.xpath('.//activity'))))) - ui.info('servs {}'.format(len(list(manif.xpath('.//service'))))) - ui.info('recvs {}'.format(len(list(manif.xpath('.//receiver'))))) - ui.info('provs {}'.format(len(list(manif.xpath('.//provider'))))) - ui.info('int-flts {}'.format(len(list(manif.xpath('.//intent-filter'))))) - with store.db as c: - for nr, in c.execute('select count(1) from classes_extends_name where extends_name regexp :pat', dict(pat='^Landroid.*Fragment(Compat)?;$')): - ui.info('frags {}'.format(len(list(manif.xpath('.//activity'))))) - for e in manif.xpath('.//application'): - ui.info('debuggable? {}'.format(boolmap[e.attrib.get('{http://schemas.android.com/apk/res/android}debuggable', 'false')])) - ui.info('backupable? {}'.format(boolmap[e.attrib.get('{http://schemas.android.com/apk/res/android}allowBackup', 'false')])) - ui.info('netsecconf? {}'.format(boolmap[e.attrib.get('{http://schemas.android.com/apk/res/android}networkSecurityConfig') is not None])) - if manif.xpath('.//uses-sdk'): - for e in manif.xpath('.//uses-sdk'): - ui.info('api min {}'.format(int(e.attrib.get('{http://schemas.android.com/apk/res/android}minSdkVersion', '1')))) - ui.info('api tgt {}'.format(int(e.attrib.get('{http://schemas.android.com/apk/res/android}targetSdkVersion', '1')))) - else: - dom = context._parsed_apktool_yml() - ui.info('api min {} (apktool)'.format(int(dom['sdkInfo'].get('minSdkVersion', '1')))) - ui.info('api tgt {} (apktool)'.format(int(dom['sdkInfo'].get('targetSdkVersion', '1')))) - with store.db as c: - for nr, in c.execute('select count(1) from analysis_issues'): - ui.info('issues {}{}'.format(nr, ('' if nr else ' (not scanned yet?)'))) - for nr, in c.execute('select count(1) from ops where idx=0'): - ui.info('ops {}'.format(nr)) - for nr, in c.execute('select count(1) from class_class_name'): - ui.info('classes {}'.format(nr)) - for nr, in c.execute('select count(1) from method_method_name'): - ui.info('methods {}'.format(nr)) + context = self._get_context(self._target) + analyzed = os.path.exists(os.path.join(context.wd, '.done')) + + ui.info('path {}'.format(apk)) + ui.info('size {}'.format(os.stat(apk).st_size)) + ui.info('fp {}'.format(context.fingerprint_of())) + ui.info('ctx {}'.format(context.wd)) + ui.info('has patch? {}'.format(boolmap[os.path.exists(os.path.join(context.wd, 'p', 'AndroidManifest.xml'))])) + ui.info('analyzed? {}'.format(boolmap[analyzed])) + if analyzed: + store = context.store() + manif = context.parsed_manifest() + ui.info('pkg {}'.format(manif.attrib['package'])) + ui.info('perms {}'.format(len(list(context.permissions_declared())))) + ui.info('activs {}'.format(len(list(manif.xpath('.//activity'))))) + ui.info('servs {}'.format(len(list(manif.xpath('.//service'))))) + ui.info('recvs {}'.format(len(list(manif.xpath('.//receiver'))))) + ui.info('provs {}'.format(len(list(manif.xpath('.//provider'))))) + ui.info('int-flts {}'.format(len(list(manif.xpath('.//intent-filter'))))) + with store.db as c: + for nr, in c.execute('select count(1) from classes_extends_name where extends_name regexp :pat', dict(pat='^Landroid.*Fragment(Compat)?;$')): + ui.info('frags {}'.format(len(list(manif.xpath('.//activity'))))) + for e in manif.xpath('.//application'): + ui.info('debuggable? {}'.format(boolmap[e.attrib.get('{http://schemas.android.com/apk/res/android}debuggable', 'false')])) + ui.info('backupable? {}'.format(boolmap[e.attrib.get('{http://schemas.android.com/apk/res/android}allowBackup', 'false')])) + ui.info('netsecconf? {}'.format(boolmap[e.attrib.get('{http://schemas.android.com/apk/res/android}networkSecurityConfig') is not None])) + if manif.xpath('.//uses-sdk'): + for e in manif.xpath('.//uses-sdk'): + ui.info('api min {}'.format(int(e.attrib.get('{http://schemas.android.com/apk/res/android}minSdkVersion', '1')))) + ui.info('api tgt {}'.format(int(e.attrib.get('{http://schemas.android.com/apk/res/android}targetSdkVersion', '1')))) + else: + dom = context._parsed_apktool_yml() + ui.info('api min {} (apktool)'.format(int(dom['sdkInfo'].get('minSdkVersion', '1')))) + ui.info('api tgt {} (apktool)'.format(int(dom['sdkInfo'].get('targetSdkVersion', '1')))) + with store.db as c: + for nr, in c.execute('select count(1) from analysis_issues'): + ui.info('issues {}{}'.format(nr, ('' if nr else ' (not scanned yet?)'))) + for nr, in c.execute('select count(1) from ops where idx=0'): + ui.info('ops {}'.format(nr)) + for nr, in c.execute('select count(1) from class_class_name'): + ui.info('classes {}'.format(nr)) + for nr, in c.execute('select count(1) from method_method_name'): + ui.info('methods {}'.format(nr)) async def _set_target(self, args: deque[str]) -> None: _ = args.popleft() diff --git a/trueseeing/app/patch.py b/trueseeing/app/patch.py index 8073041..94227fc 100644 --- a/trueseeing/app/patch.py +++ b/trueseeing/app/patch.py @@ -50,23 +50,23 @@ def apply(self, context: Context) -> None: manifest = context.parsed_manifest(patched=True) for e in manifest.xpath('.//application'): e.attrib['{http://schemas.android.com/apk/res/android}debuggable'] = "false" - with context.store().db as c: - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest))) + context.store().query().patch_put(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest)) class PatchBackupable: def apply(self, context: Context) -> None: manifest = context.parsed_manifest(patched=True) for e in manifest.xpath('.//application'): e.attrib['{http://schemas.android.com/apk/res/android}allowBackup'] = "false" - with context.store().db as c: - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest))) + context.store().query().patch_put(path='AndroidManifest.xml', blob=context.manifest_as_xml(manifest)) class PatchLoggers: def apply(self, context: Context) -> None: import re with context.store().db as c: - for fn, content in c.execute('select path, coalesce(B.blob, A.blob) as blob from files as A left join patches as B using (path) where path like :path', dict(path='smali/%.smali')): + from trueseeing.core.literalquery import Query + query = Query(c=c) + for fn, content in query.file_enum('smali/%.smali'): stage0 = re.sub(rb'^.*?invoke-static.*?Landroid/util/Log;->.*?\(.*?$', b'', content, flags=re.MULTILINE) stage1 = re.sub(rb'^.*?invoke-virtual.*?Ljava/io/Print(Writer|Stream);->.*?\(.*?$', b'', stage0, flags=re.MULTILINE) if content != stage1: - c.execute('replace into patches (path, blob) values (:path,:blob)', dict(path=fn, blob=stage1)) + query.patch_put(path=fn, blob=stage1) diff --git a/trueseeing/app/scan.py b/trueseeing/app/scan.py index 731d33b..3261132 100644 --- a/trueseeing/app/scan.py +++ b/trueseeing/app/scan.py @@ -53,10 +53,9 @@ async def invoke(self, ci_mode: ReportFormat, outfile: Optional[str], signatures if await session.invoke(f): error_found = True if not from_inspect_mode: - with Context(f, []) as context: - with context.store().db as db: - for nr, in db.execute('select count(1) from analysis_issues'): - ui.success('{fn}: analysis done, {nr} issues ({t:.02f} sec.)'.format(fn=f, nr=nr, t=(time.time() - at))) + context = Context(f, []) + nr = context.store().query().issue_count() + ui.success('{fn}: analysis done, {nr} issues ({t:.02f} sec.)'.format(fn=f, nr=nr, t=(time.time() - at))) finally: if no_cache_mode: Context(f, []).remove() @@ -78,41 +77,42 @@ def __init__(self, chain: List[Type[Detector]], outfile: Optional[str], ci_mode: self._exclude_packages = exclude_packages async def invoke(self, apkfilename: str) -> bool: - with Context(apkfilename, self._exclude_packages) as context: - await context.analyze() - ui.info(f"{apkfilename} -> {context.wd}") - with context.store().db as db: - db.execute('delete from analysis_issues') + context = Context(apkfilename, self._exclude_packages) + await context.analyze() + ui.info(f"{apkfilename} -> {context.wd}") - found = False + from trueseeing.core.literalquery import Query + query = Query(c=context.store().db) + query.issue_clear() - reporter: ReportGenerator - if self._outfile is None: - reporter = CIReportGenerator(context) + found = False + + reporter: ReportGenerator + if self._outfile is None: + reporter = CIReportGenerator(context) + else: + if self._ci_mode == 'json': + reporter = JSONReportGenerator(context) else: - if self._ci_mode == 'json': - reporter = JSONReportGenerator(context) - else: - reporter = HTMLReportGenerator(context) - - with context.store().db as db: - # XXX - def _detected(issue: Issue) -> None: - global found - found = True # type: ignore[name-defined] - reporter.note(issue) - db.execute( - 'insert into analysis_issues (detector, summary, synopsis, description, seealso, solution, info1, info2, info3, confidence, cvss3_score, cvss3_vector, source, row, col) values (:detector_id, :summary, :synopsis, :description, :seealso, :solution, :info1, :info2, :info3, :confidence, :cvss3_score, :cvss3_vector, :source, :row, :col)', - issue.__dict__) - pub.subscribe(_detected, 'issue') - await asyncio.gather(*[c(context).detect() for c in self._chain]) - pub.unsubscribe(_detected, 'issue') - - if self._outfile is not None: - with self._open_outfile() as f: - reporter.generate(f) - - return reporter.return_(found) + reporter = HTMLReportGenerator(context) + + with context.store().db: + # XXX + def _detected(issue: Issue) -> None: + global found + found = True # type: ignore[name-defined] + reporter.note(issue) + query.issue_raise(issue) + + pub.subscribe(_detected, 'issue') + await asyncio.gather(*[c(context).detect() for c in self._chain]) + pub.unsubscribe(_detected, 'issue') + + if self._outfile is not None: + with self._open_outfile() as f: + reporter.generate(f) + + return reporter.return_(found) def _open_outfile(self) -> TextIO: assert self._outfile is not None diff --git a/trueseeing/app/shell.py b/trueseeing/app/shell.py index ec0dae1..8765951 100644 --- a/trueseeing/app/shell.py +++ b/trueseeing/app/shell.py @@ -293,3 +293,6 @@ def invoke(self) -> int: no_cache_mode=no_cache_mode, update_cache_mode=update_cache_mode, )) + +def entry() -> None: + Shell().invoke() diff --git a/trueseeing/core/asm.py b/trueseeing/core/asm.py index 428e9b4..a9ce789 100644 --- a/trueseeing/core/asm.py +++ b/trueseeing/core/asm.py @@ -45,8 +45,8 @@ def _do(self) -> None: import glob import subprocess import shutil - from importlib.resources import as_file, files - from trueseeing.core.literalquery import StorePrep + from trueseeing.core.literalquery import StorePrep, FileTablePrep, Query + from trueseeing.core.tools import toolchains apk, archive = 'target.apk', 'store.db' @@ -54,15 +54,15 @@ def _do(self) -> None: try: os.chdir(self._context.wd) c = sqlite3.connect(archive) + query = Query(c=c) with c: StorePrep(c).stage0() - c.execute('drop table if exists files') - c.execute('create table files(path text not null unique, blob bytes not null)') + FileTablePrep(c).prepare() with c: - with as_file(files('trueseeing.libs').joinpath('apkeditor.jar')) as path: + with toolchains() as tc: _ = subprocess.run('java -jar {apkeditor} d -i {apk} -o files'.format( - apkeditor=path, + apkeditor=tc['apkeditor'], apk=apk ), shell=True, capture_output=True) os.chdir('files') @@ -71,7 +71,7 @@ def read_as_row(fn: str) -> Tuple[str, bytes]: with open(fn, 'rb') as f: return fn, f.read() - c.executemany('insert into files (path, blob) values (?,?)', (read_as_row(fn) for fn in glob.glob('**', recursive=True) if os.path.isfile(fn))) + query.file_put_batch(read_as_row(fn) for fn in glob.glob('**', recursive=True) if os.path.isfile(fn)) c.commit() finally: os.chdir(cwd) diff --git a/trueseeing/core/code/parse.py b/trueseeing/core/code/parse.py index 3be9da3..a0f6da1 100644 --- a/trueseeing/core/code/parse.py +++ b/trueseeing/core/code/parse.py @@ -43,6 +43,7 @@ def __exit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[ def analyze(self) -> None: import time + from trueseeing.core.literalquery import Query analyzed_ops = 0 analyzed_methods = 0 analyzed_classes = 0 @@ -56,7 +57,7 @@ def analyze(self) -> None: base_id = 1 last_seen = analyzed_ops - for f, in c.execute('select blob from files where path like :path', dict(path='smali/%.smali')): + for _, f in Query(c=c).file_enum('smali/%.smali'): ops = [] for op in P.parsed_flat(f.decode('utf-8')): analyzed_ops += 1 diff --git a/trueseeing/core/context.py b/trueseeing/core/context.py index 09d0395..cc6c82d 100644 --- a/trueseeing/core/context.py +++ b/trueseeing/core/context.py @@ -95,11 +95,7 @@ def _copy_target(self) -> None: shutil.copyfile(self._apk, os.path.join(self.wd, 'target.apk')) def parsed_manifest(self, patched: bool = False) -> Any: - stmt0 = 'select blob from files where path=:path' - stmt1 = 'select coalesce(B.blob, A.blob) as blob from files as A left join patches as B using (path) where path=:path' - with self.store().db as db: - for o, in db.execute(stmt1 if patched else stmt0, dict(path='AndroidManifest.xml')): - return ET.fromstring(o, parser=ET.XMLParser(recover=True)) + return self.store().query().file_get_xml('AndroidManifest.xml', patched=patched) def manifest_as_xml(self, manifest: Any) -> bytes: assert manifest is not None @@ -108,9 +104,9 @@ def manifest_as_xml(self, manifest: Any) -> bytes: def _parsed_apktool_yml(self) -> Any: # FIXME: using ruamel.yaml? import yaml - with self.store().db as db: - for o, in db.execute('select blob from files where path=:path', dict(path='apktool.yml')): - return yaml.safe_load(re.sub(r'!!brut\.androlib\..*', '', o.decode('utf-8'))) + o = self.store().query().file_get('apktool.yml') + if o is not None: + return yaml.safe_load(re.sub(r'!!brut\.androlib\..*', '', o.decode('utf-8'))) def get_target_sdk_version(self) -> int: manif = self.parsed_manifest() @@ -131,18 +127,15 @@ def get_min_sdk_version(self) -> int: @functools.lru_cache(maxsize=1) def disassembled_classes(self) -> List[str]: - with self.store().db as db: - return [f for f, in db.execute('select path from files where path like :path', dict(path='smali%.smali'))] + return list(self.store().query().file_find('smali%.smali')) @functools.lru_cache(maxsize=1) def disassembled_resources(self) -> List[str]: - with self.store().db as db: - return [f for f, in db.execute('select path from files where path like :path', dict(path='%/res/%.xml'))] + return list(self.store().query().file_find('%/res/%.xml')) @functools.lru_cache(maxsize=1) def disassembled_assets(self) -> List[str]: - with self.store().db as db: - return [f for f, in db.execute('select path from files where path like :path', dict(path='root/%/assets/%'))] + return list(self.store().query().file_find('root/%/assets/%')) def source_name_of_disassembled_class(self, fn: str) -> str: return os.path.join(*fn.split('/')[2:]) @@ -161,32 +154,22 @@ def permissions_declared(self) -> Iterable[Any]: @functools.lru_cache(maxsize=1) def _string_resource_files(self) -> List[str]: - with self.store().db as db: - return [f for f, in db.execute('select path from files where path like :path', dict(path='%/res/values/%strings%'))] + return list(self.store().query().file_find('%/res/values/%strings%')) def string_resources(self) -> Iterable[Tuple[str, str]]: - with self.store().db as db: - for o, in db.execute('select blob from files where path like :path', dict(path='%/res/values/%strings%')): - yield from ((c.attrib['name'], c.text) for c in ET.fromstring(o, parser=ET.XMLParser(recover=True)).xpath('//resources/string') if c.text) + for _, o in self.store().query().file_enum('%/res/values/%strings%'): + yield from ((c.attrib['name'], c.text) for c in ET.fromstring(o, parser=ET.XMLParser(recover=True)).xpath('//resources/string') if c.text) @functools.lru_cache(maxsize=1) def _xml_resource_files(self) -> List[str]: - with self.store().db as db: - return [f for f, in db.execute('select path from files where path like :path', dict(path='%/res/xml/%.xml'))] + return list(self.store().query().file_find('%/res/xml/%.xml')) def xml_resources(self) -> Iterable[Tuple[str, Any]]: - with self.store().db as db: - for fn, o in db.execute('select path, blob from files where path like :path', dict(path='%/res/xml/%.xml')): - yield (fn, ET.fromstring(o, parser=ET.XMLParser(recover=True))) + for fn, o in self.store().query().file_enum('%/res/xml/%.xml'): + yield (fn, ET.fromstring(o, parser=ET.XMLParser(recover=True))) def is_qualname_excluded(self, qualname: Optional[str]) -> bool: if qualname is not None: return any([re.match(f'L{x}', qualname) for x in self.excludes]) else: return False - - def __enter__(self) -> Context: - return self - - def __exit__(self, *exc_details: Any) -> None: - pass diff --git a/trueseeing/core/literalquery.py b/trueseeing/core/literalquery.py index 642a483..68d1457 100644 --- a/trueseeing/core/literalquery.py +++ b/trueseeing/core/literalquery.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING from trueseeing.core.code.model import Op +from trueseeing.core.issue import Issue if TYPE_CHECKING: from typing import Any, Iterable, Tuple, Dict, Optional @@ -47,9 +48,24 @@ def stage2(self) -> None: with open(path, 'r', encoding='utf-8') as f: self.c.executescript(f.read()) +class FileTablePrep: + def __init__(self, c: Any) -> None: + self.c = c + + def prepare(self) -> None: + from importlib.resources import as_file, files + with as_file(files('trueseeing.libs').joinpath('files.0.sql')) as path: + with open(path, 'r', encoding='utf-8') as f: + self.c.executescript(f.read()) + class Query: - def __init__(self, store: Store) -> None: - self.db = store.db + def __init__(self, *, store: Optional[Store] = None, c: Any = None) -> None: + if c is not None: + self.db = c + elif store is not None: + self.db = store.db + else: + raise RuntimeError('store or c is required') @staticmethod def _op_from_row(r: Tuple[Any, ...]) -> Op: @@ -169,3 +185,84 @@ def class_of_method(self, method: Op) -> Optional[Op]: for r in self.db.execute('select op as _0, t as _1, v as _2, op1 as _3, t1 as _4, v1 as _5, op2 as _6, t2 as _7, v2 as _8, op3 as _9, t3 as _10, v3 as _11, op4 as _12, t4 as _13, v4 as _14, op5 as _15, t5 as _16, v5 as _17, op6 as _18, t6 as _19, v6 as _20, op7 as _21, t7 as _22, v7 as _23, op8 as _24, t8 as _25, v8 as _26, op9 as _27, t9 as _28, v9 as _29 from op_vecs where op=(select class from ops_class where op=:from_op)', dict(from_op=method._id)): return self._op_from_row(r) return None + + def methods_with_modifier(self, pattern: str) -> Iterable[Op]: + for r in self.db.execute('select op_vecs.op as _0, t as _1, v as _2, op1 as _3, t1 as _4, v1 as _5, op2 as _6, t2 as _7, v2 as _8, op3 as _9, t3 as _10, v3 as _11, op4 as _12, t4 as _13, v4 as _14, op5 as _15, t5 as _16, v5 as _17, op6 as _18, t6 as _19, v6 as _20, op7 as _21, t7 as _22, v7 as _23, op8 as _24, t8 as _25, v8 as _26, op9 as _27, t9 as _28, v9 as _29 from ops_method join op_vecs on (method=ops_method.op and method=op_vecs.op) where v=:pat or v2=:pat or v3=:pat or v4=:pat or v5=:pat or v6=:pat or v7=:pat or v8=:pat or v9=:pat', dict(pat=pattern)): + yield Query._op_from_row(r) + + def file_find(self, pat: str) -> Iterable[str]: + for f, in self.db.execute('select path from files where path like :pat', dict(pat=pat)): + yield f + + def file_get(self, path: str, default: Optional[bytes] = None, patched: bool = False) -> Optional[bytes]: + stmt0 = 'select blob from files where path=:path' + stmt1 = 'select coalesce(B.blob, A.blob) as blob from files as A left join patches as B using (path) where path=:path' + for b, in self.db.execute(stmt1 if patched else stmt0, dict(path=path)): + return b # type:ignore[no-any-return] + else: + return default + + def file_get_xml(self, path: str, default: Any = None, patched: bool = False) -> Any: + import lxml.etree as ET + r = self.file_get(path, patched=patched) + if r is not None: + return ET.fromstring(r, parser=ET.XMLParser(recover=True)) + else: + return default + + def file_enum(self, pat: Optional[str], patched: bool = False) -> Iterable[Tuple[str, bytes]]: + if pat is not None: + stmt0 = 'select path, blob from files where path like :pat' + stmt1 = 'select path, coalesce(B.blob, A.blob) as blob from files as A left join patches as B using (path) where path like :pat' + for n, o in self.db.execute(stmt1 if patched else stmt0, dict(pat=pat)): + yield n, o + else: + stmt2 = 'select path, blob from files' + for n, o in self.db.execute(stmt2): + yield n, o + + def file_put_batch(self, gen: Iterable[Tuple[str, bytes]]) -> None: + self.db.executemany('insert into files (path, blob) values (?,?)', gen) + + def patch_enum(self, pat: Optional[str]) -> Iterable[Tuple[str, bytes]]: + if pat is not None: + stmt0 = 'select path, blob from patches where path like :pat' + for n, o in self.db.execute(stmt0, dict(pat=pat)): + yield n, o + else: + stmt1 = 'select path, blob from patches' + for n, o in self.db.execute(stmt1): + yield n, o + + def patch_put(self, path: str, blob: bytes) -> None: + self.db.execute('replace into patches (path, blob) values (:path,:blob)', dict(path=path, blob=blob)) + + def patch_clear(self) -> None: + self.db.execute('delete from patches') + + def issue_count(self) -> int: + for nr, in self.db.execute('select count(1) from analysis_issues'): + return int(nr) + else: + return 0 + + def issue_raise(self, issue: Issue) -> None: + self.db.execute( + 'insert into analysis_issues (detector, summary, synopsis, description, seealso, solution, info1, info2, info3, confidence, cvss3_score, cvss3_vector, source, row, col) values (:detector_id, :summary, :synopsis, :description, :seealso, :solution, :info1, :info2, :info3, :confidence, :cvss3_score, :cvss3_vector, :source, :row, :col)', + issue.__dict__) + + def issue_clear(self) -> None: + self.db.execute('delete from analysis_issues') + + def issues(self) -> Iterable[Issue]: + from trueseeing.core.issue import Issue + for m in self.db.execute('select * from analysis_issues'): + yield Issue.from_analysis_issues_row(m) + + def findings_list(self) -> Iterable[Tuple[int, Tuple[str, str, str, str, str, str, float, str]]]: + for no, row in enumerate(self.db.execute('select distinct detector, summary, synopsis, description, seealso, solution, cvss3_score, cvss3_vector from analysis_issues order by cvss3_score desc')): + yield no, row + + def issues_by_group(self, *, detector: str, summary: str, cvss3_score: float) -> Iterable[Issue]: + for m in self.db.execute('select * from analysis_issues where detector=:detector and summary=:summary and cvss3_score=:cvss3_score', dict(detector=detector, summary=summary, cvss3_score=cvss3_score)): + yield Issue.from_analysis_issues_row(m) diff --git a/trueseeing/core/patch.py b/trueseeing/core/patch.py index e0040bf..f99d192 100644 --- a/trueseeing/core/patch.py +++ b/trueseeing/core/patch.py @@ -40,52 +40,52 @@ async def apply(self, patch: Patch) -> None: return await self.apply_multi([patch]) async def apply_multi(self, patches: List[Patch]) -> None: - with Context(self._path, []) as context: - await context.analyze() - ui.info(f"{self._path} -> {context.wd}") - for p in patches: - p.apply(context) + context = Context(self._path, []) + await context.analyze() + ui.info(f"{self._path} -> {context.wd}") + for p in patches: + p.apply(context) - await self._build(context) + await self._build(context) async def _build(self, context: Context) -> None: from tempfile import TemporaryDirectory + from trueseeing.core.literalquery import Query # XXX insecure with TemporaryDirectory() as d: with context.store().db as c: + query = Query(c=c) cwd = os.getcwd() try: os.chdir(d) os.makedirs('files') os.chdir('files') - for path, blob in c.execute('select path, blob from files'): + for path, blob in query.file_enum(pat=None): dirname = os.path.dirname(path) if dirname: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'wb') as f: f.write(blob) - for path, blob in c.execute('select path, blob from patches'): + for path, blob in query.patch_enum(pat=None): dirname = os.path.dirname(path) if dirname: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'wb') as f: f.write(blob) - c.execute('delete from patches') + query.patch_clear() c.commit() finally: os.chdir(cwd) - from trueseeing.core.tools import invoke_passthru - from importlib.resources import as_file, files - with as_file(files('trueseeing.libs').joinpath('apkeditor.jar')) as apkeditorpath: - with as_file(files('trueseeing.libs').joinpath('apksigner.jar')) as apksignerpath: - await invoke_passthru('(cd {root} && java -jar {apkeditor} b -i files -o patched.apk && java -jar {apksigner} sign --ks {keystore} --ks-pass pass:android patched.apk && cp -a patched.apk {outpath})'.format( - root=d, - apkeditor=apkeditorpath, - apksigner=apksignerpath, - keystore=await SigningKey().key(), - outpath=self._outpath, - )) + from trueseeing.core.tools import invoke_passthru, toolchains + with toolchains() as tc: + await invoke_passthru('(cd {root} && java -jar {apkeditor} b -i files -o patched.apk && java -jar {apksigner} sign --ks {keystore} --ks-pass pass:android patched.apk && cp -a patched.apk {outpath})'.format( + root=d, + apkeditor=tc['apkeditor'], + apksigner=tc['apksigner'], + keystore=await SigningKey().key(), + outpath=self._outpath, + )) copyfile(os.path.join(d, 'patched.apk'), self._outpath) copyfile(os.path.join(d, 'patched.apk.idsig'), self._outpath + '.idsig') diff --git a/trueseeing/core/report.py b/trueseeing/core/report.py index 453dbc2..f62190b 100644 --- a/trueseeing/core/report.py +++ b/trueseeing/core/report.py @@ -64,10 +64,8 @@ def return_(self, found: bool) -> bool: return found def generate(self, f: TextIO) -> None: - with self._context.store().db as db: - for m in db.execute('select * from analysis_issues'): - issue = Issue.from_analysis_issues_row(m) - f.write(ConsoleNoter.formatted(issue) + '\n') + for issue in self._context.store().query().issues(): + f.write(ConsoleNoter.formatted(issue) + '\n') class HTMLReportGenerator: def __init__(self, context: Context) -> None: @@ -83,12 +81,13 @@ def return_(self, found: bool) -> bool: def generate(self, f: TextIO) -> None: with self._context.store().db as db: + from trueseeing.core.literalquery import Query + query = Query(c=db) issues = [] - for no, row in enumerate(db.execute('select distinct detector, summary, synopsis, description, seealso, solution, cvss3_score, cvss3_vector from analysis_issues order by cvss3_score desc')): + for no, row in query.findings_list(): instances: List[Dict[str, Any]] = [] issues.append(dict(no=no, detector=row[0], summary=row[1].title(), synopsis=row[2], description=row[3], seealso=row[4], solution=row[5], cvss3_score=row[6], cvss3_vector=row[7], severity=CVSS3Scoring.severity_of(row[6]).title(), instances=instances, severity_panel_style={'critical':'panel-danger', 'high':'panel-warning', 'medium':'panel-warning', 'low':'panel-success', 'info':'panel-info'}[CVSS3Scoring.severity_of(row[6])])) - for m in db.execute('select * from analysis_issues where detector=:detector and summary=:summary and cvss3_score=:cvss3_score', {v:row[k] for k,v in {0:'detector', 1:'summary', 6:'cvss3_score'}.items()}): - issue = Issue.from_analysis_issues_row(m) + for issue in query.issues_by_group(detector=row[0], summary=row[1], cvss3_score=row[6]): instances.append(dict(info=issue.brief_info(), source=issue.source, row=issue.row, col=issue.col)) app = dict( @@ -120,8 +119,10 @@ def return_(self, found: bool) -> bool: def generate(self, f: TextIO) -> None: from json import dumps with self._context.store().db as db: + from trueseeing.core.literalquery import Query + query = Query(c=db) issues = [] - for no, row in enumerate(db.execute('select distinct detector, summary, synopsis, description, seealso, solution, cvss3_score, cvss3_vector from analysis_issues order by cvss3_score desc')): + for no, row in query.findings_list(): instances: List[Dict[str, Any]] = [] issues.append(dict( no=no, @@ -136,8 +137,7 @@ def generate(self, f: TextIO) -> None: severity=CVSS3Scoring.severity_of(row[6]).title(), instances=instances )) - for m in db.execute('select * from analysis_issues where detector=:detector and summary=:summary and cvss3_score=:cvss3_score', {v:row[k] for k,v in {0:'detector', 1:'summary', 6:'cvss3_score'}.items()}): - issue = Issue.from_analysis_issues_row(m) + for issue in query.issues_by_group(detector=row[0], summary=row[1], cvss3_score=row[6]): instances.append(dict( info=issue.brief_info(), source=issue.source, diff --git a/trueseeing/core/sign.py b/trueseeing/core/sign.py index a27119a..258ea31 100644 --- a/trueseeing/core/sign.py +++ b/trueseeing/core/sign.py @@ -68,13 +68,13 @@ def __init__(self, path: str, outpath: str): self._outpath = os.path.realpath(outpath) async def resign(self) -> None: - from importlib.resources import as_file, files + from trueseeing.core.tools import toolchains with tempfile.TemporaryDirectory() as d: - with as_file(files('trueseeing.libs').joinpath('apksigner.jar')) as apksignerpath: + with toolchains() as tc: await invoke_passthru( "java -jar {apksigner} sign --ks {ks} --ks-pass pass:android --in {path} --out {d}/signed.apk".format( d=d, - apksigner=apksignerpath, + apksigner=tc['apkeditor'], ks=await SigningKey().key(), path=self._path, ) diff --git a/trueseeing/core/store.py b/trueseeing/core/store.py index 48cc685..98419b1 100644 --- a/trueseeing/core/store.py +++ b/trueseeing/core/store.py @@ -56,12 +56,6 @@ def _re_fn(expr: AnyStr, item: Any) -> bool: else: return False - def __enter__(self) -> Store: - return self - - def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: - pass - def op_finalize(self) -> None: StorePrep(self.db).stage2() @@ -100,4 +94,4 @@ def op_generate_methodmap(self, c: Any = None) -> int: return detected_methods def query(self) -> Query: - return Query(self) + return Query(store=self) diff --git a/trueseeing/core/tools.py b/trueseeing/core/tools.py index aac5a12..05db285 100644 --- a/trueseeing/core/tools.py +++ b/trueseeing/core/tools.py @@ -20,12 +20,18 @@ import asyncio import functools +from contextlib import contextmanager from trueseeing.core.ui import ui if TYPE_CHECKING: - from typing import Any, Optional, AsyncIterable, TypeVar, List + from pathlib import Path + from typing import Any, Optional, AsyncIterable, TypeVar, List, Iterator, TypedDict T = TypeVar('T') + class Toolchain(TypedDict): + apkeditor: Path + apksigner: Path + def noneif(x: Any, defaulter: Any) -> Any: if x is not None: return x @@ -83,3 +89,13 @@ async def try_invoke(as_: str) -> Optional[str]: return await invoke(as_) except CalledProcessError: return None + +@contextmanager +def toolchains() -> Iterator[Toolchain]: + from importlib.resources import files, as_file + with as_file(files('trueseeing.libs').joinpath('apkeditor.jar')) as apkeditorpath: + with as_file(files('trueseeing.libs').joinpath('apksigner.jar')) as apksignerpath: + yield dict( + apkeditor=apkeditorpath, + apksigner=apksignerpath + ) diff --git a/trueseeing/libs/files.0.sql b/trueseeing/libs/files.0.sql new file mode 100644 index 0000000..26733a4 --- /dev/null +++ b/trueseeing/libs/files.0.sql @@ -0,0 +1,2 @@ +drop table if exists files; +create table files(path text not null unique, blob bytes not null); diff --git a/trueseeing/signature/crypto.py b/trueseeing/signature/crypto.py index c6788ea..07449fe 100644 --- a/trueseeing/signature/crypto.py +++ b/trueseeing/signature/crypto.py @@ -75,53 +75,54 @@ def looks_like_real_key(k: str) -> bool: # XXX: silly return len(k) >= 8 and not any(x in k for x in ('Padding', 'SHA1', 'PBKDF2', 'Hmac', 'emulator')) - with self._context.store() as store: - for cl in store.query().invocations(InvocationPattern('invoke-', '^Ljavax?.*/(SecretKey|(Iv|GCM)Parameter|(PKCS8|X509)EncodedKey)Spec|^Ljavax?.*/MessageDigest;->(update|digest)')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - for nr in self._important_args_on_invocation(cl): - for found in DataFlows.solved_possible_constant_data_in_invocation(store, cl, nr): - try: - decoded = base64.b64decode(found) - info1 = '"{target_val}" [{target_val_len}] (base64; "{decoded_val}" [{decoded_val_len}])'.format(target_val=found, target_val_len=len(found), decoded_val=binascii.hexlify(decoded).decode('ascii'), decoded_val_len=len(decoded)) - except (ValueError, binascii.Error): - info1 = f'"{found}" [{len(found)}]' - - if looks_like_real_key(found): - self._raise_issue(Issue( - detector_id=self.option, - cvss3_vector=self._cvss, - confidence='firm', - summary='insecure cryptography: static keys', - info1=info1, - info2=store.query().method_call_target_of(cl), - source=qn, - synopsis='Traces of cryptographic material has been found the application binary.', - description='''\ + store = self._context.store() + q = store.query() + for cl in q.invocations(InvocationPattern('invoke-', '^Ljavax?.*/(SecretKey|(Iv|GCM)Parameter|(PKCS8|X509)EncodedKey)Spec|^Ljavax?.*/MessageDigest;->(update|digest)')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + for nr in self._important_args_on_invocation(cl): + for found in DataFlows.solved_possible_constant_data_in_invocation(store, cl, nr): + try: + decoded = base64.b64decode(found) + info1 = '"{target_val}" [{target_val_len}] (base64; "{decoded_val}" [{decoded_val_len}])'.format(target_val=found, target_val_len=len(found), decoded_val=binascii.hexlify(decoded).decode('ascii'), decoded_val_len=len(decoded)) + except (ValueError, binascii.Error): + info1 = f'"{found}" [{len(found)}]' + + if looks_like_real_key(found): + self._raise_issue(Issue( + detector_id=self.option, + cvss3_vector=self._cvss, + confidence='firm', + summary='insecure cryptography: static keys', + info1=info1, + info2=q.method_call_target_of(cl), + source=qn, + synopsis='Traces of cryptographic material has been found the application binary.', + description='''\ Traces of cryptographic material has been found in the application binary. If cryptographic material is hardcoded, attackers can extract or replace them. ''', - solution='''\ + solution='''\ Use a device or installation specific information, or obfuscate them. ''' - )) - else: - self._raise_issue(Issue( - detector_id=self.option, - cvss3_vector=self._cvss_nonkey, - confidence='tentative', - summary='Cryptographic constants detected', - info1=info1, - info2=store.query().method_call_target_of(cl), - source=qn, - synopsis='Possible cryptographic constants have been found.', - description='''\ + )) + else: + self._raise_issue(Issue( + detector_id=self.option, + cvss3_vector=self._cvss_nonkey, + confidence='tentative', + summary='Cryptographic constants detected', + info1=info1, + info2=q.method_call_target_of(cl), + source=qn, + synopsis='Possible cryptographic constants have been found.', + description='''\ Possible cryptographic constants has been found in the application binary. ''' - )) - except IndexError: - pass + )) + except IndexError: + pass async def _do_detect_case2(self) -> None: # XXX: Crude detection @@ -133,19 +134,37 @@ def should_be_secret(store: Store, k: Op, val: str) -> bool: return False pat = '^MI[IG][0-9A-Za-z+/=-]{32,}AQAB' - with self._context.store() as store: - for cl in store.query().consts(InvocationPattern('const-string', pat)): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - val = cl.p[1].v + store = self._context.store() + q = store.query() + for cl in q.consts(InvocationPattern('const-string', pat)): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + val = cl.p[1].v + self._raise_issue(Issue( + detector_id=self.option, + cvss3_vector=self._cvss, + confidence={True:'firm', False:'tentative'}[should_be_secret(store, cl, val)], # type: ignore[arg-type] + summary='insecure cryptography: static keys (2)', + info1=f'"{val}" [{len(val)}] (X.509)', + source=qn, + synopsis='Traces of X.509 certificates has been found the application binary.', + description='''\ +Traces of X.509 certificates has been found in the application binary. X.509 certificates describe public key materials. Their notable uses include Google Play in-app billing identity. If is hardcoded, attackers can extract or replace them. +''', + solution='''\ +Use a device or installation specific information, or obfuscate them. Especially, do not use the stock implementation of in-app billing logic. +''' + )) + for name, val in self._context.string_resources(): + if re.match(pat, val): self._raise_issue(Issue( detector_id=self.option, cvss3_vector=self._cvss, - confidence={True:'firm', False:'tentative'}[should_be_secret(store, cl, val)], # type: ignore[arg-type] + confidence='tentative', summary='insecure cryptography: static keys (2)', info1=f'"{val}" [{len(val)}] (X.509)', - source=qn, + source=f'R.string.{name}', synopsis='Traces of X.509 certificates has been found the application binary.', description='''\ Traces of X.509 certificates has been found in the application binary. X.509 certificates describe public key materials. Their notable uses include Google Play in-app billing identity. If is hardcoded, attackers can extract or replace them. @@ -154,23 +173,6 @@ def should_be_secret(store: Store, k: Op, val: str) -> bool: Use a device or installation specific information, or obfuscate them. Especially, do not use the stock implementation of in-app billing logic. ''' )) - for name, val in self._context.string_resources(): - if re.match(pat, val): - self._raise_issue(Issue( - detector_id=self.option, - cvss3_vector=self._cvss, - confidence='tentative', - summary='insecure cryptography: static keys (2)', - info1=f'"{val}" [{len(val)}] (X.509)', - source=f'R.string.{name}', - synopsis='Traces of X.509 certificates has been found the application binary.', - description='''\ -Traces of X.509 certificates has been found in the application binary. X.509 certificates describe public key materials. Their notable uses include Google Play in-app billing identity. If is hardcoded, attackers can extract or replace them. -''', - solution='''\ -Use a device or installation specific information, or obfuscate them. Especially, do not use the stock implementation of in-app billing logic. -''' - )) class CryptoEcbDetector(Detector): @@ -179,31 +181,32 @@ class CryptoEcbDetector(Detector): _cvss = 'CVSS:3.0/AV:P/AC:H/PR:N/UI:N/S:U/C:L/I:L/A:L/' async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().invocations(InvocationPattern('invoke-static', r'Ljavax/crypto/Cipher;->getInstance\(Ljava/lang/String;.*?\)')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - target_val = DataFlows.solved_possible_constant_data_in_invocation(store, cl, 0) - if any((('ECB' in x or '/' not in x) and 'RSA' not in x) for x in target_val): - self._raise_issue(Issue( - detector_id=self.option, - cvss3_vector=self._cvss, - confidence='certain', - summary='insecure cryptography: cipher might be operating in ECB mode', - info1=','.join(target_val), - source=qn, - synopsis='The application might be using ciphers in ECB mode.', - description='''\ - The application might be using symmetric ciphers in ECB mode. ECB mode is the most basic operating mode that independently transform data blocks. Indepent transformation leaks information about distribution in plaintext. + store = self._context.store() + q = store.query() + for cl in q.invocations(InvocationPattern('invoke-static', r'Ljavax/crypto/Cipher;->getInstance\(Ljava/lang/String;.*?\)')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + target_val = DataFlows.solved_possible_constant_data_in_invocation(store, cl, 0) + if any((('ECB' in x or '/' not in x) and 'RSA' not in x) for x in target_val): + self._raise_issue(Issue( + detector_id=self.option, + cvss3_vector=self._cvss, + confidence='certain', + summary='insecure cryptography: cipher might be operating in ECB mode', + info1=','.join(target_val), + source=qn, + synopsis='The application might be using ciphers in ECB mode.', + description='''\ + The application might be using symmetric ciphers in ECB mode. ECB mode is the most basic operating mode that independently transform data blocks. Indepent transformation leaks information about distribution in plaintext. ''', - solution='''\ + solution='''\ Use CBC or CTR mode. ''' - )) - except (DataFlows.NoSuchValueError): - pass + )) + except (DataFlows.NoSuchValueError): + pass class CryptoNonRandomXorDetector(Detector): option = 'crypto-xor' @@ -211,18 +214,19 @@ class CryptoNonRandomXorDetector(Detector): _cvss = 'CVSS:3.0/AV:P/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:L/' async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().ops_of('xor-int/lit8'): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - target_val = int(cl.p[2].v, 16) - if (cl.p[0].v == cl.p[1].v) and target_val > 1: - self._raise_issue(Issue( - detector_id=self.option, - cvss3_vector=self._cvss, - confidence='firm', - summary='insecure cryptography: non-random XOR cipher', - info1=f'0x{target_val:02x}', - source=store.query().qualname_of(cl) - )) + store = self._context.store() + q = store.query() + for cl in q.ops_of('xor-int/lit8'): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + target_val = int(cl.p[2].v, 16) + if (cl.p[0].v == cl.p[1].v) and target_val > 1: + self._raise_issue(Issue( + detector_id=self.option, + cvss3_vector=self._cvss, + confidence='firm', + summary='insecure cryptography: non-random XOR cipher', + info1=f'0x{target_val:02x}', + source=q.qualname_of(cl) + )) diff --git a/trueseeing/signature/fingerprint.py b/trueseeing/signature/fingerprint.py index 1d76be0..de9b4ea 100644 --- a/trueseeing/signature/fingerprint.py +++ b/trueseeing/signature/fingerprint.py @@ -25,11 +25,9 @@ from trueseeing.core.code.model import InvocationPattern from trueseeing.signature.base import Detector from trueseeing.core.issue import Issue -from trueseeing.core.literalquery import Query if TYPE_CHECKING: from typing import Iterable, Optional, List, Dict, Any, Set - from trueseeing.core.code.model import Op # XXX huge duplication class TopLevelSuffixes: @@ -115,6 +113,7 @@ def _is_kind_of(cls, c1: str, c2: str) -> bool: return not cls._suffixes_public.looks_public(shared) async def detect(self) -> None: + q = self._context.store().query() package = self._context.parsed_manifest().xpath('/manifest/@package', namespaces=dict(android='http://schemas.android.com/apk/res/android'))[0] packages: Dict[str, List[str]] = dict() @@ -140,7 +139,7 @@ async def detect(self) -> None: )) for p in reversed(sorted(packages.keys(), key=len)): - for k in self._context.store().query().consts_in_package(p, InvocationPattern('const-string', r'[0-9]+\.[0-9]+|(19|20)[0-9]{2}[ /-]')): + for k in q.consts_in_package(p, InvocationPattern('const-string', r'[0-9]+\.[0-9]+|(19|20)[0-9]{2}[ /-]')): ver = k.p[1].v if not re.search(r'^/|:[0-9]+|\\|://', ver): comps = ver.split('.') @@ -172,7 +171,7 @@ async def detect(self) -> None: info1=f'{ver} ({p})', )) - for fn, blob in self._context.store().db.execute('select path, blob from files where path like :pat', dict(pat='root/assets/%.js')): + for fn, blob in q.file_enum('root/assets/%.js'): f = io.StringIO(blob.decode('utf-8', errors='ignore')) for l in f: for m in re.finditer(r'[0-9]+\.[0-9]+|(19|20)[0-9]{2}[ /-]', l): @@ -252,18 +251,18 @@ async def detect(self) -> None: with files('trueseeing.libs').joinpath('tlds.txt').open('r', encoding='utf-8') as f: self._re_tlds = re.compile('^(?:{})$'.format('|'.join(re.escape(l.strip()) for l in f if l and not l.startswith('#'))), flags=re.IGNORECASE) - with self._context.store() as store: - for cl in store.query().consts(InvocationPattern('const-string', r'://|^/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+|^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+(:[0-9]+)?$')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - for match in self._analyzed(cl.p[1].v, qn): - for v in match['value']: - self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary=f'detected {match["type_"]}', info1=v, source=qn)) - for name, val in self._context.string_resources(): - for match in self._analyzed(val): - for v in match['value']: - self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary=f'detected {match["type_"]}', info1=v, source=f'R.string.{name}')) + q = self._context.store().query() + for cl in q.consts(InvocationPattern('const-string', r'://|^/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+|^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+(:[0-9]+)?$')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + for match in self._analyzed(cl.p[1].v, qn): + for v in match['value']: + self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary=f'detected {match["type_"]}', info1=v, source=qn)) + for name, val in self._context.string_resources(): + for match in self._analyzed(val): + for v in match['value']: + self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary=f'detected {match["type_"]}', info1=v, source=f'R.string.{name}')) class NativeMethodDetector(Detector): option = 'detect-native-method' @@ -274,20 +273,17 @@ class NativeMethodDetector(Detector): _detailed_description = None async def detect(self) -> None: - with self._context.store() as store: - for op in self._nativeish_methods(store.db): - self._raise_issue(Issue( - detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss, - summary=self._summary, - synopsis=self._synopsis, - source=store.query().qualname_of(op) - )) - - def _nativeish_methods(self, c: Any) -> Iterable[Op]: - for r in c.execute('select op_vecs.op as _0, t as _1, v as _2, op1 as _3, t1 as _4, v1 as _5, op2 as _6, t2 as _7, v2 as _8, op3 as _9, t3 as _10, v3 as _11, op4 as _12, t4 as _13, v4 as _14, op5 as _15, t5 as _16, v5 as _17, op6 as _18, t6 as _19, v6 as _20, op7 as _21, t7 as _22, v7 as _23, op8 as _24, t8 as _25, v8 as _26, op9 as _27, t9 as _28, v9 as _29 from ops_method join op_vecs on (method=ops_method.op and method=op_vecs.op) where v=:pat or v2=:pat or v3=:pat or v4=:pat or v5=:pat or v6=:pat or v7=:pat or v8=:pat or v9=:pat', dict(pat='native')): - yield Query._op_from_row(r) + store = self._context.store() + q = store.query() + for op in q.methods_with_modifier('native'): + self._raise_issue(Issue( + detector_id=self.option, + confidence='firm', + cvss3_vector=self._cvss, + summary=self._summary, + synopsis=self._synopsis, + source=q.qualname_of(op) + )) class NativeArchDetector(Detector): option = 'detect-native-arch' @@ -297,7 +293,7 @@ class NativeArchDetector(Detector): _synopsis = "The application has native codes for some architectures." async def detect(self) -> None: - for d, in self._context.store().db.execute('select path from files where path like :pat', dict(pat='root/lib/%')): + for d in self._context.store().query().file_find('root/lib/%'): if re.search(r'arm|x86|mips', d): arch = d.split('/')[2] self._raise_issue(Issue( diff --git a/trueseeing/signature/privacy.py b/trueseeing/signature/privacy.py index f1adc23..173094e 100644 --- a/trueseeing/signature/privacy.py +++ b/trueseeing/signature/privacy.py @@ -59,21 +59,22 @@ def analyzed(self, store: Store, op: Op) -> Optional[str]: return None async def detect(self) -> None: - with self._context.store() as store: - for op in store.query().invocations(InvocationPattern('invoke-', r'Landroid/provider/Settings\$Secure;->getString\(Landroid/content/ContentResolver;Ljava/lang/String;\)Ljava/lang/String;|Landroid/telephony/TelephonyManager;->getDeviceId\(\)Ljava/lang/String;|Landroid/telephony/TelephonyManager;->getSubscriberId\(\)Ljava/lang/String;|Landroid/telephony/TelephonyManager;->getLine1Number\(\)Ljava/lang/String;|Landroid/bluetooth/BluetoothAdapter;->getAddress\(\)Ljava/lang/String;|Landroid/net/wifi/WifiInfo;->getMacAddress\(\)Ljava/lang/String;|Ljava/net/NetworkInterface;->getHardwareAddress\(\)')): - qn = store.query().qualname_of(op) - if self._context.is_qualname_excluded(qn): - continue - val_type = self.analyzed(store, op) - if val_type is not None: - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss, - summary=self._summary, - info1=f'getting {val_type}', - source=store.query().qualname_of(op) - )) + store = self._context.store() + q = store.query() + for op in q.invocations(InvocationPattern('invoke-', r'Landroid/provider/Settings\$Secure;->getString\(Landroid/content/ContentResolver;Ljava/lang/String;\)Ljava/lang/String;|Landroid/telephony/TelephonyManager;->getDeviceId\(\)Ljava/lang/String;|Landroid/telephony/TelephonyManager;->getSubscriberId\(\)Ljava/lang/String;|Landroid/telephony/TelephonyManager;->getLine1Number\(\)Ljava/lang/String;|Landroid/bluetooth/BluetoothAdapter;->getAddress\(\)Ljava/lang/String;|Landroid/net/wifi/WifiInfo;->getMacAddress\(\)Ljava/lang/String;|Ljava/net/NetworkInterface;->getHardwareAddress\(\)')): + qn = q.qualname_of(op) + if self._context.is_qualname_excluded(qn): + continue + val_type = self.analyzed(store, op) + if val_type is not None: + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss, + summary=self._summary, + info1=f'getting {val_type}', + source=q.qualname_of(op) + )) class PrivacySMSDetector(Detector): option = 'privacy-sms' @@ -82,46 +83,47 @@ class PrivacySMSDetector(Detector): _summary = 'privacy concerns' async def detect(self) -> None: - with self._context.store() as store: - for op in store.query().invocations(InvocationPattern('invoke-', r'Landroid/net/Uri;->parse\(Ljava/lang/String;\)Landroid/net/Uri;')): - qn = store.query().qualname_of(op) - if self._context.is_qualname_excluded(qn): - continue - try: - if DataFlows.solved_constant_data_in_invocation(store, op, 0).startswith('content://sms/'): - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss, - summary=self._summary, - info1='accessing SMS', - source=store.query().qualname_of(op) - )) - except DataFlows.NoSuchValueError: - pass + store = self._context.store() + q = store.query() + for op in q.invocations(InvocationPattern('invoke-', r'Landroid/net/Uri;->parse\(Ljava/lang/String;\)Landroid/net/Uri;')): + qn = q.qualname_of(op) + if self._context.is_qualname_excluded(qn): + continue + try: + if DataFlows.solved_constant_data_in_invocation(store, op, 0).startswith('content://sms/'): + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss, + summary=self._summary, + info1='accessing SMS', + source=q.qualname_of(op) + )) + except DataFlows.NoSuchValueError: + pass - for op in store.query().invocations(InvocationPattern('invoke-', r'Landroid/telephony/SmsManager;->send')): - qn = store.query().qualname_of(op) - if self._context.is_qualname_excluded(qn): - continue - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss, - summary=self._summary, - info1='sending SMS', - source=store.query().qualname_of(op) - )) + for op in q.invocations(InvocationPattern('invoke-', r'Landroid/telephony/SmsManager;->send')): + qn = q.qualname_of(op) + if self._context.is_qualname_excluded(qn): + continue + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss, + summary=self._summary, + info1='sending SMS', + source=q.qualname_of(op) + )) - for op in store.query().invocations(InvocationPattern('invoke-', r'Landroid/telephony/SmsMessage;->createFromPdu\(')): - qn = store.query().qualname_of(op) - if self._context.is_qualname_excluded(qn): - continue - self._raise_issue(Issue( - detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss, - summary=self._summary, - info1='intercepting incoming SMS', - source=store.query().qualname_of(op) - )) + for op in q.invocations(InvocationPattern('invoke-', r'Landroid/telephony/SmsMessage;->createFromPdu\(')): + qn = q.qualname_of(op) + if self._context.is_qualname_excluded(qn): + continue + self._raise_issue(Issue( + detector_id=self.option, + confidence='firm', + cvss3_vector=self._cvss, + summary=self._summary, + info1='intercepting incoming SMS', + source=q.qualname_of(op) + )) diff --git a/trueseeing/signature/security.py b/trueseeing/signature/security.py index 48cc65d..3ba5dff 100644 --- a/trueseeing/signature/security.py +++ b/trueseeing/signature/security.py @@ -40,24 +40,24 @@ class SecurityFilePermissionDetector(Detector): _summary = 'insecure file permission' async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().invocations(InvocationPattern('invoke-virtual', r'Landroid/content/Context;->openFileOutput\(Ljava/lang/String;I\)')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - target_val = int(DataFlows.solved_constant_data_in_invocation(store, cl, 1), 16) - if target_val & 3: - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss, - summary=self._summary, - info1={1: 'MODE_WORLD_READABLE', 2: 'MODE_WORLD_WRITEABLE'}[target_val], - source=store.query().qualname_of(cl) - )) - except (DataFlows.NoSuchValueError): - pass + store = self._context.store() + for cl in store.query().invocations(InvocationPattern('invoke-virtual', r'Landroid/content/Context;->openFileOutput\(Ljava/lang/String;I\)')): + qn = store.query().qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + target_val = int(DataFlows.solved_constant_data_in_invocation(store, cl, 1), 16) + if target_val & 3: + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss, + summary=self._summary, + info1={1: 'MODE_WORLD_READABLE', 2: 'MODE_WORLD_WRITEABLE'}[target_val], + source=store.query().qualname_of(cl) + )) + except (DataFlows.NoSuchValueError): + pass class SecurityTlsInterceptionDetector(Detector): @@ -108,43 +108,42 @@ async def detect(self) -> None: )) def _do_detect_plain_pins_x509(self) -> Set[str]: - with self._context.store() as store: - pins: Set[str] = set() - q = store.query() - for m in store.query().methods_in_class('checkServerTrusted', 'X509TrustManager'): - if any(q.matches_in_method(m, InvocationPattern('verify', ''))): - classname = q.class_name_of(q.class_of_method(m)) - if classname: - pins.add(classname) - if any(q.matches_in_method(m, InvocationPattern('throw', ''))): - classname = q.class_name_of(q.class_of_method(m)) - if classname: - pins.add(classname) - - if pins: - # XXX crude detection - custom_sslcontext_detected = False - for cl in self._context.store().query().invocations(InvocationPattern('invoke-virtual', 'Ljavax/net/ssl/SSLContext;->init')): - custom_sslcontext_detected = True - pins = DataFlows.solved_typeset_in_invocation(store, cl, 1) & pins - - if not custom_sslcontext_detected: - return set() - else: - return pins + pins: Set[str] = set() + store = self._context.store() + q = store.query() + for m in q.methods_in_class('checkServerTrusted', 'X509TrustManager'): + if any(q.matches_in_method(m, InvocationPattern('verify', ''))): + classname = q.class_name_of(q.class_of_method(m)) + if classname: + pins.add(classname) + if any(q.matches_in_method(m, InvocationPattern('throw', ''))): + classname = q.class_name_of(q.class_of_method(m)) + if classname: + pins.add(classname) + + if pins: + # XXX crude detection + custom_sslcontext_detected = False + for cl in q.invocations(InvocationPattern('invoke-virtual', 'Ljavax/net/ssl/SSLContext;->init')): + custom_sslcontext_detected = True + pins = DataFlows.solved_typeset_in_invocation(store, cl, 1) & pins + + if not custom_sslcontext_detected: + return set() else: return pins + else: + return pins def _do_detect_plain_pins_hostnameverifier(self) -> Set[str]: - with self._context.store() as store: - pins: Set[str] = set() - q = store.query() - for m in itertools.chain(store.query().methods_in_class('verify(Ljava/lang/String;Ljavax/net/ssl/SSLSession;)Z', 'HostnameVerifier')): - if any(q.matches_in_method(m, InvocationPattern('invoke', 'contains|equals|verify|Ljavax/net/ssl/SSLSession;->getPeerCertificates'))): - classname = q.class_name_of(q.class_of_method(m)) - if classname: - pins.add(classname) - return pins + pins: Set[str] = set() + q = self._context.store().query() + for m in itertools.chain(q.methods_in_class('verify(Ljava/lang/String;Ljavax/net/ssl/SSLSession;)Z', 'HostnameVerifier')): + if any(q.matches_in_method(m, InvocationPattern('invoke', 'contains|equals|verify|Ljavax/net/ssl/SSLSession;->getPeerCertificates'))): + classname = q.class_name_of(q.class_of_method(m)) + if classname: + pins.add(classname) + return pins class LayoutSizeGuesser: @@ -232,53 +231,55 @@ class SecurityTamperableWebViewDetector(Detector): async def detect(self) -> None: import lxml.etree as ET from functools import reduce - with self._context.store() as store: - targets = {'WebView','XWalkView','GeckoView'} - - more = True - while more: - more = False - for cl in store.query().related_classes('|'.join(targets)): - name = store.query().class_name_of(cl) - if name is not None and name not in targets: - targets.add(name) - more = True - - for fn, blob in self._context.store().db.execute('select path, blob from files where path like :pat', dict(pat='%/res/%layout%.xml')): - r = ET.fromstring(blob, parser=ET.XMLParser(recover=True)) - for t in reduce(lambda x,y: x+y, (r.xpath('//{}'.format(self._context.class_name_of_dalvik_class_type(c).replace('$', '_'))) for c in targets)): - size = LayoutSizeGuesser().guessed_size(t, fn) - if size > 0.5: - try: - self._raise_issue(Issue( - detector_id=self.option, - confidence='tentative', - cvss3_vector=self._cvss1, - summary=self._summary1, - info1='{0} (score: {1:.02f})'.format(t.attrib[f'{self._xmlns_android}id'], size), - source=self._context.source_name_of_disassembled_resource(fn) - )) - except KeyError as e: - ui.warn(f'SecurityTamperableWebViewDetector.do_detect: missing key {e}') - # XXX: crude detection - for op in store.query().invocations(InvocationPattern('invoke-', ';->loadUrl')): - qn = store.query().qualname_of(op) - if self._context.is_qualname_excluded(qn): - continue - try: - v = DataFlows.solved_constant_data_in_invocation(store, op, 0) - if v.startswith('http://'): + store = self._context.store() + q = store.query() + targets = {'WebView','XWalkView','GeckoView'} + + more = True + while more: + more = False + for cl in q.related_classes('|'.join(targets)): + name = q.class_name_of(cl) + if name is not None and name not in targets: + targets.add(name) + more = True + + for fn, blob in q.file_enum('%/res/%layout%.xml'): + r = ET.fromstring(blob, parser=ET.XMLParser(recover=True)) + for t in reduce(lambda x,y: x+y, (r.xpath('//{}'.format(self._context.class_name_of_dalvik_class_type(c).replace('$', '_'))) for c in targets)): + size = LayoutSizeGuesser().guessed_size(t, fn) + if size > 0.5: + try: self._raise_issue(Issue( detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss2, - summary=self._summary2, - info1=v, - source=store.query().qualname_of(op) + confidence='tentative', + cvss3_vector=self._cvss1, + summary=self._summary1, + info1='{0} (score: {1:.02f})'.format(t.attrib[f'{self._xmlns_android}id'], size), + source=self._context.source_name_of_disassembled_resource(fn) )) - except DataFlows.NoSuchValueError: - pass + except KeyError as e: + ui.warn(f'SecurityTamperableWebViewDetector.do_detect: missing key {e}') + + # XXX: crude detection + for op in q.invocations(InvocationPattern('invoke-', ';->loadUrl')): + qn = q.qualname_of(op) + if self._context.is_qualname_excluded(qn): + continue + try: + v = DataFlows.solved_constant_data_in_invocation(store, op, 0) + if v.startswith('http://'): + self._raise_issue(Issue( + detector_id=self.option, + confidence='firm', + cvss3_vector=self._cvss2, + summary=self._summary2, + info1=v, + source=q.qualname_of(op) + )) + except DataFlows.NoSuchValueError: + pass class SecurityInsecureWebViewDetector(Detector): @@ -306,124 +307,127 @@ def _first(cls, xs: Iterable[T], default: Optional[T] = None) -> Optional[T]: return default async def detect(self) -> None: - with self._context.store() as store: - targets = set() - seeds = {'WebView','XWalkView','GeckoView'} - - more = True - while more: - more = False - for cl in store.query().related_classes('|'.join(seeds)): - name = store.query().class_name_of(cl) - if name not in targets: - targets.add(name) - more = True - for seed in seeds: - targets.add(f'L.*{seed};') - - # XXX: Crude detection - # https://developer.android.com/reference/android/webkit/WebView.html#addJavascriptInterface(java.lang.Object,%2520java.lang.String) - if self._context.get_min_sdk_version() <= 16: - for p in store.query().invocations(InvocationPattern('invoke-virtual', 'Landroid/webkit/WebSettings;->setJavaScriptEnabled')): - qn = store.query().qualname_of(p) - if self._context.is_qualname_excluded(qn): - continue - try: - if DataFlows.solved_constant_data_in_invocation(store, p, 0): - for target in targets: - for q in store.query().invocations_in_class(p, InvocationPattern('invoke-virtual', f'{target}->addJavascriptInterface')): - try: - if DataFlows.solved_constant_data_in_invocation(store, q, 0): - self._raise_issue(Issue( - detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss, - summary=self._summary1, - source=store.query().qualname_of(q) - )) - except (DataFlows.NoSuchValueError): + store = self._context.store() + query = store.query() + + targets = set() + seeds = {'WebView','XWalkView','GeckoView'} + + more = True + while more: + more = False + for cl in store.query().related_classes('|'.join(seeds)): + name = store.query().class_name_of(cl) + if name not in targets: + targets.add(name) + more = True + for seed in seeds: + targets.add(f'L.*{seed};') + + # XXX: Crude detection + # https://developer.android.com/reference/android/webkit/WebView.html#addJavascriptInterface(java.lang.Object,%2520java.lang.String) + if self._context.get_min_sdk_version() <= 16: + for p in query.invocations(InvocationPattern('invoke-virtual', 'Landroid/webkit/WebSettings;->setJavaScriptEnabled')): + qn = query.qualname_of(p) + if self._context.is_qualname_excluded(qn): + continue + try: + if DataFlows.solved_constant_data_in_invocation(store, p, 0): + for target in targets: + for q in query.invocations_in_class(p, InvocationPattern('invoke-virtual', f'{target}->addJavascriptInterface')): + try: + if DataFlows.solved_constant_data_in_invocation(store, q, 0): self._raise_issue(Issue( detector_id=self.option, - confidence='tentative', + confidence='firm', cvss3_vector=self._cvss, summary=self._summary1, - source=store.query().qualname_of(q) + source=query.qualname_of(q) )) - except (DataFlows.NoSuchValueError): - pass - - # https://developer.android.com/reference/android/webkit/WebSettings#setMixedContentMode(int) - if self._context.get_min_sdk_version() >= 21: - for q in store.query().invocations(InvocationPattern('invoke-virtual', 'Landroid/webkit/WebSettings;->setMixedContentMode')): - qn = store.query().qualname_of(q) - if self._context.is_qualname_excluded(qn): - continue - try: - val = int(DataFlows.solved_constant_data_in_invocation(store, q, 0), 16) - if val == 0: + except (DataFlows.NoSuchValueError): + self._raise_issue(Issue( + detector_id=self.option, + confidence='tentative', + cvss3_vector=self._cvss, + summary=self._summary1, + source=query.qualname_of(q) + )) + except (DataFlows.NoSuchValueError): + pass + + # https://developer.android.com/reference/android/webkit/WebSettings#setMixedContentMode(int) + if self._context.get_min_sdk_version() >= 21: + for q in query.invocations(InvocationPattern('invoke-virtual', 'Landroid/webkit/WebSettings;->setMixedContentMode')): + qn = query.qualname_of(q) + if self._context.is_qualname_excluded(qn): + continue + try: + val = int(DataFlows.solved_constant_data_in_invocation(store, q, 0), 16) + if val == 0: + self._raise_issue(Issue( + detector_id=self.option, + confidence='firm', + cvss3_vector=self._cvss2, + summary=self._summary2, + info1='MIXED_CONTENT_ALWAYS_ALLOW', + source=query.qualname_of(q))) + elif val == 2: + self._raise_issue(Issue( + detector_id=self.option, + confidence='firm', + cvss3_vector=self._cvss2b, + summary=self._summary2b, + info1='MIXED_CONTENT_COMPATIBILITY_MODE', + source=query.qualname_of(q))) + except (DataFlows.NoSuchValueError): + pass + else: + for target in targets: + for q in query.invocations(InvocationPattern('invoke-virtual', f'{target}->loadUrl')): + self._raise_issue(Issue( + detector_id=self.option, + confidence='firm', + cvss3_vector=self._cvss, + summary=self._summary2, + info1='mixed mode always enabled in API < 21', + source=query.qualname_of(q) + )) + + for op in query.invocations(InvocationPattern('invoke-', ';->loadUrl')): + qn = query.qualname_of(op) + if self._context.is_qualname_excluded(qn): + continue + try: + v = DataFlows.solved_constant_data_in_invocation(store, op, 0) + if v.startswith('file:///android_asset/'): + path = v.replace('file:///android_asset/', 'assets/') + blob = query.file_get(f'root/{path}') + if blob is not None: + content = blob.decode('utf-8', errors='ignore') + m = re.search('', content, flags=re.IGNORECASE) + csp: Optional[str] = None if m is None else m.group(1) + if csp is None or any([(x in csp.lower()) for x in ('unsafe', 'http:')]): self._raise_issue(Issue( detector_id=self.option, confidence='firm', - cvss3_vector=self._cvss2, - summary=self._summary2, - info1='MIXED_CONTENT_ALWAYS_ALLOW', - source=store.query().qualname_of(q))) - elif val == 2: + cvss3_vector=self._cvss3, + summary=self._summary3, + info1=path, + info2='default' if csp is None else csp, + source=query.qualname_of(op) + )) + else: self._raise_issue(Issue( detector_id=self.option, confidence='firm', - cvss3_vector=self._cvss2b, - summary=self._summary2b, - info1='MIXED_CONTENT_COMPATIBILITY_MODE', - source=store.query().qualname_of(q))) - except (DataFlows.NoSuchValueError): - pass - else: - for target in targets: - for q in store.query().invocations(InvocationPattern('invoke-virtual', f'{target}->loadUrl')): - self._raise_issue(Issue( - detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss, - summary=self._summary2, - info1='mixed mode always enabled in API < 21', - source=store.query().qualname_of(q) - )) - - for op in store.query().invocations(InvocationPattern('invoke-', ';->loadUrl')): - qn = store.query().qualname_of(op) - if self._context.is_qualname_excluded(qn): - continue - try: - v = DataFlows.solved_constant_data_in_invocation(store, op, 0) - if v.startswith('file:///android_asset/'): - path = v.replace('file:///android_asset/', 'assets/') - for blob, in store.db.execute('select blob from files where path=:path', dict(path=f'root/{path}')): - content = blob.decode('utf-8', errors='ignore') - m = re.search('', content, flags=re.IGNORECASE) - csp: Optional[str] = None if m is None else m.group(1) - if csp is None or any([(x in csp.lower()) for x in ('unsafe', 'http:')]): - self._raise_issue(Issue( - detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss3, - summary=self._summary3, - info1=path, - info2='default' if csp is None else csp, - source=store.query().qualname_of(op) - )) - else: - self._raise_issue(Issue( - detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss4, - summary=self._summary4, - info1=path, - info2=csp, - source=store.query().qualname_of(op) - )) - except DataFlows.NoSuchValueError: - pass + cvss3_vector=self._cvss4, + summary=self._summary4, + info1=path, + info2=csp, + source=query.qualname_of(op) + )) + except DataFlows.NoSuchValueError: + pass class FormatStringDetector(Detector): option = 'security-format-string' @@ -437,92 +441,93 @@ def _analyzed(self, x: str) -> Iterable[Dict[str, Any]]: yield dict(confidence='firm', value=x) async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().consts(InvocationPattern('const-string', r'%s')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - for t in self._analyzed(cl.p[1].v): + q = self._context.store().query() + for cl in q.consts(InvocationPattern('const-string', r'%s')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + for t in self._analyzed(cl.p[1].v): + self._raise_issue(Issue( + detector_id=self.option, + confidence=t['confidence'], + cvss3_vector=self._cvss, + summary=self._summary, + info1=t['value'], + source=q.qualname_of(cl) + )) + for name, val in self._context.string_resources(): + for t in self._analyzed(val): + self._raise_issue(Issue( + detector_id=self.option, + confidence=t['confidence'], + cvss3_vector=self._cvss, + summary=self._summary, + info1=t['value'], + source=f'R.string.{name}' + )) + +class LogDetector(Detector): + option = 'security-log' + description = 'Detects logging activities' + _summary = 'detected logging' + _cvss = 'CVSS:3.0/AV:P/AC:H/PR:N/UI:N/S:U/C:L/I:N/A:N/' + + async def detect(self) -> None: + store = self._context.store() + q = store.query() + for cl in q.invocations(InvocationPattern('invoke-', r'L.*->([dwie]|debug|error|exception|warning|info|notice|wtf)\(Ljava/lang/String;Ljava/lang/String;.*?Ljava/lang/(Throwable|.*?Exception);|L.*;->print(ln)?\(Ljava/lang/String;|LException;->printStackTrace\(')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + if 'print' not in cl.p[1].v: + try: self._raise_issue(Issue( detector_id=self.option, - confidence=t['confidence'], + confidence='tentative', cvss3_vector=self._cvss, summary=self._summary, - info1=t['value'], - source=store.query().qualname_of(cl) + info1=cl.p[1].v, + info2=DataFlows.solved_constant_data_in_invocation(store, cl, 1), + source=q.qualname_of(cl) )) - for name, val in self._context.string_resources(): - for t in self._analyzed(val): + except (DataFlows.NoSuchValueError): self._raise_issue(Issue( detector_id=self.option, - confidence=t['confidence'], + confidence='tentative', cvss3_vector=self._cvss, summary=self._summary, - info1=t['value'], - source=f'R.string.{name}' + info1=cl.p[1].v, + source=q.qualname_of(cl) )) - -class LogDetector(Detector): - option = 'security-log' - description = 'Detects logging activities' - _summary = 'detected logging' - _cvss = 'CVSS:3.0/AV:P/AC:H/PR:N/UI:N/S:U/C:L/I:N/A:N/' - - async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().invocations(InvocationPattern('invoke-', r'L.*->([dwie]|debug|error|exception|warning|info|notice|wtf)\(Ljava/lang/String;Ljava/lang/String;.*?Ljava/lang/(Throwable|.*?Exception);|L.*;->print(ln)?\(Ljava/lang/String;|LException;->printStackTrace\(')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - if 'print' not in cl.p[1].v: - try: - self._raise_issue(Issue( - detector_id=self.option, - confidence='tentative', - cvss3_vector=self._cvss, - summary=self._summary, - info1=cl.p[1].v, - info2=DataFlows.solved_constant_data_in_invocation(store, cl, 1), - source=store.query().qualname_of(cl) - )) - except (DataFlows.NoSuchValueError): - self._raise_issue(Issue( - detector_id=self.option, - confidence='tentative', - cvss3_vector=self._cvss, - summary=self._summary, - info1=cl.p[1].v, - source=store.query().qualname_of(cl) - )) - elif 'Exception;->' not in cl.p[1].v: - try: - self._raise_issue(Issue( - detector_id=self.option, - confidence='tentative', - cvss3_vector=self._cvss, - summary=self._summary, - info1=cl.p[1].v, - info2=DataFlows.solved_constant_data_in_invocation(store, cl, 0), - source=store.query().qualname_of(cl) - )) - except (DataFlows.NoSuchValueError): - self._raise_issue(Issue( - detector_id=self.option, - confidence='tentative', - cvss3_vector=self._cvss, - summary=self._summary, - info1=cl.p[1].v, - source=store.query().qualname_of(cl) - )) - else: + elif 'Exception;->' not in cl.p[1].v: + try: self._raise_issue(Issue( detector_id=self.option, confidence='tentative', cvss3_vector=self._cvss, summary=self._summary, info1=cl.p[1].v, - source=store.query().qualname_of(cl) + info2=DataFlows.solved_constant_data_in_invocation(store, cl, 0), + source=q.qualname_of(cl) + )) + except (DataFlows.NoSuchValueError): + self._raise_issue(Issue( + detector_id=self.option, + confidence='tentative', + cvss3_vector=self._cvss, + summary=self._summary, + info1=cl.p[1].v, + source=q.qualname_of(cl) )) + else: + self._raise_issue(Issue( + detector_id=self.option, + confidence='tentative', + cvss3_vector=self._cvss, + summary=self._summary, + info1=cl.p[1].v, + source=q.qualname_of(cl) + )) class ADBProbeDetector(Detector): option = 'security-adb-detect' @@ -532,21 +537,22 @@ class ADBProbeDetector(Detector): _synopsis = 'The application is probing for USB debugging (adbd.)' async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().invocations(InvocationPattern('invoke-', r'^Landroid/provider/Settings\$(Global|Secure);->getInt\(')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - for found in DataFlows.solved_possible_constant_data_in_invocation(store, cl, 1): - if found == 'adb_enabled': - self._raise_issue(Issue( - detector_id=self.option, - confidence='firm', - cvss3_vector=self._cvss, - summary=self._summary, - source=store.query().qualname_of(cl), - synopsis=self._synopsis, - )) + store = self._context.store() + q = store.query() + for cl in q.invocations(InvocationPattern('invoke-', r'^Landroid/provider/Settings\$(Global|Secure);->getInt\(')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + for found in DataFlows.solved_possible_constant_data_in_invocation(store, cl, 1): + if found == 'adb_enabled': + self._raise_issue(Issue( + detector_id=self.option, + confidence='firm', + cvss3_vector=self._cvss, + summary=self._summary, + source=q.qualname_of(cl), + synopsis=self._synopsis, + )) class ClientXSSJQDetector(Detector): option = 'security-cxss-jq' @@ -556,7 +562,7 @@ class ClientXSSJQDetector(Detector): _synopsis = "The application pours literal HTML in JQuery context." async def detect(self) -> None: - for fn, blob in self._context.store().db.execute('select path, blob from files where path like :pat', dict(pat='root/assets/%.js')): + for fn, blob in self._context.store().query().file_enum(pat='root/assets/%.js'): f = io.StringIO(blob.decode('utf-8', errors='ignore')) for l in f: for m in re.finditer(r'\.html\(', l): @@ -580,57 +586,58 @@ class SecurityFileWriteDetector(Detector): _synopsis2 = 'The application opens files for writing.' async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().invocations(InvocationPattern('invoke-virtual', r'Landroid/content/Context;->openFileOutput\(Ljava/lang/String;I\)')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) - except DataFlows.NoSuchValueError: - target_val = '(unknown name)' + store = self._context.store() + q = store.query() + for cl in q.invocations(InvocationPattern('invoke-virtual', r'Landroid/content/Context;->openFileOutput\(Ljava/lang/String;I\)')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) + except DataFlows.NoSuchValueError: + target_val = '(unknown name)' - if re.search(r'debug|log|info|report|screen|err|tomb|drop', target_val): - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss1, - summary=self._summary1, - synopsis=self._synopsis1, - info1=target_val, - source=store.query().qualname_of(cl) - )) - else: - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss2, - summary=self._summary2, - synopsis=self._synopsis2, - info1=target_val, - source=store.query().qualname_of(cl) - )) + if re.search(r'debug|log|info|report|screen|err|tomb|drop', target_val): + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss1, + summary=self._summary1, + synopsis=self._synopsis1, + info1=target_val, + source=q.qualname_of(cl) + )) + else: + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss2, + summary=self._summary2, + synopsis=self._synopsis2, + info1=target_val, + source=q.qualname_of(cl) + )) - for cl in store.query().invocations(InvocationPattern('invoke-direct', r'java/io/File(Writer|OutputStream)?;->\(Ljava/lang/String;\)')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) + for cl in q.invocations(InvocationPattern('invoke-direct', r'java/io/File(Writer|OutputStream)?;->\(Ljava/lang/String;\)')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) - if re.search(r'debug|log|info|report|screen|err|tomb|drop', target_val): - if not re.search(r'^/proc/|^/sys/', target_val): - self._raise_issue(Issue( - detector_id=self.option, - confidence='tentative', - cvss3_vector=self._cvss1, - summary=self._summary1, - synopsis=self._synopsis1, - info1=target_val, - source=store.query().qualname_of(cl) - )) - except DataFlows.NoSuchValueError: - target_val = '(unknown name)' + if re.search(r'debug|log|info|report|screen|err|tomb|drop', target_val): + if not re.search(r'^/proc/|^/sys/', target_val): + self._raise_issue(Issue( + detector_id=self.option, + confidence='tentative', + cvss3_vector=self._cvss1, + summary=self._summary1, + synopsis=self._synopsis1, + info1=target_val, + source=q.qualname_of(cl) + )) + except DataFlows.NoSuchValueError: + target_val = '(unknown name)' class SecurityInsecureRootedDetector(Detector): option = 'security-insecure-rooted' @@ -640,35 +647,37 @@ class SecurityInsecureRootedDetector(Detector): _pat = r'^/[{}$%a-zA-Z0-9_-]+(/[{}$%a-zA-Z0-9_-]+)+' async def detect(self) -> None: - with self._context.store() as store: - found: Set[str] = set() - attestations: Set[str] = set() + store = self._context.store() + q = store.query() - for cl in store.query().invocations(InvocationPattern('invoke-', r'Lcom/google/android/gms/safetynet/SafetyNetClient;->attest\(\[BLjava/lang/String;\)')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - # XXX: crude detection - verdict_accesses = list(store.query().consts_in_class(cl, InvocationPattern('const-string', r'ctsProfileMatch|basicIntegrity'))) - if verdict_accesses and qn is not None: - attestations.add(qn) + found: Set[str] = set() + attestations: Set[str] = set() - for cl in store.query().consts(InvocationPattern('const-string', self._pat)): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - found = found.union([m.group(0) for m in re.finditer(self._pat, cl.p[1].v)]) - for name, val in self._context.string_resources(): - found = found.union([m.group(0) for m in re.finditer(self._pat, val)]) - - path_based_detection_attempt: Set[str] = set() - for s in found: - if re.search(r'Sup|su|xbin|sbin|root', s): - path_based_detection_attempt.add(s) - if path_based_detection_attempt and not attestations: - self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary='manual root detections without remote attestations', info1=','.join(path_based_detection_attempt))) - elif attestations and not path_based_detection_attempt: - self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary='remote attestations without manual root detections', info1=','.join(attestations))) + for cl in q.invocations(InvocationPattern('invoke-', r'Lcom/google/android/gms/safetynet/SafetyNetClient;->attest\(\[BLjava/lang/String;\)')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + # XXX: crude detection + verdict_accesses = list(q.consts_in_class(cl, InvocationPattern('const-string', r'ctsProfileMatch|basicIntegrity'))) + if verdict_accesses and qn is not None: + attestations.add(qn) + + for cl in q.consts(InvocationPattern('const-string', self._pat)): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + found = found.union([m.group(0) for m in re.finditer(self._pat, cl.p[1].v)]) + for name, val in self._context.string_resources(): + found = found.union([m.group(0) for m in re.finditer(self._pat, val)]) + + path_based_detection_attempt: Set[str] = set() + for s in found: + if re.search(r'Sup|su|xbin|sbin|root', s): + path_based_detection_attempt.add(s) + if path_based_detection_attempt and not attestations: + self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary='manual root detections without remote attestations', info1=','.join(path_based_detection_attempt))) + elif attestations and not path_based_detection_attempt: + self._raise_issue(Issue(detector_id=self.option, confidence='firm', cvss3_vector=self._cvss, summary='remote attestations without manual root detections', info1=','.join(attestations))) class SecuritySharedPreferencesDetector(Detector): option = 'security-sharedpref' @@ -678,63 +687,64 @@ class SecuritySharedPreferencesDetector(Detector): _synopsis = 'The application is using SharedPreferences. This is purely informational; Using the subsystem alone does not constitute a security issue.' async def detect(self) -> None: - with self._context.store() as store: - for cl in store.query().invocations(InvocationPattern('invoke-interface', r'Landroid/content/SharedPreferences;->get(Boolean|Float|Int|String|StringSet)\(Ljava/lang/String;')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) - except DataFlows.NoSuchValueError: - target_val = '(unknown name)' - - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss, - summary=self._summary, - synopsis=self._synopsis, - info1=target_val, - info2='read', - source=store.query().qualname_of(cl) - )) - - for cl in store.query().invocations(InvocationPattern('invoke-interface', r'Landroid/content/SharedPreferences\$Editor;->put(Boolean|Float|Int|String|StringSet)\(Ljava/lang/String;')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) - except DataFlows.NoSuchValueError: - target_val = '(unknown name)' - - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss, - summary=self._summary, - synopsis=self._synopsis, - info1=target_val, - info2='write', - source=store.query().qualname_of(cl) - )) - - for cl in store.query().invocations(InvocationPattern('invoke-interface', r'Landroid/content/SharedPreferences/Editor;->remove\(Ljava/lang/String;')): - qn = store.query().qualname_of(cl) - if self._context.is_qualname_excluded(qn): - continue - try: - target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) - except DataFlows.NoSuchValueError: - target_val = '(unknown name)' - - self._raise_issue(Issue( - detector_id=self.option, - confidence='certain', - cvss3_vector=self._cvss, - summary=self._summary, - synopsis=self._synopsis, - info1=target_val, - info2='delete', - source=store.query().qualname_of(cl) - )) + store = self._context.store() + q = store.query() + for cl in q.invocations(InvocationPattern('invoke-interface', r'Landroid/content/SharedPreferences;->get(Boolean|Float|Int|String|StringSet)\(Ljava/lang/String;')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) + except DataFlows.NoSuchValueError: + target_val = '(unknown name)' + + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss, + summary=self._summary, + synopsis=self._synopsis, + info1=target_val, + info2='read', + source=q.qualname_of(cl) + )) + + for cl in q.invocations(InvocationPattern('invoke-interface', r'Landroid/content/SharedPreferences\$Editor;->put(Boolean|Float|Int|String|StringSet)\(Ljava/lang/String;')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) + except DataFlows.NoSuchValueError: + target_val = '(unknown name)' + + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss, + summary=self._summary, + synopsis=self._synopsis, + info1=target_val, + info2='write', + source=q.qualname_of(cl) + )) + + for cl in q.invocations(InvocationPattern('invoke-interface', r'Landroid/content/SharedPreferences/Editor;->remove\(Ljava/lang/String;')): + qn = q.qualname_of(cl) + if self._context.is_qualname_excluded(qn): + continue + try: + target_val = DataFlows.solved_constant_data_in_invocation(store, cl, 0) + except DataFlows.NoSuchValueError: + target_val = '(unknown name)' + + self._raise_issue(Issue( + detector_id=self.option, + confidence='certain', + cvss3_vector=self._cvss, + summary=self._summary, + synopsis=self._synopsis, + info1=target_val, + info2='delete', + source=q.qualname_of(cl) + ))