Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pulling split apks, Fixes #2271 #2446

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions mobsf/DynamicAnalyzer/views/android/dynamic_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,19 @@ def trigger_static_analysis(request, checksum):
err = 'Cannot connect to Android Runtime'
return print_n_send_error_response(request, err)
env = Environment(identifier)
apk_file = env.get_apk(checksum, package)
if not apk_file:
scan_type = env.get_apk(checksum, package)
if not scan_type:
err = 'Failed to download APK file'
return print_n_send_error_response(request, err)
file_name = f'{package}.apk'
if scan_type == 'apks':
file_name = f'{file_name}s'
data = {
'analyzer': 'static_analyzer',
'status': 'success',
'hash': checksum,
'scan_type': 'apk',
'file_name': f'{package}.apk',
'scan_type': scan_type,
'file_name': file_name,
}
add_to_recent_scan(data)
return HttpResponseRedirect(f'/static_analyzer/{checksum}/')
Expand Down
83 changes: 62 additions & 21 deletions mobsf/DynamicAnalyzer/views/android/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
import threading
import time
from pathlib import Path
from base64 import b64encode
from hashlib import md5

Expand Down Expand Up @@ -436,31 +437,71 @@ def get_device_packages(self):
device_packages[md5] = (pkg, apk)
return device_packages

def get_apk(self, checksum, package):
"""Download APK from device."""
try:
out_dir = os.path.join(settings.UPLD_DIR, checksum + '/')
if not os.path.exists(out_dir):
os.makedirs(out_dir)
out_file = os.path.join(out_dir, f'{checksum}.apk')
if is_file_exists(out_file):
return out_file
def download_apk_packages(self, pkg_path, out_file):
"""Download APK package(s)."""
with tempfile.TemporaryDirectory() as temp_dir:
# Download APK package(s)
# Can be single or multiple packages
out = self.adb_command([
'pm',
'path',
package], True)
out = out.decode('utf-8').rstrip()
path = out.split('package:', 1)[1].strip()
logger.info('Downloading APK')
self.adb_command([
'pull',
path,
out_file,
pkg_path.as_posix(),
temp_dir,
])
if is_file_exists(out_file):
return out_file
fmt = out.decode('utf-8').strip()
logger.info('ADB Pull Output: %s', fmt)
# Filter for APK files in the directory
apk_files = []
for f in Path(temp_dir).glob('*.apk'):
if f.is_file():
apk_files.append(f)
# Check if there is exactly one APK file
if len(apk_files) == 1:
shutil.move(apk_files[0], out_file)
Dismissed Show dismissed Hide dismissed
return 'apk'
else:
# If there are multiple APK files, zip them
shutil.make_archive(out_file, 'zip', root_dir=temp_dir, base_dir='.')
# Rename the zip file to APK
apks_file = out_file.with_suffix('.apk')
os.rename(out_file.as_posix() + '.zip', apks_file)
Dismissed Show dismissed Hide dismissed
Dismissed Show dismissed Hide dismissed
return 'apks'

def get_apk_packages(self, package):
"""Get all APK packages from device."""
out = self.adb_command([
'pm',
'path',
package], True)
return out.decode('utf-8').strip()

def get_apk_parent_directory(self, package):
"""Get parent directory of APK packages."""
package_out = self.get_apk_packages(package)
package_out = package_out.split()
if ('package:' in package_out[0]
and package_out[0].endswith('.apk')):
path = package_out[0].split('package:', 1)[1].strip()
return Path(path).parent
return False

def get_apk(self, checksum, package):
"""Download APK from device."""
try:
# Do not download if already exists
out_dir = Path(settings.UPLD_DIR) / checksum
out_dir.mkdir(parents=True, exist_ok=True)
Dismissed Show dismissed Hide dismissed
out_file = out_dir / f'{checksum}.apk'
if out_file.exists():
Dismissed Show dismissed Hide dismissed
return 'apk'
# Get APK package parent directory
pkg_path = self.get_apk_parent_directory(package)
if pkg_path:
# Download APK package(s)
logger.info('Downloading APK')
return self.download_apk_packages(pkg_path, out_file)
except Exception:
return False
logger.exception('Failed to download APK')
return False

def system_check(self, runtime):
"""Check if /system is writable."""
Expand Down
45 changes: 45 additions & 0 deletions mobsf/MalwareAnalyzer/views/android/behaviour_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf_8 -*-
"""Perform behaviour analysis."""
import logging
from pathlib import Path

from django.conf import settings

from mobsf.MobSF.utils import (
append_scan_status,
get_android_src_dir,
)
from mobsf.StaticAnalyzer.views.sast_engine import (
scan,
)

logger = logging.getLogger(__name__)


def analyze(checksum, app_dir, typ):
"""Perform behaviour analysis."""
try:
root = Path(settings.BASE_DIR) / 'MalwareAnalyzer' / 'views'
rules = root / 'android' / 'rules' / 'behaviour_rules.yaml'
app_dir = Path(app_dir)
src = get_android_src_dir(app_dir, typ)
skp = settings.SKIP_CLASS_PATH
msg = 'Android Behaviour Analysis Started'
logger.info(msg)
append_scan_status(checksum, msg)
# Behaviour Analysis
findings = scan(
checksum,
rules.as_posix(),
{'.java', '.kt'},
[src.as_posix() + '/'],
skp)
msg = 'Android Behaviour Analysis Completed'
logger.info(msg)
append_scan_status(checksum, msg)
return findings
except Exception as exp:
msg = 'Failed to perform behaviour analysis'
logger.exception(msg)
append_scan_status(checksum, msg, repr(exp))
return {}
144 changes: 0 additions & 144 deletions mobsf/MalwareAnalyzer/views/android/quark.py

This file was deleted.

Loading
Loading