diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index ebd54c452..9d2b9ee51 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -35,6 +35,7 @@ def my_sandbox(path, rootfs): ql.hook_address(func, r2.functions['main'].offset) # enable trace powered by r2 symsmap # r2.enable_trace() + r2.set_backtrace(0x401906) ql.run() if __name__ == "__main__": diff --git a/examples/extensions/r2/mem_r2.py b/examples/extensions/r2/mem_r2.py new file mode 100644 index 000000000..9a18bb4f0 --- /dev/null +++ b/examples/extensions/r2/mem_r2.py @@ -0,0 +1,88 @@ +import sys +from types import FrameType +sys.path.append('..') + +from tests.test_elf import ELFTest +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.extensions.r2 import R2 + +def test_elf_linux_arm(): + def my_puts(ql: Qiling): + params = ql.os.resolve_fcall_params(ELFTest.PARAMS_PUTS) + print(f'puts("{params["s"]}")') + # all_mem = ql.mem.save() + # for lbound, ubound, perm, _, _, _data in ql.mem.map_info: + # print(f"{lbound:#x} - {ubound:#x} {ubound - lbound:#x} {len(_data):#x} {perm:#x}") + # print() + # ql.mem.restore(all_mem) + + ql = Qiling(["../examples/rootfs/arm_linux/bin/arm_stat64"], "../examples/rootfs/arm_linux", verbose=QL_VERBOSE.DEBUG) + ql.os.set_api('puts', my_puts) + ql.run() + del ql + +def fn(frame: FrameType, msg, arg): + if msg == 'return': + print("Return: ", arg) + return + if msg != 'call': return + # Filter as appropriate + if 'memory' not in frame.f_code.co_filename: return + if '<' in frame.f_code.co_name: return + caller = frame.f_back.f_code.co_name + print("Called ", frame.f_code.co_name, "from ", caller) + for i in range(frame.f_code.co_argcount): + name = frame.f_code.co_varnames[i] + var = frame.f_locals[name] + if isinstance(var, (bytes, bytearray)): + var = f'{type(var)} len {len(var)}' + print(" Argument", name, "is", var) + +sys.settrace(fn) + +def unmap_hook(ql: "Qiling", access: int, addr: int, size: int, value: int): + print(f"Unmapped memory access at {addr:#x} - {addr + size:#x} with {value:#x} in type {access}") + +def mem_cmp_hook(ql: "Qiling", addr: int, size: int): + mapinfo = ql.mem.map_info + for i, mem_region in enumerate(ql.uc.mem_regions()): + assert (mapinfo[i][0], mapinfo[i][1] - 1, mapinfo[i][2]) == mem_region + uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1) + data = ql.mem.map_info[i][5] + if uc_mem == data: continue + print(f"Memory region {i} {mem_region[0]:#x} - {mem_region[1]:#x} not equal to map_info from {addr:#x}") + for line in ql.mem.get_formatted_mapinfo(): + print(line) + with open("mem.bin", "wb") as f: + f.write(uc_mem) + with open("map.bin", "wb") as f: + f.write(data) + assert False + +def addr_hook(ql: "Qiling"): + mapinfo = ql.mem.map_info + for i, mem_region in enumerate(ql.uc.mem_regions()): + if i != 8: continue + uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1) + with open('right.bin', 'wb') as f: + f.write(uc_mem) + +if __name__ == '__main__': + # from tests.test_shellcode import X8664_LIN + env = {'LD_DEBUG': 'all'} + # ql = Qiling(rootfs="rootfs/x8664_linux", code=X8664_LIN, archtype="x8664", ostype="linux", verbose=QL_VERBOSE.DEBUG) + # ql = Qiling(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows") + # ql = Qiling(["rootfs/arm_linux/bin/arm_hello_static"], "rootfs/arm_linux", verbose=QL_VERBOSE.DISASM) + # ql = Qiling(["rootfs/arm_linux/bin/arm_hello"], "rootfs/arm_linux", env=env, verbose=QL_VERBOSE.DEBUG) + ql = Qiling(["rootfs/x86_linux/bin/x86_hello"], "rootfs/x86_linux", verbose=QL_VERBOSE.DEBUG) + # ql.hook_mem_unmapped(unmap_hook) + # ql.hook_code(mem_cmp_hook) + # mprot_addr = 0x047d4824 + # ql.hook_address(addr_hook, mprot_addr) + # ql.debugger = 'qdb' + # ql = Qiling(["rootfs/x8664_linux/bin/testcwd"], "rootfs/x8664_linux", verbose=QL_VERBOSE.DEBUG) + for line in ql.mem.get_formatted_mapinfo(): + print(line) + ql.run() + # test_elf_linux_arm() diff --git a/qiling/arch/utils.py b/qiling/arch/utils.py index 7c6bcd9c8..b4c0ed13c 100644 --- a/qiling/arch/utils.py +++ b/qiling/arch/utils.py @@ -27,7 +27,7 @@ def __init__(self, ql: Qiling): @lru_cache(maxsize=64) def get_base_and_name(self, addr: int) -> Tuple[int, str]: - for begin, end, _, name, _ in self.ql.mem.map_info: + for begin, end, _, name, _, _ in self.ql.mem.map_info: if begin <= addr < end: return begin, basename(name) diff --git a/qiling/extensions/r2/callstack.py b/qiling/extensions/r2/callstack.py new file mode 100644 index 000000000..689fd0e16 --- /dev/null +++ b/qiling/extensions/r2/callstack.py @@ -0,0 +1,75 @@ +from dataclasses import dataclass +from typing import Iterator, Optional + + +@dataclass +class CallStack: + """Linked Frames + See https://github.com/angr/angr/blob/master/angr/state_plugins/callstack.py + """ + addr: int + sp: int + bp: int + name: str = None # 'name + offset' + next: Optional['CallStack'] = None + + def __iter__(self) -> Iterator['CallStack']: + """ + Iterate through the callstack, from top to bottom + (most recent first). + """ + i = self + while i is not None: + yield i + i = i.next + + def __getitem__(self, k): + """ + Returns the CallStack at index k, indexing from the top of the stack. + """ + orig_k = k + for i in self: + if k == 0: + return i + k -= 1 + raise IndexError(orig_k) + + def __len__(self): + """ + Get how many frames there are in the current call stack. + + :return: Number of frames + :rtype: int + """ + + o = 0 + for _ in self: + o += 1 + return o + + def __repr__(self): + """ + Get a string representation. + + :return: A printable representation of the CallStack object + :rtype: str + """ + return "" % len(self) + + def __str__(self): + return "Backtrace:\n" + "\n".join(f"Frame {i}: [{f.name}] {f.addr:#x} sp={f.sp:#x}, bp={f.bp:#x}" for i, f in enumerate(self)) + + def __eq__(self, other): + if not isinstance(other, CallStack): + return False + + if self.addr != other.addr or self.sp != other.sp or self.bp != other.bp: + return False + + return self.next == other.next + + def __ne__(self, other): + return not (self == other) + + def __hash__(self): + return hash(tuple((c.addr, c.sp, c.bp) for c in self)) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 13a655b2f..b86bb0ddd 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -13,6 +13,7 @@ from qiling.const import QL_ARCH from qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL +from .callstack import CallStack if TYPE_CHECKING: from qiling.core import Qiling @@ -141,10 +142,8 @@ def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0): self.loadaddr = loadaddr # r2 -m [addr] map file at given address self.analyzed = False self._r2c = libr.r_core.r_core_new() - if ql.code: - self._setup_code(ql.code) - else: - self._setup_file(ql.path) + self._r2i = ctypes.cast(self._r2c.contents.io, ctypes.POINTER(libr.r_io.struct_r_io_t)) + self._setup_mem(ql) def _qlarch2r(self, archtype: QL_ARCH) -> str: return { @@ -161,20 +160,23 @@ def _qlarch2r(self, archtype: QL_ARCH) -> str: QL_ARCH.PPC: "ppc", }[archtype] - def _setup_code(self, code: bytes): - path = f'malloc://{len(code)}'.encode() - fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr) - libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) - self._cmd(f'wx {code.hex()}') + def _rbuf_map(self, cbuf: ctypes.Array[ctypes.c_ubyte], perm: int = UC_PROT_ALL, addr: int = 0, delta: int = 0): + rbuf = libr.r_buf_new_with_pointers(cbuf, len(cbuf), False) # last arg `steal` = False + rbuf = ctypes.cast(rbuf, ctypes.POINTER(libr.r_io.struct_r_buf_t)) + desc = libr.r_io_open_buffer(self._r2i, rbuf, perm, 0) # last arg `mode` is always 0 in r2 code + libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(cbuf)) + + def _setup_mem(self, ql: 'Qiling'): + if not hasattr(ql, '_mem'): + return + for start, _end, perms, _label, _mmio, _buf in ql.mem.map_info: + cbuf = ql.mem.cmap[start] + self._rbuf_map(cbuf, perms, start) # set architecture and bits for r2 asm - arch = self._qlarch2r(self.ql.arch.type) - self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}") - - def _setup_file(self, path: str): - path = path.encode() - fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr) - libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) - + arch = self._qlarch2r(ql.arch.type) + self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}") + self._cmd("oba") # load bininfo and update flags + def _cmd(self, cmd: str) -> str: r = libr.r_core.r_core_cmd_str( self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) @@ -268,6 +270,40 @@ def dis_nbytes(self, addr: int, size: int) -> List[Instruction]: insts = [Instruction(**dic) for dic in self._cmdj(f"pDj {size} @ {addr}")] return insts + def dis_ninsts(self, addr: int, n: int=1) -> List[Instruction]: + insts = [Instruction(**dic) for dic in self._cmdj(f"pdj {n} @ {addr}")] + return insts + + def _backtrace_fuzzy(self, at: int = None, depth: int = 128) -> Optional[CallStack]: + '''Fuzzy backtrace, see https://github.com/radareorg/radare2/blob/master/libr/debug/p/native/bt/fuzzy_all.c#L38 + Args: + at: address to start walking stack, default to current SP + depth: limit of stack walking + Returns: + List of Frame + ''' + sp = at or self.ql.arch.regs.arch_sp + wordsize = self.ql.arch.bits // 8 + frame = None + cursp = oldsp = sp + for i in range(depth): + addr = self.ql.stack_read(i * wordsize) + inst = self.dis_ninsts(addr)[0] + if inst.type.lower() == 'call': + newframe = CallStack(addr=addr, sp=cursp, bp=oldsp, name=self.at(addr), next=frame) + frame = newframe + oldsp = cursp + cursp += wordsize + return frame + + def set_backtrace(self, target: Union[int, str]): + '''Set backtrace at target address before executing''' + if isinstance(target, str): + target = self.where(target) + def bt_hook(__ql: "Qiling", *args): + print(self._backtrace_fuzzy()) + self.ql.hook_address(bt_hook, target) + def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int: '''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code :param ql: Qiling instance @@ -279,7 +315,7 @@ def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=No anibbles = ql.arch.bits // 4 progress = 0 for inst in self.dis_nbytes(addr, size): - if inst.type.lower() == 'invalid': + if inst.type.lower() in ('invalid', 'ill'): break # stop disasm name, offset = self.at(inst.offset, parse=True) if filt is None or filt.search(name): @@ -301,5 +337,14 @@ def enable_trace(self, mode='full'): elif mode == 'history': trace.enable_history_trace(self.ql) + def shell(self): + while True: + offset = self._r2c.contents.offset + print(f"[{offset:#x}]> ", end="") + cmd = input() + if cmd.strip() == "q": + break + print(self._cmd(cmd)) + def __del__(self): libr.r_core.r_core_free(self._r2c) diff --git a/qiling/os/memory.py b/qiling/os/memory.py index 21d461e65..a9c5c6a6d 100644 --- a/qiling/os/memory.py +++ b/qiling/os/memory.py @@ -3,6 +3,7 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +import ctypes import os, re from typing import Any, Callable, Iterator, List, Mapping, MutableSequence, Optional, Pattern, Sequence, Tuple, Union @@ -11,8 +12,8 @@ from qiling import Qiling from qiling.exception import * -# tuple: range start, range end, permissions mask, range label, is mmio? -MapInfoEntry = Tuple[int, int, int, str, bool] +# tuple: range start, range end, permissions mask, range label, is mmio?, bytearray +MapInfoEntry = Tuple[int, int, int, str, bool, bytearray] MmioReadCallback = Callable[[Qiling, int, int], int] MmioWriteCallback = Callable[[Qiling, int, int, int], None] @@ -48,6 +49,8 @@ def __init__(self, ql: Qiling): # make sure pagesize is a power of 2 assert self.pagesize & (self.pagesize - 1) == 0, 'pagesize has to be a power of 2' + self.cmap = {} # mapping from start addr to cdata ptr + def __read_string(self, addr: int) -> str: ret = bytearray() c = self.read(addr, 1) @@ -80,7 +83,7 @@ def string(self, addr: int, value=None, encoding='utf-8') -> Optional[str]: self.__write_string(addr, value, encoding) - def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False): + def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False, data : bytearray = None): """Add a new memory range to map. Args: @@ -90,12 +93,11 @@ def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio mem_info: map entry label is_mmio: memory range is mmio """ - - self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio)) - self.map_info = sorted(self.map_info, key=lambda tp: tp[0]) + self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio, data)) + self.map_info.sort(key=lambda tp: tp[0]) def del_mapinfo(self, mem_s: int, mem_e: int): - """Subtract a memory range from map. + """Subtract a memory range from map, will destroy data and unmap uc mem in the range. Args: mem_s: memory range start @@ -104,30 +106,37 @@ def del_mapinfo(self, mem_s: int, mem_e: int): tmp_map_info: MutableSequence[MapInfoEntry] = [] - for s, e, p, info, mmio in self.map_info: + for s, e, p, info, mmio, data in self.map_info: if e <= mem_s: - tmp_map_info.append((s, e, p, info, mmio)) + tmp_map_info.append((s, e, p, info, mmio, data)) continue if s >= mem_e: - tmp_map_info.append((s, e, p, info, mmio)) + tmp_map_info.append((s, e, p, info, mmio, data)) continue + del self.cmap[s] # remove cdata reference starting at s if s < mem_s: - tmp_map_info.append((s, mem_s, p, info, mmio)) + self.ql.uc.mem_unmap(s, mem_s - s) + self.map_ptr(s, mem_s - s, p, data[:mem_s - s]) + tmp_map_info.append((s, mem_s, p, info, mmio, data[:mem_s - s])) if s == mem_s: pass if e > mem_e: - tmp_map_info.append((mem_e, e, p, info, mmio)) + self.ql.uc.mem_unmap(mem_e, e - mem_e) + self.map_ptr(mem_e, e - mem_e, p, data[mem_e - e:]) + tmp_map_info.append((mem_e, e, p, info, mmio, data[mem_e - e:])) if e == mem_e: pass + del data[mem_s - s:mem_e - s] + self.map_info = tmp_map_info - def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None): + def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None, data: Optional[bytearray] = None): tmp_map_info: Optional[MapInfoEntry] = None info_idx: int = None @@ -142,12 +151,15 @@ def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, me return if mem_p is not None: - self.del_mapinfo(mem_s, mem_e) - self.add_mapinfo(mem_s, mem_e, mem_p, mem_info if mem_info else tmp_map_info[3]) + data = data or self.read(mem_s, mem_e - mem_s).copy() + assert(len(data) == mem_e - mem_s) + self.unmap(mem_s, mem_e - mem_s) + self.map_ptr(mem_s, mem_e - mem_s, mem_p, data) + self.add_mapinfo(mem_s, mem_e, mem_p, mem_info or tmp_map_info[3], tmp_map_info[4], data) return if mem_info is not None: - self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4]) + self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4], tmp_map_info[5]) def get_mapinfo(self) -> Sequence[Tuple[int, int, str, str, str]]: """Get memory map info. @@ -166,7 +178,7 @@ def __perms_mapping(ps: int) -> str: return ''.join(val if idx & ps else '-' for idx, val in perms_d.items()) - def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool) -> Tuple[int, int, str, str, str]: + def __process(lbound: int, ubound: int, perms: int, label: str, is_mmio: bool, _data: bytearray) -> Tuple[int, int, str, str, str]: perms_str = __perms_mapping(perms) if hasattr(self.ql, 'loader'): @@ -211,7 +223,7 @@ def get_lib_base(self, filename: str) -> Optional[int]: # some info labels may be prefixed by boxed label which breaks the search by basename. # iterate through all info labels and remove all boxed prefixes, if any - stripped = ((lbound, p.sub('', info)) for lbound, _, _, info, _ in self.map_info) + stripped = ((lbound, p.sub('', info)) for lbound, _, _, info, _, _ in self.map_info) return next((lbound for lbound, info in stripped if os.path.basename(info) == filename), None) @@ -268,12 +280,12 @@ def save(self): "mmio" : [] } - for lbound, ubound, perm, label, is_mmio in self.map_info: + for lbound, ubound, perm, label, is_mmio, data in self.map_info: if is_mmio: mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)])) else: - data = self.read(lbound, ubound - lbound) - mem_dict['ram'].append((lbound, ubound, perm, label, bytes(data))) + data = self.read(lbound, ubound - lbound) # read instead of using data from map_info to avoid error + mem_dict['ram'].append((lbound, ubound, perm, label, data)) return mem_dict @@ -287,10 +299,10 @@ def restore(self, mem_dict): size = ubound - lbound if self.is_available(lbound, size): self.ql.log.debug(f'mapping {lbound:#08x} {ubound:#08x}, mapsize = {size:#x}') - self.map(lbound, size, perms, label) + self.map(lbound, size, perms, label, data) self.ql.log.debug(f'writing {len(data):#x} bytes at {lbound:#08x}') - self.write(lbound, data) + self.write(lbound, bytes(data)) for lbound, ubound, perms, label, read_cb, write_cb in mem_dict['mmio']: self.ql.log.debug(f"restoring mmio range: {lbound:#08x} {ubound:#08x} {label}") @@ -393,7 +405,7 @@ def search(self, needle: Union[bytes, Pattern[bytes]], begin: Optional[int] = No assert begin < end, 'search arguments do not make sense' # narrow the search down to relevant ranges; mmio ranges are excluded due to potential read size effects - ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio in self.map_info if not (end < lbound or ubound < begin or is_mmio)] + ranges = [(max(begin, lbound), min(ubound, end)) for lbound, ubound, _, _, is_mmio, _data in self.map_info if not (end < lbound or ubound < begin or is_mmio)] results = [] # if needle is a bytes sequence use it verbatim, not as a pattern @@ -439,10 +451,10 @@ def __mapped_regions(self) -> Iterator[Tuple[int, int]]: iter_memmap = iter(self.map_info) - p_lbound, p_ubound, _, _, _ = next(iter_memmap) + p_lbound, p_ubound, _, _, _, _ = next(iter_memmap) # map_info is assumed to contain non-overlapping regions sorted by lbound - for lbound, ubound, _, _, _ in iter_memmap: + for lbound, ubound, _, _, _, _ in iter_memmap: if lbound == p_ubound: p_ubound = ubound else: @@ -514,8 +526,8 @@ def find_free_space(self, size: int, minaddr: Optional[int] = None, maxaddr: Opt assert minaddr < maxaddr # get gap ranges between mapped ones and memory bounds - gaps_ubounds = tuple(lbound for lbound, _, _, _, _ in self.map_info) + (mem_ubound,) - gaps_lbounds = (mem_lbound,) + tuple(ubound for _, ubound, _, _, _ in self.map_info) + gaps_ubounds = tuple(lbound for lbound, _, _, _, _, _ in self.map_info) + (mem_ubound,) + gaps_lbounds = (mem_lbound,) + tuple(ubound for _, ubound, _, _, _, _ in self.map_info) gaps = zip(gaps_lbounds, gaps_ubounds) for lbound, ubound in gaps: @@ -563,7 +575,7 @@ def protect(self, addr: int, size: int, perms): self.change_mapinfo(aligned_address, aligned_address + aligned_size, mem_p = perms) - def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str] = None): + def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str] = None, ptr: Optional[bytearray] = None): """Map a new memory range. Args: @@ -580,10 +592,31 @@ def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str assert perms & ~UC_PROT_ALL == 0, f'unexpected permissions mask {perms}' if not self.is_available(addr, size): - raise QlMemoryMappedError('Requested memory is unavailable') + for line in self.get_formatted_mapinfo(): + print(line) + raise QlMemoryMappedError(f'Requested memory {addr:#x} + {size:#x} is unavailable') - self.ql.uc.mem_map(addr, size, perms) - self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False) + buf = self.map_ptr(addr, size, perms, ptr) + self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False, data=buf) + + def map_ptr(self, addr: int, size: int, perms: int = UC_PROT_ALL, buf: Optional[bytearray] = None) -> bytearray: + """Map a new memory range allocated as Python bytearray, will not affect map_info + + Args: + addr: memory range base address + size: memory range size (in bytes) + perms: requested permissions mask + buf: bytearray already allocated (if any) + + Returns: + bytearray with size, should be added to map_info by caller + """ + buf = buf or bytearray(size) + buf_type = ctypes.c_ubyte * size + cdata = buf_type.from_buffer(buf) + self.cmap[addr] = cdata + self.ql.uc.mem_map_ptr(addr, size, perms, cdata) + return buf def map_mmio(self, addr: int, size: int, read_cb: Optional[MmioReadCallback], write_cb: Optional[MmioWriteCallback], info: str = '[mmio]'): # TODO: mmio memory overlap with ram? Is that possible? diff --git a/qiling/os/posix/syscall/mman.py b/qiling/os/posix/syscall/mman.py index 5e3eae0bf..d518bdf4a 100755 --- a/qiling/os/posix/syscall/mman.py +++ b/qiling/os/posix/syscall/mman.py @@ -9,6 +9,7 @@ from qiling.exception import QlMemoryMappedError from qiling.os.filestruct import ql_file from qiling.os.posix.const_mapping import * +from qiling.utils import assert_mem_equal def ql_syscall_munmap(ql: Qiling, addr: int, length: int): @@ -16,7 +17,7 @@ def ql_syscall_munmap(ql: Qiling, addr: int, length: int): mapped_fd = [fd for fd in ql.os.fd if fd != 0 and isinstance(fd, ql_file) and fd._is_map_shared and not (fd.name.endswith(".so") or fd.name.endswith(".dylib"))] if mapped_fd: - all_mem_info = [_mem_info for _, _, _, _mem_info in ql.mem.map_info if _mem_info not in ("[mapped]", "[stack]", "[hook_mem]")] + all_mem_info = [_mem_info for _, _, _, _mem_info, _mmio, _data in ql.mem.map_info if _mem_info not in ("[mapped]", "[stack]", "[hook_mem]")] for _fd in mapped_fd: if _fd.name in [each.split()[-1] for each in all_mem_info]: @@ -42,6 +43,7 @@ def ql_syscall_madvise(ql: Qiling, addr: int, length: int, advice: int): def ql_syscall_mprotect(ql: Qiling, start: int, mlen: int, prot: int): + assert_mem_equal(ql) try: ql.mem.protect(start, mlen, prot) except Exception as e: @@ -53,6 +55,7 @@ def ql_syscall_mprotect(ql: Qiling, start: int, mlen: int, prot: int): def syscall_mmap_impl(ql: Qiling, addr: int, mlen: int, prot: int, flags: int, fd: int, pgoffset: int, ver: int): + assert_mem_equal(ql) MAP_FAILED = -1 MAP_SHARED = 0x01 MAP_FIXED = 0x10 @@ -92,6 +95,7 @@ def syscall_mmap_impl(ql: Qiling, addr: int, mlen: int, prot: int, flags: int, f if (flags & MAP_FIXED) and mmap_base != addr: return MAP_FAILED + assert_mem_equal(ql) # initial ql.loader.mmap_address if mmap_base != 0 and ql.mem.is_mapped(mmap_base, mmap_size): if (flags & MAP_FIXED) > 0: @@ -105,18 +109,20 @@ def syscall_mmap_impl(ql: Qiling, addr: int, mlen: int, prot: int, flags: int, f else: mmap_base = 0 + assert_mem_equal(ql) # initialized mapping if need_mmap: if mmap_base == 0: mmap_base = ql.loader.mmap_address ql.loader.mmap_address = mmap_base + mmap_size - ql.log.debug("%s - mapping needed for 0x%x" % (api_name, mmap_base)) + ql.log.debug("%s - mapping needed at 0x%x with size 0x%x" % (api_name, mmap_base, mmap_size)) try: ql.mem.map(mmap_base, mmap_size, prot, "[syscall_%s]" % api_name) except Exception as e: raise QlMemoryMappedError("Error: mapping needed but failed") ql.log.debug("%s - addr range 0x%x - 0x%x: " % (api_name, mmap_base, mmap_base + mmap_size - 1)) + assert_mem_equal(ql) # FIXME: MIPS32 Big Endian try: ql.mem.write(mmap_base, b'\x00' * mmap_size) @@ -142,6 +148,7 @@ def syscall_mmap_impl(ql: Qiling, addr: int, mlen: int, prot: int, flags: int, f ql.log.debug(e) raise QlMemoryMappedError("Error: trying to write memory: ") + assert_mem_equal(ql) return mmap_base diff --git a/qiling/os/posix/syscall/unistd.py b/qiling/os/posix/syscall/unistd.py index 9d715bfa0..dcb51ae2f 100644 --- a/qiling/os/posix/syscall/unistd.py +++ b/qiling/os/posix/syscall/unistd.py @@ -17,6 +17,7 @@ from qiling.os.posix.const import * from qiling.os.posix.stat import Stat from qiling.core_hooks import QlCoreHooks +from qiling.utils import assert_mem_equal def ql_syscall_exit(ql: Qiling, code: int): if ql.os.child_processes == True: @@ -205,6 +206,7 @@ def ql_syscall_brk(ql: Qiling, inp: int): # current brk_address will be modified if inp is not NULL(zero) # otherwise, just return current brk_address + assert_mem_equal(ql) if inp: cur_brk_addr = ql.loader.brk_address new_brk_addr = ql.mem.align_up(inp) @@ -217,6 +219,7 @@ def ql_syscall_brk(ql: Qiling, inp: int): ql.loader.brk_address = new_brk_addr + assert_mem_equal(ql) return ql.loader.brk_address diff --git a/qiling/utils.py b/qiling/utils.py index 3c959b892..4a4d1d9ab 100644 --- a/qiling/utils.py +++ b/qiling/utils.py @@ -16,7 +16,7 @@ from types import ModuleType from typing import TYPE_CHECKING, Any, Callable, Mapping, Optional, Tuple, TypeVar, Union -from unicorn import UC_ERR_READ_UNMAPPED, UC_ERR_FETCH_UNMAPPED +from unicorn import UC_ERR_READ_UNMAPPED, UC_ERR_FETCH_UNMAPPED, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC from qiling.exception import * from qiling.const import QL_ARCH, QL_ENDIAN, QL_OS, QL_DEBUGGER @@ -32,6 +32,34 @@ T = TypeVar('T') QlClassInit = Callable[['Qiling'], T] +def uc2perm(ps: int) -> str: + perms_d = { + UC_PROT_READ : 'r', + UC_PROT_WRITE : 'w', + UC_PROT_EXEC : 'x' + } + + return ''.join(val if idx & ps else '-' for idx, val in perms_d.items()) + +def assert_mem_equal(ql: 'Qiling'): + map_info = ql.mem.map_info + mem_regions = list(ql.uc.mem_regions()) + assert len(map_info) == len(mem_regions), f'len: map_info={len(map_info)} != mem_regions={len(mem_regions)}' + for i, mem_region in enumerate(mem_regions): + s, e, p, label, _, data = map_info[i] + if (s, e - 1, p) != mem_region: + ql.log.error('map_info:') + print('\n'.join(ql.mem.get_formatted_mapinfo())) + ql.log.error('uc.mem_regions:') + print('\n'.join(f'{s:010x} - {e:010x} {uc2perm(p)}' for (s, e, p) in mem_regions)) + raise AssertionError(f'(start, end, perm): map_info={(s, e - 1, p)} != mem_region={mem_region}') + uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1) + assert len(data) == len(uc_mem), f'len of {i} mem: map_info={len(data)} != mem_region={len(uc_mem)}' + if data != uc_mem: + Path('/tmp/uc_mem.bin').write_bytes(uc_mem) + Path('/tmp/map_info.bin').write_bytes(data) + raise AssertionError(f'Memory region {i} {s:#x} - {e:#x} != map_info[{i}] {label}') + def catch_KeyboardInterrupt(ql: 'Qiling', func: Callable): def wrapper(*args, **kw): try: @@ -470,6 +498,8 @@ def verify_ret(ql: 'Qiling', err): raise __all__ = [ + 'uc2perm', + 'assert_mem_equal', 'catch_KeyboardInterrupt', 'os_convert', 'arch_convert', diff --git a/tests/test_mem.py b/tests/test_mem.py new file mode 100644 index 000000000..4ba8b3df7 --- /dev/null +++ b/tests/test_mem.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 + +import sys +import unittest +sys.path.append("..") + +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.exception import QlMemoryMappedError +from qiling.os.posix.syscall.mman import ql_syscall_mmap2 +from qiling.os.posix.syscall.unistd import ql_syscall_brk +from unicorn.x86_const import UC_X86_REG_EAX, UC_X86_REG_ESI +from unicorn import UC_PROT_ALL, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_NONE, UcError +from test_shellcode import MIPS32EL_LIN, X8664_LIN, X86_LIN + + +class MemTest(unittest.TestCase): + def assert_mem_equal(self, ql: "Qiling"): + map_info = ql.mem.map_info + mem_regions = list(ql.uc.mem_regions()) + self.assertEqual(len(map_info), len(mem_regions)) + for i, mem_region in enumerate(mem_regions): + s, e, p, _, _, data = map_info[i] + self.assertEqual((s, e - 1, p), mem_region) + uc_mem = ql.mem.read( + mem_region[0], mem_region[1] - mem_region[0] + 1) + self.assertEqual(data, uc_mem) + + def test_map_correct(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + ql.mem.map(0x40000, 0x1000 * 16, UC_PROT_ALL) # [0x40000, 0x50000] + ql.mem.map(0x60000, 0x1000 * 16, UC_PROT_ALL) # [0x60000, 0x70000] + ql.mem.map(0x20000, 0x1000 * 16, UC_PROT_ALL) # [0x20000, 0x30000] + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x10000, 0x2000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x25000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x35000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x45000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x55000, 0x2000 * 16, UC_PROT_ALL) + ql.mem.map(0x50000, 0x5000, UC_PROT_ALL) + ql.mem.map(0x35000, 0x5000, UC_PROT_ALL) + self.assertEqual(len(ql.mem.map_info), 5 + 2) # GDT, shellcode_stack + self.assert_mem_equal(ql) + + def test_mem_protect(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux") + code = bytes([0x01, 0x70, 0x04]) + r_eax = 0x2000 + r_esi = 0xdeadbeef + ql.arch.regs.write(UC_X86_REG_EAX, r_eax) + ql.arch.regs.write(UC_X86_REG_ESI, r_esi) + ql.mem.map(0x1000, 0x1000, UC_PROT_READ | UC_PROT_EXEC) + ql.mem.map(0x2000, 0x1000, UC_PROT_READ) + ql.mem.protect(0x2000, 0x1000, UC_PROT_READ | UC_PROT_WRITE) + ql.mem.write(0x1000, code) + ql.emu_start(0x1000, 0x1000 + len(code) - 1, 0, 1) + buf = ql.mem.read(0x2000 + 4, 4) + self.assertEqual(int.from_bytes(buf, "little"), 0xdeadbeef) + self.assert_mem_equal(ql) + + def test_splitting_mem_unmap(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux") + ql.mem.map(0x20000, 0x1000, UC_PROT_NONE) + ql.mem.map(0x21000, 0x2000, UC_PROT_NONE) + try: + ql.mem.unmap(0x21000, 0x1000) + except UcError as e: + print(e) + for s, e, p in ql.uc.mem_regions(): + print(hex(s), hex(e), p) + for line in ql.mem.get_formatted_mapinfo(): + print(line) + self.assert_mem_equal(ql) + + def test_mem_protect_map_ptr(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + val = 0x114514 + data1 = bytearray(0x4000) + data2 = bytearray(0x2000) + ql.mem.map(0x4000, 0x4000, UC_PROT_ALL, "data1", data1) + ql.mem.unmap(0x6000, 0x2000) + ql.mem.change_mapinfo(0x4000, 0x4000 + 0x2000, UC_PROT_ALL, "data1") + self.assert_mem_equal(ql) + + # ql.mem.map will call map_ptr and add_mapinfo + ql.mem.map_ptr(0x6000, 0x2000, UC_PROT_ALL, data2) + ql.mem.add_mapinfo(0x6000, 0x6000 + 0x2000, + UC_PROT_ALL, "data2", False, data2) + + ql.mem.write(0x6004, val.to_bytes(8, "little")) + ql.mem.protect(0x6000, 0x1000, UC_PROT_READ) + buf = ql.mem.read(0x6004, 8) + self.assertEqual(int.from_bytes(buf, 'little'), val) + self.assert_mem_equal(ql) + + def test_map_at_the_end(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + mem = bytearray(0x1000) + mem[:0x100] = [0xff] * 0x100 + mem = bytes(mem) + ql.mem.map(0xfffffffffffff000, 0x1000, UC_PROT_ALL) + ql.mem.write(0xfffffffffffff000, mem) + self.assertRaises(UcError, ql.mem.write, 0xffffffffffffff00, mem) + self.assertRaises(UcError, ql.mem.write, 0, mem) + self.assert_mem_equal(ql) + + def test_mmap2(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux", verbose=QL_VERBOSE.DEBUG) + ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) + ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) + del ql + + ql = Qiling(code=MIPS32EL_LIN, archtype="mips", ostype="linux", verbose=QL_VERBOSE.DEBUG) + ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) + ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) + del ql + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_onlinux.sh b/tests/test_onlinux.sh index 176693cbf..f430b8508 100755 --- a/tests/test_onlinux.sh +++ b/tests/test_onlinux.sh @@ -17,4 +17,5 @@ python3 ./test_mcu.py && python3 ./test_evm.py && python3 ./test_blob.py && python3 ./test_qdb.py && +python3 ./test_mem.py && echo "Done Test"