Skip to content

Commit

Permalink
Added App Link Assetlinks Check
Browse files Browse the repository at this point in the history
  • Loading branch information
ajinabraham committed Dec 19, 2023
1 parent 2cf3103 commit 4abe0f0
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 139 deletions.
5 changes: 4 additions & 1 deletion mobsf/DynamicAnalyzer/tools/apk_patcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ def download_frida_gadget(self, frida_arch, aarch, version):
if not url:
return None
logger.info('Downloading frida-gadget %s', fgadget)
with requests.get(url, stream=True) as r:
with requests.get(url,
stream=True,
proxies=proxies,
verify=verify) as r:
with LZMAFile(r.raw) as f:
with open(gadget_bin, 'wb') as flip:
copyfileobj(f, flip)
Expand Down
137 changes: 104 additions & 33 deletions mobsf/DynamicAnalyzer/views/ios/corellium_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

import requests

from mobsf.MobSF.utils import is_number
from mobsf.MobSF.utils import (
is_number,
upstream_proxy,
)


SUCCESS_RESP = (200, 204)
Expand All @@ -28,11 +31,14 @@ def __init__(self, project_id) -> None:
'Authorization': f'Bearer {self.api_key}',
}
self.project_id = project_id
self.proxies, self.verify = upstream_proxy('https')

def api_ready(self):
"""Check API Availability."""
try:
r = requests.get(f'{self.api}/ready')
r = requests.get(f'{self.api}/ready',
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return True
else:
Expand All @@ -50,7 +56,9 @@ def api_auth(self):
return False
r = requests.get(
f'{self.api}/projects',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in ERROR_RESP:
return False
return True
Expand All @@ -64,7 +72,9 @@ def get_projects(self):
ids = []
r = requests.get(
f'{self.api}/projects?ids_only=true',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
for i in r.json():
ids.append(i['id'])
Expand All @@ -77,7 +87,9 @@ def get_authorized_keys(self):
"""Get SSH public keys associated with a project."""
r = requests.get(
f'{self.api}/projects/{self.project_id}/keys',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()
return False
Expand All @@ -96,7 +108,9 @@ def add_authorized_key(self, key):
r = requests.post(
f'{self.api}/projects/{self.project_id}/keys',
headers=self.headers,
json=data)
json=data,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()['identifier']
return False
Expand All @@ -105,7 +119,9 @@ def delete_authorized_key(self, key_id):
"""Delete SSH public key from the Project."""
r = requests.delete(
f'{self.api}/projects/{self.project_id}/keys/{key_id}',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
return False
Expand All @@ -116,7 +132,9 @@ def get_instances(self):
instances = []
r = requests.get(
f'{self.api}/instances',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
for i in r.json():
if i['type'] == 'ios' and 'jailbroken' in i['patches']:
Expand All @@ -134,7 +152,9 @@ def create_ios_instance(self, flavor, version):
r = requests.post(
f'{self.api}/instances',
headers=self.headers,
json=data)
json=data,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()['id']
return False
Expand All @@ -150,11 +170,14 @@ def __init__(self) -> None:
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
}
self.proxies, self.verify = upstream_proxy('https')

def get_models(self):
r = requests.get(
f'{self.api}/models',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()
return False
Expand All @@ -172,7 +195,9 @@ def get_supported_os(self, model):
return False
r = requests.get(
f'{self.api}/models/{model}/software',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()
elif r.status_code in ERROR_RESP:
Expand All @@ -191,14 +216,17 @@ def __init__(self, instance_id) -> None:
'Authorization': f'Bearer {self.api_key}',
}
self.instance_id = instance_id
self.proxies, self.verify = upstream_proxy('https')

def start_instance(self):
"""Start instance."""
data = {'paused': False}
r = requests.post(
f'{self.api}/instances/{self.instance_id}/start',
headers=self.headers,
json=data)
json=data,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
elif r.status_code in ERROR_RESP:
Expand All @@ -211,7 +239,9 @@ def stop_instance(self):
r = requests.post(
f'{self.api}/instances/{self.instance_id}/stop',
headers=self.headers,
json=data)
json=data,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
elif r.status_code in ERROR_RESP:
Expand All @@ -222,7 +252,9 @@ def unpause_instance(self):
"""Unpause instance."""
r = requests.post(
f'{self.api}/instances/{self.instance_id}/unpause',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
elif r.status_code in ERROR_RESP:
Expand All @@ -233,7 +265,9 @@ def reboot_instance(self):
"""Reboot instance."""
r = requests.post(
f'{self.api}/instances/{self.instance_id}/reboot',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
elif r.status_code in ERROR_RESP:
Expand All @@ -244,7 +278,9 @@ def remove_instance(self):
"""Remove instance."""
r = requests.delete(
f'{self.api}/instances/{self.instance_id}',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
elif r.status_code in ERROR_RESP:
Expand All @@ -255,7 +291,9 @@ def poll_instance(self):
"""Check instance status."""
r = requests.get(
f'{self.api}/instances/{self.instance_id}',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()
return False
Expand All @@ -269,7 +307,9 @@ def screenshot(self):
(f'{self.api}/instances/{self.instance_id}'
'/screenshot.png?scale=1'),
headers=self.headers,
stream=True)
stream=True,
proxies=self.proxies,
verify=self.verify)
if r.status_code == 200:
return r.content
except Exception:
Expand All @@ -282,7 +322,9 @@ def start_network_capture(self):
"""Start network capture."""
r = requests.post(
f'{self.api}/instances/{self.instance_id}/sslsplit/enable',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
err = r.json()['error']
Expand All @@ -296,7 +338,9 @@ def stop_network_capture(self):
"""Stop network capture."""
r = requests.post(
f'{self.api}/instances/{self.instance_id}/sslsplit/disable',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
logger.error(
Expand All @@ -307,7 +351,9 @@ def download_network_capture(self):
"""Download network capture."""
r = requests.get(
f'{self.api}/instances/{self.instance_id}/networkMonitor.pcap',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.content
logger.error(
Expand All @@ -318,7 +364,9 @@ def console_log(self):
"""Get Console Log."""
r = requests.get(
f'{self.api}/instances/{self.instance_id}/consoleLog',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.content.decode('utf-8', 'ignore')
logger.error(
Expand All @@ -329,7 +377,9 @@ def get_ssh_connection_string(self):
"""Get SSH connection string."""
r = requests.get(
f'{self.api}/instances/{self.instance_id}/quickConnectCommand',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.text
logger.error(
Expand Down Expand Up @@ -415,7 +465,9 @@ def device_input(self, event, x, y, max_x, max_y):
r = requests.post(
f'{self.api}/instances/{self.instance_id}/input',
headers=self.headers,
json=data)
json=data,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return OK
logger.error(
Expand All @@ -434,12 +486,15 @@ def __init__(self, instance_id) -> None:
'Authorization': f'Bearer {self.api_key}',
}
self.instance_id = instance_id
self.proxies, self.verify = upstream_proxy('https')

def agent_ready(self):
"""Agent ready."""
r = requests.get(
f'{self.api}/instances/{self.instance_id}/agent/v1/app/ready',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
logger.info('Corellium Agent is Ready!')
return r.json()['ready']
Expand All @@ -452,7 +507,9 @@ def unlock_device(self):
"""Unlock iOS device."""
r = requests.post(
f'{self.api}/instances/{self.instance_id}/agent/v1/system/unlock',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
logger.info('Device unlocked')
return OK
Expand All @@ -470,7 +527,9 @@ def upload_ipa(self, ipa_file):
(f'{self.api}/instances/{self.instance_id}'
f'/agent/v1/file/device/%2Ftmp%2Fapp.ipa'),
data=open(ipa_file, 'rb').read(),
headers=headers)
headers=headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
logger.info('IPA uploaded to instance')
return OK
Expand All @@ -483,7 +542,9 @@ def install_ipa(self):
r = requests.post(
f'{self.api}/instances/{self.instance_id}/agent/v1/app/install',
headers=self.headers,
json=data)
json=data,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
logger.info('App installed')
return OK
Expand All @@ -495,7 +556,9 @@ def run_app(self, bundle_id):
r = requests.post(
(f'{self.api}/instances/{self.instance_id}'
f'/agent/v1/app/apps/{bundle_id}/run'),
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
logger.info('App Started')
return OK
Expand All @@ -507,7 +570,9 @@ def stop_app(self, bundle_id):
r = requests.post(
(f'{self.api}/instances/{self.instance_id}'
f'/agent/v1/app/apps/{bundle_id}/kill'),
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
logger.info('App Killed')
return OK
Expand All @@ -519,7 +584,9 @@ def remove_app(self, bundle_id):
r = requests.post(
(f'{self.api}/instances/{self.instance_id}'
f'/agent/v1/app/apps/{bundle_id}/uninstall'),
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
logger.info('App Removed')
return OK
Expand All @@ -530,7 +597,9 @@ def list_apps(self):
"""List all apps installed."""
r = requests.get(
f'{self.api}/instances/{self.instance_id}/agent/v1/app/apps',
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()
elif r.status_code in ERROR_RESP:
Expand All @@ -542,7 +611,9 @@ def get_icons(self, bundleids):
r = requests.get(
(f'{self.api}/instances/{self.instance_id}'
f'/agent/v1/app/icons?{bundleids}'),
headers=self.headers)
headers=self.headers,
proxies=self.proxies,
verify=self.verify)
if r.status_code in SUCCESS_RESP:
return r.json()
elif r.status_code in ERROR_RESP:
Expand Down
Loading

0 comments on commit 4abe0f0

Please sign in to comment.