Skip to content

Commit

Permalink
Merge pull request #117 from doronz88/feature/activation
Browse files Browse the repository at this point in the history
Add activating, deactivating and querying state
  • Loading branch information
doronz88 authored Aug 23, 2021
2 parents d7baa65 + 6b65e83 commit d1e4317
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Options:
--help Show this message and exit.
Commands:
activation activation options
afc FileSystem utils
apps application options
backup2 backup utils
Expand Down Expand Up @@ -137,6 +138,7 @@ for line in SyslogService(lockdown=lockdown).watch():

Support | Service | Description
--------|---------|----------------------
DONE | `com.apple.mobileactivationd` | Activation services
DONE | `com.apple.afc` | File access for `/var/mobile/Media`
DONE | `com.apple.crashreportcopymobile` | File access for `/var/mobile/Library/Logs/CrashReports`
DONE | `com.apple.pcapd` | Sniff device's network traffic
Expand Down
3 changes: 2 additions & 1 deletion pymobiledevice3/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import click
import coloredlogs

from pymobiledevice3.cli.activation import cli as activation_cli
from pymobiledevice3.cli.afc import cli as afc_cli
from pymobiledevice3.cli.apps import cli as apps_cli
from pymobiledevice3.cli.backup import cli as backup_cli
Expand Down Expand Up @@ -36,7 +37,7 @@ def cli():
cli_commands = click.CommandCollection(sources=[
developer_cli, mounter_cli, apps_cli, profile_cli, lockdown_cli, diagnostics_cli, syslog_cli, pcap_cli,
crash_cli, afc_cli, ps_cli, notification_cli, list_devices_cli, power_assertion_cli, springboard_cli,
provision_cli, backup_cli, restore_cli
provision_cli, backup_cli, restore_cli, activation_cli
])
cli_commands()

Expand Down
35 changes: 35 additions & 0 deletions pymobiledevice3/cli/activation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import click

from pymobiledevice3.cli.cli_common import Command
from pymobiledevice3.services.mobile_activation import MobileActivationService


@click.group()
def cli():
""" cli """
pass


@cli.group()
def activation():
""" activation options """
pass


@activation.command(cls=Command)
def state(lockdown):
""" Get current activation state """
print(MobileActivationService(lockdown).state)


@activation.command(cls=Command)
@click.option('--offline', is_flag=True, help='Allow to send and receive requests manually')
def activate(lockdown, offline):
""" Activate device """
MobileActivationService(lockdown, offline).activate()


@activation.command(cls=Command)
def deactivate(lockdown):
""" Deactivate device """
MobileActivationService(lockdown).deactivate()
126 changes: 126 additions & 0 deletions pymobiledevice3/services/mobile_activation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env python3
import logging
import plistlib
import shlex
from contextlib import closing
from pathlib import Path
import time

import requests

from pymobiledevice3.lockdown import LockdownClient

ACTIVATION_USER_AGENT_IOS = 'iOS Device Activator (MobileActivation-20 built on Jan 15 2012 at 19:07:28)'
ACTIVATION_DEFAULT_URL = 'https://albert.apple.com/deviceservices/deviceActivation'
ACTIVATION_DRM_HANDSHAKE_DEFAULT_URL = 'https://albert.apple.com/deviceservices/drmHandshake'
DEFAULT_HEADERS = {
'Accept': 'application/xml',
'User-Agent': ACTIVATION_USER_AGENT_IOS,
'Expect': '100-continue',
}

ACTIVATION_REQUESTS_SUBDIR = Path('activation')


class MobileActivationService:
SERVICE_NAME = 'com.apple.mobileactivationd'

def __init__(self, lockdown: LockdownClient, offline=True):
self.logger = logging.getLogger(__name__)
self.lockdown = lockdown
self.offline = offline

@property
def state(self):
return self.send_command('GetActivationStateRequest')['Value']

def activate(self):
blob = self.create_activation_session_info()
# create drmHandshake request with blob from device
headers = {'Content-Type': 'application/x-apple-plist'}
headers.update(DEFAULT_HEADERS)
content, headers = self.post(ACTIVATION_DRM_HANDSHAKE_DEFAULT_URL, data=plistlib.dumps(blob), headers=headers)

activation_info = self.create_activation_info_with_session(content)

content, headers = self.post(ACTIVATION_DEFAULT_URL, data={'activation-info': plistlib.dumps(activation_info)})
assert headers['Content-Type'] == 'text/xml'
self.activate_with_session(content, headers)

def deactivate(self):
return self.send_command('DeactivateRequest')

def create_activation_session_info(self):
return self.send_command('CreateTunnel1SessionInfoRequest')['Value']

def create_activation_info_with_session(self, handshake_response):
return self.send_command('CreateTunnel1ActivationInfoRequest', handshake_response)['Value']

def activate_with_session(self, activation_record, headers):
data = {
'Command': 'HandleActivationInfoWithSessionRequest',
'Value': activation_record,
}
if headers:
data['ActivationResponseHeaders'] = dict(headers)
with closing(self.lockdown.start_service(self.SERVICE_NAME)) as service:
return service.send_recv_plist(data)

def send_command(self, command, value=''):
data = {'Command': command}
if value:
data['Value'] = value
with closing(self.lockdown.start_service(self.SERVICE_NAME)) as service:
return service.send_recv_plist(data)

def post(self, url, data, headers=None):
if headers is None:
headers = DEFAULT_HEADERS
if not self.offline:
resp = requests.post(url, data=data, headers=headers)
return resp.content, resp.headers
for file in ACTIVATION_REQUESTS_SUBDIR.iterdir():
file.unlink()
ACTIVATION_REQUESTS_SUBDIR.mkdir(parents=True, exist_ok=True)
curl_arguments = ['curl', '-X', 'POST', '-D', 'headers.txt']

if isinstance(data, bytes):
request_data = ACTIVATION_REQUESTS_SUBDIR / f'request_data_{hash(data)}.plist'
request_data.write_bytes(data)
curl_arguments.extend(['--data-binary', f'@{request_data.name}'])
elif isinstance(data, dict):
for key, value, in data.items():
request_data = ACTIVATION_REQUESTS_SUBDIR / f'request_data_{hash(value)}.plist'
request_data.write_bytes(value)
curl_arguments.extend(['-F', f'{key}=@{request_data.name}'])

for header, value in headers.items():
curl_arguments.extend(['-H', f'{header}: {value}'])

headers = ACTIVATION_REQUESTS_SUBDIR / 'headers.txt'
curl_arguments.extend(['-D', headers.name])
curl_arguments.append(url)

response = ACTIVATION_REQUESTS_SUBDIR / f'request_data_{time.time()}.txt'
request = ACTIVATION_REQUESTS_SUBDIR / f'request_script.sh'
request.write_text(f'#!/bin/sh\n{shlex.join(curl_arguments)} | tee {response.name}\n')
request.chmod(0o755)

self.logger.info(f'Run the following shell script ({request.name})')
# Check for plist response.
while not response.exists() or b'</plist>' not in response.read_bytes():
time.sleep(1)

# Check for headers.
while not headers.exists() or ': ' not in headers.read_text():
time.sleep(1)

raw_headers = headers.read_text()
parsed_headers = {}
for line in raw_headers.splitlines():
if ': ' not in line:
continue
key, value = line.split(': ', 1)
parsed_headers[key] = value

return response.read_bytes(), parsed_headers

0 comments on commit d1e4317

Please sign in to comment.