Skip to content

Commit

Permalink
Adding install/uninstall command
Browse files Browse the repository at this point in the history
  • Loading branch information
alterakey committed Mar 28, 2024
1 parent 9e33729 commit 1eff6a3
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 1 deletion.
79 changes: 79 additions & 0 deletions trueseeing/app/cmd/android/engage.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def get_commands(self) -> CommandMap:
'xco!':dict(e=self._engage_device_copyout),
'xci':dict(e=self._engage_device_copyin, n='xci[!] package [data.tar]', d='engage: copy-in package data'),
'xci!':dict(e=self._engage_device_copyin),
'xpd':dict(e=self._engage_deploy_package, n='xpd[!]', d='engage: deploy target package'),
'xpd!':dict(e=self._engage_deploy_package),
'xpu':dict(e=self._engage_undeploy_package, n='xpu', d='engage: remove target package'),
'xz':dict(e=self._engage_fuzz_intent, n='xz[!] "am-cmdline-template" [output.txt]', d='engage: fuzz intent'),
'xz!':dict(e=self._engage_fuzz_intent),
'xzr':dict(e=self._engage_fuzz_command, n='xzr[!] "cmdline-template" [output.txt]', d='engage: fuzz cmdline'),
Expand Down Expand Up @@ -761,6 +764,82 @@ async def _fuzz(pat: str, wordlist: Mapping[str, List[str]]) -> None:
async def _engage_fuzz_intent(self, args: deque[str]) -> None:
await self._engage_fuzz_command(args, am=True)

async def _engage_deploy_package(self, args: deque[str]) -> None:
cmd = args.popleft()

context: APKContext = await self._helper.get_context().require_type('apk').analyze(level=1)
apk = context.target

from time import time
from pubsub import pub
from trueseeing.core.ui import AndroidInstallProgressReporter
from trueseeing.core.android.device import AndroidDevice
from subprocess import CalledProcessError

dev = AndroidDevice()
at = time()
pkg = context.get_package_name()

ui.info(f'deploying package: {pkg}')

if cmd.endswith('!'):
try:
async for l in dev.invoke_adb_streaming(f'uninstall {pkg}', redir_stderr=True):
pub.sendMessage('progress.android.adb.update')
if b'success' in l.lower():
ui.warn('removing existing package')
except CalledProcessError as e:
ui.fatal('uninstall failed: {}'.format(e.stdout.decode().rstrip()))

with AndroidInstallProgressReporter().scoped():
pub.sendMessage('progress.android.adb.begin', what='installing ... ')
try:
async for l in dev.invoke_adb_streaming(f'install --no-streaming {apk}', redir_stderr=True):
pub.sendMessage('progress.android.adb.update')
if b'failure' in l.lower():
ui.stderr('')
if not cmd.endswith('!'):
ui.fatal('install failed; force (!) to replace ({})'.format(l.decode('UTF-8')))
else:
ui.fatal('install failed ({})'.format(l.decode('UTF-8')))

pub.sendMessage('progress.android.adb.done')
except CalledProcessError as e:
ui.fatal('install failed: {}'.format(e.stdout.decode().rstrip()))

ui.success('done ({t:.02f} sec){trailer}'.format(t=time() - at, trailer=' '*8))

async def _engage_undeploy_package(self, args: deque[str]) -> None:
_ = args.popleft()

context: APKContext = await self._helper.get_context().require_type('apk').analyze(level=1)

from time import time
from pubsub import pub
from trueseeing.core.android.device import AndroidDevice
from subprocess import CalledProcessError

dev = AndroidDevice()
at = time()
pkg = context.get_package_name()

ui.info(f'removing package: {pkg}')

try:
async for l in dev.invoke_adb_streaming(f'uninstall {pkg}', redir_stderr=True):
pub.sendMessage('progress.android.adb.update')
if b'failure' in l.lower():
import re
packages = await dev.invoke_adb('shell pm list packages', redir_stderr=True)
if not re.match(f'{pkg}$', packages, re.MULTILINE):
ui.fatal('package not found')
else:
ui.fatal('uninstall failed ({})'.format(l.decode()))
except CalledProcessError as e:
ui.fatal('uninstall failed: {}'.format(e.stdout.decode().rstrip()))

ui.success('done ({t:.02f} sec)'.format(t=time() - at))

def _generate_tempfilename_for_device(self, dir: Optional[str] = None) -> str:
import random
return (f'{dir}/' if dir is not None else '/data/local/tmp/') + ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=16))
Expand Down
3 changes: 2 additions & 1 deletion trueseeing/core/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ async def invoke_streaming(as_: str, redir_stderr: bool = False) -> AsyncIterato
from subprocess import PIPE, STDOUT
p = await asyncio.create_subprocess_shell(as_, stdout=PIPE, stderr=(STDOUT if redir_stderr else None))
try:
l: Optional[bytes] = None
if p.stdout is not None:
async for l in p.stdout:
yield l
_check_return_code(await p.wait(), as_, None, None)
_check_return_code(await p.wait(), as_, l, None)
finally:
if p.returncode is None:
try:
Expand Down
38 changes: 38 additions & 0 deletions trueseeing/core/ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,44 @@ def scoped(self) -> Iterator[None]:
def _issue(self, issue: Issue) -> None:
self._CN.note(issue)

class AndroidInstallProgressReporter:
_bar: Optional[ProgressBar] = None

@contextmanager
def scoped(self) -> Iterator[None]:
submap = {
'progress.android.adb.begin':self._begin,
'progress.android.adb.update':self._update,
'progress.android.adb.done':self._done,
}
try:
for k, v in submap.items():
pub.subscribe(v, k)
yield None
finally:
for k, v in submap.items():
pub.unsubscribe(v, k)
pass

def _begin(self, what: str) -> None:
if ui.is_tty():
from progressbar import ProgressBar, RotatingMarker
self._bar = ProgressBar(widgets=[
ui.bullet('info'),
what + ' ',
RotatingMarker() # type:ignore[no-untyped-call]
])
else:
self._bar = None

def _update(self) -> None:
if self._bar is not None:
self._bar.next() # type:ignore[no-untyped-call]

def _done(self) -> None:
if self._bar:
self._bar.finish(end='\r') # type:ignore[no-untyped-call]

class OpFormatter:
def __init__(self, q: APKQuery, indent: int = 4) -> None:
self._q = q
Expand Down

0 comments on commit 1eff6a3

Please sign in to comment.