diff --git a/qiling/core.py b/qiling/core.py index 068afec64..1e857071e 100644 --- a/qiling/core.py +++ b/qiling/core.py @@ -177,6 +177,7 @@ def __init__( # Run the loader self.loader.run() + # Emulate the binary from begin until @end, with timeout in @timeout and # number of emulated instructions in @count def run(self, begin=None, end=None, timeout=0, count=0): @@ -208,30 +209,36 @@ def patch(self, addr, code, file_name=b''): else: self.patch_lib.append((addr, code, file_name.decode())) + # ql.output var getter @property def output(self): return self._output + # ql.output - output var setter eg. QL_OUTPUT.DEFAULT and etc @output.setter def output(self, output): self._output = output_convert(output) + # ql.platform - platform var = host os getter eg. LINUX and etc @property def platform(self): return self._platform + # ql.platform - platform var = host os setter eg. LINUX and etc @platform.setter def platform(self, value): self._platform = ostype_convert(value.lower()) + def __enable_bin_patch(self): for addr, code in self.patch_bin: self.mem.write(self.loader.load_address + addr, code) + def enable_lib_patch(self): for addr, code, filename in self.patch_lib: try: @@ -239,10 +246,12 @@ def enable_lib_patch(self): except: raise RuntimeError("Fail to patch filename %s at addr 0x%x" % (filename, addr)) + # stop emulation def emu_stop(self): self.uc.emu_stop() + # start emulation def emu_start(self, begin, end, timeout=0, count=0): self.uc.emu_start(begin, end, timeout, count) diff --git a/qiling/core_hooks.py b/qiling/core_hooks.py index 491ad1796..8dc98730f 100644 --- a/qiling/core_hooks.py +++ b/qiling/core_hooks.py @@ -20,47 +20,56 @@ def __init__(self, callback, user_data=None, begin=1, end=0): self.user_data = user_data self.begin = begin self.end = end - + + def bound_check(self, pc): if self.end < self.begin or (self.begin <= pc and self.end >= pc): return True return False + def check(self, *args): return True + def call(self, ql, *args): if self.user_data == None: return self.callback(ql, *args) return self.callback(ql, *args, self.user_data) + class HookAddr(Hook): def __init__(self, callback, address, user_data=None): super(HookAddr, self).__init__(callback, user_data, address, address) self.addr = address + def call(self, ql, *args): if self.user_data == None: return self.callback(ql) return self.callback(ql, self.user_data) + class HookIntr(Hook): def __init__(self, callback, intno, user_data=None): super(HookIntr, self).__init__(callback, user_data, 0, -1) self.intno = intno + def check(self, ql, intno): ql.dprint(D_CTNT, "[+] Received Interupt: %i Hooked Interupt: %i" % (intno, self.intno)) if intno < 0 or self.intno == intno: return True return False + class HookRet: def __init__(self, ql, t, h): self._ql = ql self._t = t self._h = h + def remove(self): self._ql.hook_del(self._t, self._h) @@ -102,6 +111,7 @@ def _callback_type3(self, uc, intno, pack_data): return callback(ql, intno, user_data) return callback(ql, intno) # callback does not require user_data + def _hook_intr_cb(self, uc, intno, pack_data): ql, hook_type = pack_data catched = False @@ -116,6 +126,7 @@ def _hook_intr_cb(self, uc, intno, pack_data): if catched == False: raise QlErrorCoreHook("_hook_intr_cb : catched == False") + def _hook_insn_cb(self, uc, *args): ql, hook_type = args[-1] @@ -126,18 +137,21 @@ def _hook_insn_cb(self, uc, *args): if isinstance(ret, int) == True and ret & QL_HOOK_BLOCK != 0: break + def _callback_type4(self, uc, addr, size, pack_data): ql, user_data, callback = pack_data if user_data: return callback(ql, addr, size, user_data) return callback(ql, addr, size) + def _callback_type4a(self, uc, _addr, _size, pack_data): ql, user_data, callback = pack_data if user_data: return callback(ql, user_data) return callback(ql) + def _hook_trace_cb(self, uc, addr, size, pack_data): ql, hook_type = pack_data if hook_type in self._hook.keys(): @@ -147,12 +161,14 @@ def _hook_trace_cb(self, uc, addr, size, pack_data): if isinstance(ret, int) == True and ret & QL_HOOK_BLOCK != 0: break + def _callback_type6(self, uc, access, addr, size, value, pack_data): ql, user_data, callback = pack_data if user_data: return callback(ql, addr, size, value, user_data) return callback(ql, addr, size, value) + def _hook_mem_cb(self, uc, access, addr, size, value, pack_data): ql, hook_type = pack_data handled = False @@ -168,12 +184,14 @@ def _hook_mem_cb(self, uc, access, addr, size, value, pack_data): if handled == False: raise QlErrorCoreHook("_hook_mem_cb : handled == False") + def _callback_x86_syscall(self, uc, pack_data): ql, user_data, callback = pack_data if user_data: return callback(ql, user_data) return callback(ql) - + + def _hook_insn_invalid_cb(self, uc, pack_data): ql, hook_type = pack_data catched = False @@ -187,6 +205,7 @@ def _hook_insn_invalid_cb(self, uc, pack_data): if catched == False: raise QlErrorCoreHook("_hook_intr_invalid_cb : catched == False") + def _hook_addr_cb(self, uc, addr, size, pack_data): ql, addr = pack_data if addr in self._addr_hook.keys(): @@ -203,11 +222,13 @@ def _ql_hook_internal(self, hook_type, callback, user_data=None, *args): # pack user_data & callback for wrapper _callback return self.uc.hook_add(hook_type, _callback, (self, user_data), 1, 0, *args) + def _ql_hook_addr_internal(self, callback, user_data, address): _callback = (catch_KeyboardInterrupt(self))(callback) # pack user_data & callback for wrapper _callback return self.uc.hook_add(UC_HOOK_CODE, _callback, (self, user_data), address, address) + def _ql_hook(self, hook_type, h, *args): base_type = [ UC_HOOK_INTR, @@ -269,35 +290,45 @@ def _ql_hook(self, hook_type, h, *args): self._hook[t] = [] self._hook[t].append(h) + def ql_hook(self, hook_type, callback, user_data=None, begin=1, end=0, *args): h = Hook(callback, user_data, begin, end) self._ql_hook(hook_type, h, *args) return HookRet(self, hook_type, h) + def hook_code(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_CODE, callback, user_data, begin, end) + def hook_intr(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_INTR, callback, user_data, begin, end) + def hook_block(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_BLOCK, callback, user_data, begin, end) + def hook_mem_unmapped(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_UNMAPPED, callback, user_data, begin, end) + def hook_mem_read_invalid(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_READ_INVALID, callback, user_data, begin, end) + def hook_mem_write_invalid(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_WRITE_INVALID, callback, user_data, begin, end) + def hook_mem_fetch_invalid(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_FETCH_INVALID, callback, user_data, begin, end) + def hook_mem_invalid(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_VALID, callback, user_data, begin, end) + # a convenient API to set callback for a single address def hook_address(self, callback, address, user_data=None): h = HookAddr(callback, address, user_data) @@ -311,23 +342,29 @@ def hook_address(self, callback, address, user_data=None): self._addr_hook[address].append(h) return HookRet(self, None, h) + def hook_intno(self, callback, intno, user_data=None): h = HookIntr(callback, intno, user_data) self._ql_hook(UC_HOOK_INTR, h) return HookRet(self, UC_HOOK_INTR, h) + def hook_mem_read(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_READ, callback, user_data, begin, end) + def hook_mem_write(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_WRITE, callback, user_data, begin, end) + def hook_mem_fetch(self, callback, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_MEM_FETCH, callback, user_data, begin, end) + def hook_insn(self, callback, arg1, user_data=None, begin=1, end=0): return self.ql_hook(UC_HOOK_INSN, callback, user_data, begin, end, arg1) + # replace linux or windows syscall/api with custom api/syscall # if replace function name is needed, first syscall must be available # - ql.set_syscall(0x04, my_syscall_write) @@ -362,6 +399,7 @@ def set_syscall(self, syscall_cur, syscall_new, pos = QL_INTERCEPT.CALL): syscall_name = "ql_syscall_" + str(syscall_cur) self.os.dict_posix_onExit_syscall[syscall_name] = syscall_new + # replace default API with customed function def set_api(self, api_name, my_func): if self.ostype in (QL_OS.WINDOWS, QL_OS.UEFI): @@ -369,12 +407,14 @@ def set_api(self, api_name, my_func): else: self.os.add_function_hook(api_name, my_func) + # ql.func_arg - get syscall for all posix series @property def func_arg(self): if self.ostype in (QL_POSIX): return self.os.get_func_arg() + def hook_del(self, *args): if len(args) != 1 and len(args) != 2: return @@ -446,6 +486,7 @@ def clear_hooks(self): self.clear_ql_hooks() + def clear_ql_hooks(self): self._hook = {} self._hook_fuc = {} diff --git a/qiling/core_struct.py b/qiling/core_struct.py index f5eb9628d..0d6ac7d12 100644 --- a/qiling/core_struct.py +++ b/qiling/core_struct.py @@ -19,57 +19,69 @@ def __init__(self): self.archendian = None self.archbit = 0 + def unpack64(self, x): return struct.unpack('Q', x)[0] + def pack64(self, x): return struct.pack('Q', x) + def pack64s(self, x): return struct.pack('q', x) + def unpack64s(self, x): return struct.unpack('q', x)[0] + def unpack32(self, x): if self.archendian == QL_ENDIAN.EB: return struct.unpack('>I', x)[0] else: return struct.unpack('I', x)[0] + def pack32(self, x): if self.archendian == QL_ENDIAN.EB: return struct.pack('>I', x) else: return struct.pack('I', x) + def unpack32s(self, x): if self.archendian == QL_ENDIAN.EB: return struct.unpack('>i', x)[0] else: return struct.unpack('i', x)[0] + def unpack32s_ne(self, x): return struct.unpack('i', x)[0] + def pack32s(self, x): if self.archendian == QL_ENDIAN.EB: return struct.pack('>i', x) else: return struct.pack('i', x) + def unpack16(self, x): if self.archendian == QL_ENDIAN.EB: return struct.unpack('>H', x)[0] else: return struct.unpack('H', x)[0] + def pack16(self, x): if self.archendian == QL_ENDIAN.EB: return struct.pack('>H', x) else: return struct.pack('H', x) + def pack(self, data): if self.archbit == 64: return self.pack64(data) @@ -77,6 +89,7 @@ def pack(self, data): return self.pack32(data) raise QlErrorStructConversion("[!] Qiling pack conversion failed!") + def packs(self, data): if self.archbit == 64: return self.pack64s(data) @@ -84,6 +97,7 @@ def packs(self, data): return self.pack32s(data) raise QlErrorStructConversion("[!] Qiling packs conversion failed!") + def unpack(self, data): if self.archbit == 64: return self.unpack64(data) @@ -91,6 +105,7 @@ def unpack(self, data): return self.unpack32(data) raise QlErrorStructConversion("[!] Qiling unpack conversion failed!") + def unpacks(self, data): if self.archbit == 64: return self.unpack64s(data) diff --git a/qiling/core_utils.py b/qiling/core_utils.py index f438e7f62..655ae8b6b 100644 --- a/qiling/core_utils.py +++ b/qiling/core_utils.py @@ -92,27 +92,33 @@ def dprint(self, level, *args, **kw): if int(self.verbose) >= level and self.output in (QL_OUTPUT.DEBUG, QL_OUTPUT.DUMP): self.nprint(*args, **kw) + def add_fs_mapper(self, host_src, ql_dest): self.fs_mapper.append([host_src, ql_dest]) + # push to stack bottom, and update stack register def stack_push(self, data): self.arch.stack_push(data) + # pop from stack bottom, and update stack register def stack_pop(self): return self.arch.stack_pop() + # read from stack, at a given offset from stack bottom # NOTE: unlike stack_pop(), this does not change stack register def stack_read(self, offset): return self.arch.stack_read(offset) + # write to stack, at a given offset from stack bottom # NOTE: unlike stack_push(), this does not change stack register def stack_write(self, offset, data): self.arch.stack_write(offset, data) + def arch_setup(self): if not ql_is_valid_arch(self.archtype): raise QlErrorArch("[!] Invalid Arch") @@ -123,6 +129,7 @@ def arch_setup(self): module_name = ql_build_module_import_name("arch", None, self.archtype) return ql_get_module_function(module_name, archmanager)(self) + def os_setup(self, function_name = None): if not ql_is_valid_ostype(self.ostype): raise QlErrorOsType("[!] Invalid OSType") @@ -148,6 +155,7 @@ def os_setup(self, function_name = None): module_name = ql_build_module_import_name("os", self.ostype, self.archtype) return ql_get_module_function(module_name, function_name) + def loader_setup(self, function_name = None): if not self.shellcoder: self.archtype, self.ostype, self.archendian = ql_checkostype(self.path) @@ -164,6 +172,7 @@ def loader_setup(self, function_name = None): module_name = ql_build_module_import_name("loader", loadertype_str.lower()) return ql_get_module_function(module_name, function_name)(self) + def component_setup(self, component_type, function_name): if not ql_is_valid_ostype(self.ostype): raise QlErrorOsType("[!] Invalid OSType") @@ -175,6 +184,7 @@ def component_setup(self, component_type, function_name): function_name = "Ql" + function_name.capitalize() + "Manager" return ql_get_module_function(module_name, function_name)(self) + def profile_setup(self): if self.profile: self.dprint(D_INFO, "[+] Customized profile: %s" % self.profile) @@ -190,12 +200,14 @@ def profile_setup(self): config.read(profiles) return config + def compile(self, archtype, runcode, arm_thumb=None): try: loadarch = KS_ARCH_X86 except: raise QlErrorOutput("Please install Keystone Engine") + def ks_convert(arch): if self.archendian == QL_ENDIAN.EB: adapter = { @@ -216,10 +228,8 @@ def ks_convert(arch): QL_ARCH.ARM64: (KS_ARCH_ARM64, KS_MODE_ARM), } - if arch in adapter: - return adapter[arch] - # invalid - return None, None + return adapter.get(arch, (None,None)) + def compile_instructions(runcode, archtype, archmode): ks = Ks(archtype, archmode)