use tqdm show progress、attach if the application has been launched and hidden error stack when displayname or bundleid not existed.
328 lines
9.3 KiB
Python
Executable File
328 lines
9.3 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Author : AloneMonkey
|
|
# blog: www.alonemonkey.com
|
|
|
|
import sys
|
|
import codecs
|
|
import frida
|
|
import threading
|
|
import os
|
|
import shutil
|
|
import time
|
|
import argparse
|
|
import tempfile
|
|
import subprocess
|
|
import re
|
|
|
|
import paramiko
|
|
from paramiko import SSHClient
|
|
from scp import SCPClient
|
|
from tqdm import tqdm
|
|
import traceback
|
|
|
|
reload(sys)
|
|
sys.setdefaultencoding('utf8')
|
|
|
|
script_dir = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
DUMP_JS = os.path.join(script_dir, 'dump.js')
|
|
|
|
User = 'root'
|
|
Password = 'alpine'
|
|
Host = 'localhost'
|
|
Port = 2222
|
|
|
|
TEMP_DIR = tempfile.gettempdir()
|
|
PAYLOAD_DIR = 'Payload'
|
|
PAYLOAD_PATH = os.path.join(TEMP_DIR, PAYLOAD_DIR)
|
|
file_dict = {}
|
|
|
|
finished = threading.Event()
|
|
|
|
|
|
def get_usb_iphone():
|
|
device_manager = frida.get_device_manager()
|
|
changed = threading.Event()
|
|
|
|
def on_changed():
|
|
changed.set()
|
|
|
|
device_manager.on('changed', on_changed)
|
|
|
|
device = None
|
|
while device is None:
|
|
devices = [dev for dev in device_manager.enumerate_devices() if dev.type == 'tether']
|
|
if len(devices) == 0:
|
|
print 'Waiting for USB device...'
|
|
changed.wait()
|
|
else:
|
|
device = devices[0]
|
|
|
|
device_manager.off('changed', on_changed)
|
|
|
|
return device
|
|
|
|
|
|
def generate_ipa(path, display_name):
|
|
ipa_filename = display_name + '.ipa'
|
|
|
|
print 'Generating "{}"'.format(ipa_filename)
|
|
try:
|
|
app_name = file_dict['app']
|
|
|
|
for key, value in file_dict.items():
|
|
from_dir = os.path.join(path, key)
|
|
to_dir = os.path.join(path, app_name, value)
|
|
if key != 'app':
|
|
shutil.move(from_dir, to_dir)
|
|
|
|
target_dir = './' + PAYLOAD_DIR
|
|
zip_args = ('zip', '-qr', os.path.join(os.getcwd(), ipa_filename), target_dir)
|
|
subprocess.check_call(zip_args, cwd=TEMP_DIR)
|
|
shutil.rmtree(PAYLOAD_PATH)
|
|
print
|
|
except Exception as e:
|
|
print e
|
|
finished.set()
|
|
|
|
def on_message(message, data):
|
|
t = tqdm(unit='B',unit_scale=True,unit_divisor=1024,miniters=1)
|
|
last_sent = [0]
|
|
|
|
def progress(filename, size, sent):
|
|
t.desc = os.path.basename(filename)
|
|
t.total = size
|
|
t.update(sent - last_sent[0])
|
|
last_sent[0] = 0 if size == sent else sent
|
|
|
|
if 'payload' in message:
|
|
payload = message['payload']
|
|
if 'dump' in payload:
|
|
origin_path = payload['path']
|
|
dump_path = payload['dump']
|
|
|
|
scp_from = dump_path
|
|
scp_to = PAYLOAD_PATH + u'/'
|
|
|
|
with SCPClient(ssh.get_transport(), progress = progress) as scp:
|
|
scp.get(scp_from, scp_to)
|
|
t.close()
|
|
chmod_dir = os.path.join(PAYLOAD_PATH, os.path.basename(dump_path))
|
|
chmod_args = ('chmod', '655', chmod_dir)
|
|
try:
|
|
subprocess.check_call(chmod_args)
|
|
except subprocess.CalledProcessError as err:
|
|
print err
|
|
|
|
index = origin_path.find('.app/')
|
|
file_dict[os.path.basename(dump_path)] = origin_path[index + 5:]
|
|
|
|
if 'app' in payload:
|
|
app_path = payload['app']
|
|
|
|
scp_from = app_path
|
|
scp_to = PAYLOAD_PATH + u'/'
|
|
with SCPClient(ssh.get_transport(), progress = progress) as scp:
|
|
scp.get(scp_from, scp_to, recursive=True)
|
|
t.close()
|
|
chmod_dir = os.path.join(PAYLOAD_PATH, os.path.basename(app_path))
|
|
chmod_args = ('chmod', '755', chmod_dir)
|
|
try:
|
|
subprocess.check_call(chmod_args)
|
|
except subprocess.CalledProcessError as err:
|
|
print err
|
|
|
|
file_dict['app'] = os.path.basename(app_path)
|
|
|
|
if 'done' in payload:
|
|
finished.set()
|
|
|
|
|
|
def compare_applications(a, b):
|
|
a_is_running = a.pid != 0
|
|
b_is_running = b.pid != 0
|
|
if a_is_running == b_is_running:
|
|
if a.name > b.name:
|
|
return 1
|
|
elif a.name < b.name:
|
|
return -1
|
|
else:
|
|
return 0
|
|
elif a_is_running:
|
|
return -1
|
|
else:
|
|
return 1
|
|
|
|
|
|
def cmp_to_key(mycmp):
|
|
"""Convert a cmp= function into a key= function"""
|
|
|
|
class K:
|
|
def __init__(self, obj):
|
|
self.obj = obj
|
|
|
|
def __lt__(self, other):
|
|
return mycmp(self.obj, other.obj) < 0
|
|
|
|
def __gt__(self, other):
|
|
return mycmp(self.obj, other.obj) > 0
|
|
|
|
def __eq__(self, other):
|
|
return mycmp(self.obj, other.obj) == 0
|
|
|
|
def __le__(self, other):
|
|
return mycmp(self.obj, other.obj) <= 0
|
|
|
|
def __ge__(self, other):
|
|
return mycmp(self.obj, other.obj) >= 0
|
|
|
|
def __ne__(self, other):
|
|
return mycmp(self.obj, other.obj) != 0
|
|
|
|
return K
|
|
|
|
|
|
def get_applications(device):
|
|
try:
|
|
applications = device.enumerate_applications()
|
|
except Exception as e:
|
|
print 'Failed to enumerate applications: %s' % e
|
|
return
|
|
|
|
return applications
|
|
|
|
|
|
def list_applications(device):
|
|
applications = get_applications(device)
|
|
|
|
if len(applications) > 0:
|
|
pid_column_width = max(map(lambda app: len('{}'.format(app.pid)), applications))
|
|
name_column_width = max(map(lambda app: len(app.name), applications))
|
|
identifier_column_width = max(map(lambda app: len(app.identifier), applications))
|
|
else:
|
|
pid_column_width = 0
|
|
name_column_width = 0
|
|
identifier_column_width = 0
|
|
|
|
header_format = '%' + str(pid_column_width) + 's ' + '%-' + str(name_column_width) + 's ' + '%-' + str(
|
|
identifier_column_width) + 's'
|
|
print header_format % ('PID', 'Name', 'Identifier')
|
|
print '%s %s %s' % (pid_column_width * '-', name_column_width * '-', identifier_column_width * '-')
|
|
line_format = '%' + str(pid_column_width) + 's ' + '%-' + str(name_column_width) + 's ' + '%-' + str(
|
|
identifier_column_width) + 's'
|
|
for application in sorted(applications, key=cmp_to_key(compare_applications)):
|
|
if application.pid == 0:
|
|
print line_format % ('-', application.name, application.identifier)
|
|
else:
|
|
print line_format % (application.pid, application.name, application.identifier)
|
|
|
|
|
|
def load_js_file(session, filename):
|
|
source = ''
|
|
with codecs.open(filename, 'r', 'utf-8') as f:
|
|
source = source + f.read()
|
|
script = session.create_script(source)
|
|
script.on('message', on_message)
|
|
script.load()
|
|
|
|
return script
|
|
|
|
|
|
def create_dir(path):
|
|
path = path.strip()
|
|
path = path.rstrip('\\')
|
|
if os.path.exists(path):
|
|
shutil.rmtree(path)
|
|
try:
|
|
os.makedirs(path)
|
|
except os.error as err:
|
|
print err
|
|
|
|
|
|
def open_target_app(device, name_or_bundleid):
|
|
print 'Start the target app {}'.format(name_or_bundleid)
|
|
|
|
pid = ''
|
|
session = None
|
|
display_name = ''
|
|
bundle_identifier = ''
|
|
for application in get_applications(device):
|
|
if name_or_bundleid == application.identifier or name_or_bundleid == application.name:
|
|
pid = application.pid
|
|
display_name = application.name
|
|
bundle_identifier = application.identifier
|
|
|
|
try:
|
|
if not pid:
|
|
pid = device.spawn([bundle_identifier])
|
|
session = device.attach(pid)
|
|
device.resume(pid)
|
|
else:
|
|
session = device.attach(pid)
|
|
except Exception as e:
|
|
print e
|
|
|
|
return session, display_name, bundle_identifier
|
|
|
|
|
|
def start_dump(session, ipa_name):
|
|
print 'Dumping {} to {}'.format(display_name, TEMP_DIR)
|
|
|
|
script = load_js_file(session, DUMP_JS)
|
|
script.post('dump')
|
|
finished.wait()
|
|
|
|
generate_ipa(PAYLOAD_PATH, ipa_name)
|
|
|
|
if session:
|
|
session.detach()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(description='frida-ios-dump (by AloneMonkey v2.0)')
|
|
parser.add_argument('-l', '--list', dest='list_applications', action='store_true', help='List the installed apps')
|
|
parser.add_argument('-o', '--output', dest='output_ipa', help='Specify name of the decrypted IPA')
|
|
parser.add_argument('target', nargs='?', help='Bundle identifier or display name of the target app')
|
|
args = parser.parse_args()
|
|
|
|
exit_code = 0
|
|
ssh = None
|
|
device = get_usb_iphone()
|
|
if args.list_applications:
|
|
list_applications(device)
|
|
else:
|
|
name_or_bundleid = args.target
|
|
output_ipa = args.output_ipa
|
|
|
|
try:
|
|
ssh = paramiko.SSHClient()
|
|
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
ssh.connect(Host, port=Port, username=User, password=Password)
|
|
|
|
create_dir(PAYLOAD_PATH)
|
|
(session, display_name, bundle_identifier) = open_target_app(device, name_or_bundleid)
|
|
if output_ipa is None:
|
|
output_ipa = display_name
|
|
output_ipa = re.sub('\.ipa$', '', output_ipa)
|
|
if session:
|
|
start_dump(session, output_ipa)
|
|
except paramiko.ssh_exception.NoValidConnectionsError as e:
|
|
print e
|
|
exit_code = 1
|
|
except paramiko.AuthenticationException as e:
|
|
print e
|
|
exit_code = 1
|
|
except Exception as e:
|
|
print('*** Caught exception: %s: %s' % (e.__class__, e))
|
|
traceback.print_exc()
|
|
exit_code = 1
|
|
|
|
if ssh:
|
|
ssh.close()
|
|
|
|
if os.path.exists(PAYLOAD_PATH):
|
|
shutil.rmtree(PAYLOAD_PATH)
|
|
|
|
sys.exit(exit_code) |