Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using multithreading to improve code efficiency #2319

Merged
merged 6 commits into from
Jan 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 43 additions & 27 deletions mobsf/StaticAnalyzer/views/android/manifest_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging

import requests
from concurrent.futures import ThreadPoolExecutor

from mobsf.MobSF.utils import (
upstream_proxy,
Expand Down Expand Up @@ -59,33 +60,47 @@
def assetlinks_check(act_name, well_knowns):
"""Well known assetlink check."""
findings = []
iden = 'sha256_cert_fingerprints'
try:
proxies, verify = upstream_proxy('https')
for host, w_url in well_knowns:

with ThreadPoolExecutor() as executor:
futures = []
for w_url, host in well_knowns.items():
logger.info(
'App Link Assetlinks Check - [%s] %s', act_name, host)
status = False
status_code = 0
try:
r = requests.get(w_url,
allow_redirects=True,
proxies=proxies,
verify=verify)
status_code = r.status_code
if (str(r.status_code).startswith('2')
and iden in str(r.json())):
status = True
except Exception:
pass
findings.append({
'url': w_url,
futures.append(
executor.submit(_check_url, host, w_url))
for future in futures:
findings.append(future.result())

return findings

def _check_url(host, w_url):
try:
iden = 'sha256_cert_fingerprints'
proxies, verify = upstream_proxy('https')
status = False
status_code = 0

r = requests.get(w_url,
allow_redirects=True,
proxies=proxies,
verify=verify,
timeout=5)

status_code = r.status_code
if (str(status_code).startswith('2') and iden in str(r.json())):
status = True

return {'url': w_url,
'host': host,
'status_code': status_code,
'status': status})
'status': status}

except Exception:
logger.exception('Well Known Assetlinks Check')
return findings
logger.exception(f'Well Known Assetlinks Check for URL: {w_url}')
return {'url': w_url,
'host': host,
'status_code': None,
'status': False}


def get_browsable_activities(node, ns):
Expand All @@ -99,7 +114,7 @@ def get_browsable_activities(node, ns):
paths = []
path_prefixs = []
path_patterns = []
well_known = []
well_known = {}
well_known_path = '/.well-known/assetlinks.json'
catg = node.getElementsByTagName('category')
for cat in catg:
Expand Down Expand Up @@ -129,12 +144,13 @@ def get_browsable_activities(node, ns):
path_patterns.append(path_pattern)
# Collect possible well-known paths
if scheme and scheme in ('http', 'https') and host:
host = host.replace('*.', '')
shost = f'{scheme}://{host}'
if port:
c_url = f'{scheme}://{host}:{port}{well_known_path}'
c_url = f'{shost}:{port}{well_known_path}'
else:
c_url = f'{scheme}://{host}{well_known_path}'
if (host, c_url) not in well_known:
well_known.append((host, c_url))
c_url = f'{shost}{well_known_path}'
well_known[c_url] = shost
schemes = [scheme + '://' for scheme in schemes]
browse_dic['schemes'] = schemes
browse_dic['mime_types'] = mime_types
Expand Down