查看原文
其他

Unicorn 调用SO之加载模块

无名侠 看雪学院 2019-09-16

本文为看雪论坛精华文章
看雪论坛作者ID:无名侠




Unicorn 调用SO之加载模块


Android是基于Linux开发的,Android Native原生库是ELF文件格式。Unicorn 并不能加载ELF文件,所以我们要自己将ELF文件加载到Unicorn虚拟机的内存中去。加载ELF 文件是一个很复杂的过程,涉及到ELF文件解析、重定位、符号解析、依赖库加载等。
 
python 可以使用elftools库解析ELF 文件。
 
elftools 安装
pip install pyelftools


映射ELF 文件
ELF 文件有两种视图,链接视图和执行视图。elftools 是基于链接视图解析ELF格式的,然而现在有一些ELF文件的section信息是被抹掉的,elftools就无法正常工作,我也没时间重写一个elf loader,就只能凑合用一下elftools
 
我已经在前面一篇文章介绍了内存分配方面的东西,加载ELF文件第一步需要将ELF文件映射到内存。如何映射呢?只需要找到类型为PT_LOAD的segment,按照segment的信息映射即可。
 
代码如下:


# - LOAD (determinate what parts of the ELF file get mapped into memory)
load_segments = [x for x in elf.iter_segments() if x.header.p_type == 'PT_LOAD']

for segment in load_segments:
prot = UC_PROT_ALL
self.emu.memory.mem_map(load_base + segment.header.p_vaddr, segment.header.p_memsz, prot)
self.emu.memory.mem_write(load_base + segment.header.p_vaddr, segment.data())


解析 init_array
ELF 的有一个列表,用于存储初始化函数地址,在动态链接的时候,linker会依次调用init_array中的每一个函数。init_array 中的函数一般用于初始化程序,偶尔也有ELF外壳程序在init_array中添加自解密代码,另外有一些字符串解密也是在init_array中完成的。想要模拟native程序,必然需要调用init_array 中的函数。
 
init_array 是一个数组, 一般情况下,每一项都是函数入口偏移, 然而也有为0的情况。因为init_array实际解析时机在重定位完成之后, init_array 也可能被重定位。所以要解析init_array的时候还需要判断重定位表。
 
我的策略是,当读出init_array中为0的条目的时候就去重定位表中查找重定位值。


for _ in range(int(init_array_size / 4)):
# covert va to file offset
for seg in load_segments:
if seg.header.p_vaddr <= init_array_offset < seg.header.p_vaddr + seg.header.p_memsz:
init_array_foffset = init_array_offset - seg.header.p_vaddr + seg.header.p_offset
fstream.seek(init_array_foffset)
data = fstream.read(4)
fun_ptr = struct.unpack('I', data)[0]
if fun_ptr != 0:
# fun_ptr += load_base
init_array.append(fun_ptr + load_base)
print ("find init array for :%s %x" % (filename, fun_ptr))
else:
# search in reloc
for rel in rel_section.iter_relocations():
rel_info_type = rel['r_info_type']
rel_addr = rel['r_offset']
if rel_info_type == arm.R_ARM_ABS32 and rel_addr == init_array_offset:
sym = dynsym.get_symbol(rel['r_info_sym'])
sym_value = sym['st_value']
init_array.append(load_base + sym_value)
print ("find init array for :%s %x" % (filename, sym_value))
break
init_array_offset += 4


解析符号


32位ELF文件 symbol table entry 的定义如下:


typedef struct {
Elf32_Word st_name;
Elf32_Addr st_value;
Elf32_Word st_size;
unsigned char st_info;
unsigned char st_other;
Elf32_Half st_shndx;
} Elf32_Sym;


当st_shndx字段的值为SHN_UNDEF时,表明该符号在当前模块没有定义,是一个导入符号,要去其它模块查找。

为了便于管理已经加载模块的符号地址,应该用一个map,将nameaddress映射起来。


其它情况,简单起见,均看成导出符号,将地址重定位后加入到管理符号map。


# Resolve all symbols.
symbols_resolved = dict()

for section in elf.iter_sections():
if not isinstance(section, SymbolTableSection):
continue
itersymbols = section.iter_symbols()
next(itersymbols) # Skip first symbol which is always NULL.
for symbol in itersymbols:
symbol_address = self._elf_get_symval(elf, load_base, symbol)
if symbol_address is not None:
symbols_resolved[symbol.name] = SymbolResolved(symbol_address, symbol)

def _elf_get_symval(self, elf, elf_base, symbol):
if symbol.name in self.symbol_hooks:
return self.symbol_hooks[symbol.name]

if symbol['st_shndx'] == 'SHN_UNDEF': # 外部符号
# External symbol, lookup value.
target = self._elf_lookup_symbol(symbol.name)
if target is None:
# Extern symbol not found
if symbol['st_info']['bind'] == 'STB_WEAK':
# Weak symbol initialized as 0
return 0
else:
logger.error('=> Undefined external symbol: %s' % symbol.name)
return None
else:
return target
elif symbol['st_shndx'] == 'SHN_ABS':
# Absolute symbol.
return elf_base + symbol['st_value']
else:
# Internally defined symbol.
return elf_base + symbol['st_value']


重定位


# Relocate.
for section in elf.iter_sections():
if not isinstance(section, RelocationSection):
continue
#for relsection in elf.get_dynmic_rel():
for rel in section.iter_relocations():
sym = dynsym.get_symbol(rel['r_info_sym'])
sym_value = sym['st_value']

rel_addr = load_base + rel['r_offset'] # Location where relocation should happen
rel_info_type = rel['r_info_type']

# Relocation table for ARM
if rel_info_type == arm.R_ARM_ABS32:
# Create the new value.
value = load_base + sym_value
# Write the new value
self.emu.mu.mem_write(rel_addr, value.to_bytes(4, byteorder='little'))

elif rel_info_type == arm.R_ARM_GLOB_DAT or \
rel_info_type == arm.R_ARM_JUMP_SLOT or \
rel_info_type == arm.R_AARCH64_GLOB_DAT or \
rel_info_type == arm.R_AARCH64_JUMP_SLOT:
# Resolve the symbol.
if sym.name in symbols_resolved:
value = symbols_resolved[sym.name].address

# Write the new value
self.emu.mu.mem_write(rel_addr, value.to_bytes(4, byteorder='little'))
elif rel_info_type == arm.R_ARM_RELATIVE or \
rel_info_type == arm.R_AARCH64_RELATIVE:
if sym_value == 0:
# Load address at which it was linked originally.
value_orig_bytes = self.emu.mu.mem_read(rel_addr, 4)
value_orig = int.from_bytes(value_orig_bytes, byteorder='little')

# Create the new value
value = load_base + value_orig

# Write the new value
self.emu.mu.mem_write(rel_addr, value.to_bytes(4, byteorder='little'))
else:
raise NotImplementedError()
else:
logger.error("Unhandled relocation type %i." % rel_info_type)



Unicorn 调用SO之内存管理


Unicorn 采用虚拟内存机制,使得虚拟CPU的内存与真实CPU的内存隔离。


Unicorn 使用如下API来操作内存:
  • uc_mem_map

  • uc_mem_read

  • uc_mem_write


使用uc_mem_map映射内存的时候,address 与 size 都需要与0x1000对齐,也就是0x1000的整数倍,否则会报UC_ERR_ARG 异常。

内存对齐
UC_MEM_ALIGN = 0x1000

Unicorn map的内存基地址和长度都需要对齐到0x1000,对齐函数如下:


# Thansk to https://github.com/lunixbochs/usercorn/blob/master/go/mem.go
def align(addr, size, growl):
to = ctypes.c_uint64(UC_MEM_ALIGN).value
mask = ctypes.c_uint64(0xFFFFFFFFFFFFFFFF).value ^ ctypes.c_uint64(to - 1).value
right = addr + size
right = (right + to - 1) & mask
addr &= mask
size = right - addr
if growl:
size = (size + to - 1) & mask
return addr, size


在 Unicorn 虚拟机中分配内存(用于加载模块)。


Memory类用于分配内存


class Memory:
"""
:type emu androidemu.emulator.Emulator
"
""
def __init__(self, emu):
self.emu = emu
self.counter_memory = config.BASE_ADDR
self.counter_stack = config.STACK_ADDR + config.STACK_SIZE

def mem_reserve(self, size):
(_, size_aligned) = align(0, size, True)
ret = self.counter_memory
self.counter_memory += size_aligned
return ret

def mem_map(self, address, size, prot):
(address, size) = align(address, size, True)

self.emu.mu.mem_map(address, size, prot)

logger.debug("=> Mapping memory page 0x%08x - 0x%08x, size 0x%08x, prot %s" % (address, address + size, size,
prot))

def mem_write(self, address, data):
self.emu.mu.mem_write(address, data)

def mem_read(self, address, size):
return self.emu.mu.mem_read(address, size)


heap 的实现
为了对malloc和free等函数提供支持,需要实现heap。可喜可贺,Afl-Unicorn 开源项目中已经有一个heap例子,可以直接使用。


heap 实现代码


from unicorn import *
from unicorn.arm64_const import *

# Page size required by Unicorn
UNICORN_PAGE_SIZE = 0x1000

# Max allowable segment size (1G)
MAX_ALLOWABLE_SEG_SIZE = 1024 * 1024 * 1024

# Alignment functions to align all memory segments to Unicorn page boundaries (4KB pages only)
ALIGN_PAGE_DOWN = lambda x: x & ~(UNICORN_PAGE_SIZE - 1)
ALIGN_PAGE_UP = lambda x: (x + UNICORN_PAGE_SIZE - 1) & ~(UNICORN_PAGE_SIZE-1)


# Implementation from
# https://github.com/Battelle/afl-unicorn/blob/44a50c8a9426ffe4ad8714ef8a35dc011e62f739/unicorn_mode/helper_scripts/unicorn_loader.py#L45
class UnicornSimpleHeap:
""" Use this class to provide a simple heap implementation. This should
be used if malloc/free calls break things during emulation.
"""


# Helper data-container used to track chunks
class HeapChunk(object):
def __init__(self, data_addr, data_size):
self.data_addr = data_addr
self.data_size = data_size

# Returns true if the specified buffer is completely within the chunk, else false
def is_buffer_in_chunk(self, addr, size):
if addr >= self.data_addr and ((addr + size) <= (self.data_addr + self.data_size)):
return True
else:
return False

_uc = None # Unicorn engine instance to interact with
_chunks = [] # List of all known chunks
_debug_print = False # True to print debug information

def __init__(self, uc, heap_min_addr, heap_max_addr, debug_print=False):
self._uc = uc
self._heap_min_addr = heap_min_addr
self._heap_max_addr = heap_max_addr
self._debug_print = debug_print

# Add the watchpoint hook that will be used to implement psuedo-guard page support
# self._uc.hook_add(UC_HOOK_MEM_WRITE | UC_HOOK_MEM_READ, self.__check_mem_access)

def malloc(self, size, prot=UC_PROT_READ | UC_PROT_WRITE):
# Figure out the overall size to be allocated/mapped
# - Allocate at least 1 4k page of memory to make Unicorn happy
data_size = ALIGN_PAGE_UP(size)
# Gross but efficient way to find space for the chunk:
chunk = None
for addr in range(self._heap_min_addr, self._heap_max_addr, UNICORN_PAGE_SIZE):
try:
self._uc.mem_map(addr, data_size, prot)
chunk = self.HeapChunk(addr, data_size)
if self._debug_print:
print("Allocating 0x{0:x}-byte chunk @ 0x{1:016x}".format(chunk.data_size, chunk.data_addr))
break
except UcError as e:
continue
# Something went very wrong
if chunk is None:
raise Exception("Oh no.")
self._chunks.append(chunk)
return chunk.data_addr

def calloc(self, size, count):
# Simple wrapper around malloc with calloc() args
return self.malloc(size * count)

def realloc(self, ptr, new_size):
# Wrapper around malloc(new_size) / memcpy(new, old, old_size) / free(old)
if self._debug_print:
print("Reallocating chunk @ 0x{0:016x} to be 0x{1:x} bytes".format(ptr, new_size))
old_chunk = None
for chunk in self._chunks:
if chunk.data_addr == ptr:
old_chunk = chunk
new_chunk_addr = self.malloc(new_size)
if old_chunk is not None:
self._uc.mem_write(new_chunk_addr, str(self._uc.mem_read(old_chunk.data_addr, old_chunk.data_size)))
self.free(old_chunk.data_addr)
return new_chunk_addr

def protect(self, addr, len_in, prot):
for chunk in self._chunks:
if chunk.is_buffer_in_chunk(addr, len_in):
# self._uc.mem_protect(chunk.data_addr, chunk.data_size, perms=prot)
return True
return False

def free(self, addr):
for chunk in self._chunks:
if chunk.is_buffer_in_chunk(addr, 1):
if self._debug_print:
print("Freeing 0x{0:x}-byte chunk @ 0x{0:016x}".format(chunk.data_addr, chunk.data_size))
self._uc.mem_unmap(chunk.data_addr, chunk.data_size)
self._chunks.remove(chunk)
return True
return False


处理内存管理方面的syscall


native程序可能直接通过syscall调用mmap2 映射内存。libc等库最底层也一定会调用mmap2分配内存。


self._syscall_handler.set_handler(0x5B, "munmap", 2, self._handle_munmap)
self._syscall_handler.set_handler(0x7D, "mprotect", 3, self._handle_mprotect)
self._syscall_handler.set_handler(0xC0, "mmap2", 6, self._handle_mmap2)
self._syscall_handler.set_handler(0xDC, "madvise", 3, self._handle_madvise)


mmap2 实际并不是一个纯粹的内存管理函数,它还能映射文件到内存,这就涉及到文件系统的支持。我在原作者的基础上进行了修改。


def _handle_mmap2(self, mu, addr, length, prot, flags, fd, offset):
"""
void *mmap2(void *addr, size_t length, int prot, int flags, int fd, off_t pgoffset);
"""


# MAP_FILE 0
# MAP_SHARED 0x01
# MAP_PRIVATE 0x02
# MAP_FIXED 0x10
# MAP_ANONYMOUS 0x20
prot = UC_PROT_ALL
addr = self._heap.malloc(length, prot)

if fd != 0xffffffff: # 如果有fd
if fd <= 2:
raise NotImplementedError("Unsupported read operation for file descriptor %d." % fd)

if fd not in self._file_system._file_descriptors:
# TODO: Return valid error.
raise NotImplementedError()

file = self._file_system._file_descriptors[fd]
data = open(file.name_virt, 'rb').read(length)
self._mu.mem_write(addr, data)
return addr


hook libc中的内存管理函数。


既然我们的框架支持加载多个lib库,直接加载libc就可以了,为何还需要hook libc中的这些符号呢?


实际上,目前的模拟框架并不完善,很多系统信息模拟不到位,libc初始化不完善,使用libc的malloc经常会出现异常。所以直接接替为内置的heap 管理器。


#memory
modules.add_symbol_hook('malloc', hooker.write_function(self.malloc) + 1)
modules.add_symbol_hook('free', hooker.write_function(self.free) + 1)
modules.add_symbol_hook('calloc', hooker.write_function(self.calloc) + 1)


内存访问属性
我们知道,常见的内存权限有可读可写可执行,一般可以调用mprotect 函数修改指定内存页面的权限。为了方便,我们将所有内存设置为可读可写可执行。



Unicorn 调用SO之函数级Hook


很多情况下,有一些系统API无法直接调用,就需要手动实现它,比如以下应用场景:


1、在libc没有完全初始化的情况下,直接调用malloc 可能会崩溃,为了使程序更加稳定,就需要自己实现malloc和free。


2、实现dlopen,就可以使用框架的模块管理器来加载模块,而不用linker。


3、打出dlopen、dlsym等函数的日志,还可以分析native程序运行过程中会调用的API。


4、360加固,在反调试完成后,它会调用dlopen加载libz.so,并调用uncompress函数解压数据。hook uncompress 就可以拿到解压的数据。


5、目前,许多加固的整体加固会调用art中的api来加载dex文件,如果这些函数被Hook,那么就可以直接拿到dex文件,岂不美哉?


6、JNI Functions 实现,需要Hook 技术支撑。
 
不幸的是,Unicorn 内部并没有函数的概念,它只是一个单纯的CPU,没有HOOK_FUNCTION的callback, Hook 函数看上去困难重重。AndroidNativeEmu 为我们提供了一个很好的Hook思路。
 
AndroidNativeEmu 中的函数级Hook 并不是真正意义上的Hook,它不仅能Hook存在的函数,还能Hook不存在的函数。AndroidNativeEmu 使用这种技术实现了JNI函数Hook、库函数Hook。Jni函数是不存的,Hook它只是为了能够用Python 实现 Jni Functions。有一些库函数是存在的,Hook只是为了重新实现它。



用 Python 实现 Unicorn 虚拟机内部的函数


用 Python 实现 Unicorn 虚拟机内部的函数,首先要解决 Unicorn 虚拟机内部如何与外部交互。AndroidNativeEmu 的实现类似于系统调用,它会为每一个Hook函数实现一个stub函数,stub函数中有一条“陷阱”指令,当虚拟CPU执行这一条”陷阱“指令的时候就会被HOOK_CODE 捕获,然后通过R4寄存器的值确定Python 的处理函数。
 
stub 函数代码如下:


PUSH {R4,LR}
MOV R4, Number
IT AL
POP {R4,PC}


HOOK_CODEcallback 直接检测PC寄存器指向内存的字节数据是否为"\xE8\xBF"来判断IT AL指令。如果是IT AL指令,则根据R4寄存器的值确定Python 回调的函数。
 
Native程序中可能会有许多IT AL指令,会误判吗?


HOOK_CODE 会trace每一条指令,这种方式效率岂不是很低?
 
这两个问题都很好解决,因为HOOK_CODE是有作用范围的,如果开辟一段空间,完全用于存放stub,然后将该callback设置在这个空间范围内,就可以很好的避免了冲突和效率问题!


def _hook(self, mu, address, size, user_data):
# Check if instruction is "IT AL"
if size != 2 or self._emu.mu.mem_read(address, size) != b"\xE8\xBF":
return

# Find hook.
hook_id = self._emu.mu.reg_read(UC_ARM_REG_R4)
hook_func = self._hooks[hook_id]

# Call hook.
try:
hook_func(self._emu)
except:
# Make sure we catch exceptions inside hooks and stop emulation.
mu.emu_stop()
raise


使用 keystone 动态编译 stub


Keystone 是一款很牛逼的汇编框架,Unicorn的兄弟!
 
使用pip安装


pip install keystone


keystone 使用方法十分简单,不需要额外学习即可理解接下来的代码。write_function函数将python的func函数映射到Unicorn虚拟机,并返回虚拟机中的函数地址, 如果在虚拟机中调用该地址就会被Python捕获并调用func函数。


def write_function(self, func):
# Get the hook id.
hook_id = self._get_next_id()
hook_addr = self._hook_current

# Create the ARM assembly code.
# Make sure to update STACK_OFFSET if you change the PUSH/POP.
asm = "PUSH {R4,LR}\n" \
"MOV R4, #" + hex(hook_id) + "\n" \
"IT AL\n" \
"POP {R4,PC}"

asm_bytes_list, asm_count = self._keystone.asm(bytes(asm, encoding='ascii'))

if asm_count != 4:
raise ValueError("Expected asm_count to be 4 instead of %u." % asm_count)

# Write assembly code to the emulator.
self._emu.mu.mem_write(hook_addr, bytes(asm_bytes_list))

# Save results.
self._hook_current += len(asm_bytes_list)
self._hooks[hook_id] = func

return hook_addr


symbol hook


上一个小结中讲到如何实现stub,那么如何hook 呢?AndroidNativeEmu实现了Symbol Hook。这种HOOK与平时常见的IAT HOOK、GOT Hook原理是一样的。调用add_symbol_hook函数,可以将符号和新地址关联起来, 模块加载的时候,查找符号优先从该表中获取地址。

这种实现方式有点bug,比如有一些Native 程序,自身会got hook,那么这种方式可能就会出问题。


例子:使用Hook 实现 AAssetManager_open


modules.add_symbol_hook('AAssetManager_open', hooker.write_function(self.AAssetManager_open) + 1)
@native_method
def AAssetManager_open(self, mu, mgr, filename_ptr, mode):
filename = memory_helpers.read_utf8(mu, filename_ptr)
filename = "assert/" + filename
logger.info("AAssetManager_open(%s,%d)" % (filename, mode))
fd = self._open_file(filename, 0, mode)
if fd == -1:
fd = 0
return fd


python 实现 Native 函数修饰 @native_method


大家可能对Python 装饰器的语法不太熟悉,可以参考廖雪峰老师博客:Python 装饰器。
 
简单的讲,装饰器可以自定义修改函数,为目标函数套一层外壳。


native_method主要功能是处理参数数据,这样就能很优雅地编写Native 函数,不用再函数中冗杂地调用寄存器读写函数。


def native_method(func):
def native_method_wrapper(*argv):
"""
:type self
:type emu androidemu.emulator.Emulator
:type mu Uc
"""
emu = argv[1] if len(argv) == 2 else argv[0]
mu = emu.mu

args = inspect.getfullargspec(func).args # 判断参数个数
args_count = len(args) - (2 if 'self' in args else 1)

if args_count < 0:
raise RuntimeError("NativeMethod accept at least (self, mu) or (mu).")

native_args = []

if args_count >= 1: # 从寄存器取参数
native_args.append(mu.reg_read(UC_ARM_REG_R0))

if args_count >= 2:
native_args.append(mu.reg_read(UC_ARM_REG_R1))

if args_count >= 3:
native_args.append(mu.reg_read(UC_ARM_REG_R2))

if args_count >= 4:
native_args.append(mu.reg_read(UC_ARM_REG_R3))

sp = mu.reg_read(UC_ARM_REG_SP) # 参数大于4个,从栈中取参数
sp = sp + STACK_OFFSET # Need to offset by 4 because our hook pushes one register on the stack.

if args_count >= 5:
for x in range(0, args_count - 4):
native_args.append(int.from_bytes(mu.mem_read(sp + (x * 4), 4), byteorder='little'))

if len(argv) == 1:
result = func(mu, *native_args)
else:
result = func(argv[0], mu, *native_args)

if result is not None:
native_write_arg_register(emu, UC_ARM_REG_R0, result)
else:
mu.reg_write(UC_ARM_REG_R0, JNI_ERR)
return native_method_wrapper

小结


这篇文章讲了如何实现 Unicorn 内部的函数级Hook,原理是在模块加载重定位阶段,填充stub函数地址到目标函数的 got表。stub函数是Unicorn 内部环境与外部环境的一个桥梁,使用小巧的IT AL和R4寄存器实现交互,像极了系统调用。使用Python的装饰器,简化了Python 编写Hook 函数的难度。



Unicorn 调用SO之系统调用


系统调用是操作系统给应用程序提供的最底层的基础接口。应用程序读写文件、访问网络等操作都需要操作系统支持。
 
Unicorn 拦截系统调用只需要添加UC_HOOK_INTR的callback,该callback的参数定义如下:


typedef void (*uc_cb_hookintr_t)(uc_engine *uc, uint32_t intno, void *user_data);


  • intno: 中断号

  • user_data: user data


我们只需要用hook_add 添加一个 UC_HOOK_INTR 的callback 就能够处理中断了。根据intno 中断号分发不同的中断处理函数。


实现getpid


self._syscall_handler.set_handler(0xe0, "getpid", 0, self._getPid)
def _getpid(self, mu):
return 0x1122

实现文件系统需要拦截的syscall


我们将在文件系统章节详细讨论如何实现文件系统。


syscall_handler.set_handler(0x3, "read", 3, self._handle_read)
syscall_handler.set_handler(0x4, "write", 3, self._handle_write)
syscall_handler.set_handler(0x5, "open", 3, self._handle_open)
syscall_handler.set_handler(0x6, "close", 1, self._handle_close)
syscall_handler.set_handler(0x92, "writev", 3, self._handle_writev)
syscall_handler.set_handler(0xC5, "fstat64", 2, self._handle_fstat64)
syscall_handler.set_handler(0x142, "openat", 4, self._handle_openat)
syscall_handler.set_handler(0x147, "fstatat64", 4, self._handle_fstatat64)
syscall_handler.set_handler(0x14e, "faccessat", 4, self._faccessat)
syscall_handler.set_handler(0x14d, "fchmodat", 4, self._fchmodat)
syscall_handler.set_handler(0x8c, "_llseek", 5, self._llseek)
syscall_handler.set_handler(0x13, "lseek", 3, self._lseek)


实现内存管理的syscall


self._syscall_handler.set_handler(0x5B, "munmap", 2, self._handle_munmap)
self._syscall_handler.set_handler(0x7D, "mprotect", 3, self._handle_mprotect)
self._syscall_handler.set_handler(0xC0, "mmap2", 6, self._handle_mmap2)
self._syscall_handler.set_handler(0xDC, "madvise", 3, self._handle_madvise)


其它杂项syscall


这些实现都非常的简单,就不过多展开了。


self._syscall_handler.set_handler(0x4E, "gettimeofday", 2, self._handle_gettimeofday)
self._syscall_handler.set_handler(0xAC, "prctl", 5, self._handle_prctl)
self._syscall_handler.set_handler(0xF0, "futex", 6, self._handle_futex)
self._syscall_handler.set_handler(0x107, "clock_gettime", 2, self._handle_clock_gettime)
self._syscall_handler.set_handler(0x119, "socket", 3, self._socket)
self._syscall_handler.set_handler(0x11b, "connect", 3, self._connect)
self._syscall_handler.set_handler(0x159, "getcpu", 3, self._getcpu)
self._syscall_handler.set_handler(0x14e, "faccessat", 4, self._faccessat)
self._syscall_handler.set_handler(0x14, "getpid", 0, self._getpid)
self._syscall_handler.set_handler(0xe0, "gettid", 0, self._gettid)
self._syscall_handler.set_handler(0x180, "null1", 0, self._null)
self._syscall_handler.set_handler(0x7e, "sigprocmask", 0, self._null)
self._syscall_handler.set_handler(0xaf, "rt_sigprocmask", 0, self._null)
self._syscall_handler.set_handler(0x10c, "sigaction", 0, self._tgkill)
self._syscall_handler.set_handler(0x43, "sigaction", 0, self._sigaction)
self._syscall_handler.set_handler(0xf8, "exit", 0, self._null)
self._syscall_handler.set_handler(0x16e, "accept4", 0, self.accept4)



Unicorn 调用SO之文件系统


如果想尽可能完美的模拟Android Native程序,那就必须加入虚拟文件系统的支持。虚拟文件系统可以将Unicorn 虚拟机内部访问文件的操作映射到主机。



>>>>

思路


Hook 拦截文件操作相关的系统调用,转换成对主机的文件操作。
 
为了安全性,我们要在主机上划分一个用于专门虚拟文件系统的目录。 处理系统调用的时候,将路径转换成虚拟文件系统目录的路径。



>>>>

实现


需要处理的syscall已经在上一篇文章介绍过了。


syscall_handler.set_handler(0x3, "read", 3, self._handle_read)
syscall_handler.set_handler(0x4, "write", 3, self._handle_write)
syscall_handler.set_handler(0x5, "open", 3, self._handle_open)
syscall_handler.set_handler(0x6, "close", 1, self._handle_close)
syscall_handler.set_handler(0x92, "writev", 3, self._handle_writev)
syscall_handler.set_handler(0xC5, "fstat64", 2, self._handle_fstat64)
syscall_handler.set_handler(0x142, "openat", 4, self._handle_openat)
syscall_handler.set_handler(0x147, "fstatat64", 4, self._handle_fstatat64)
syscall_handler.set_handler(0x14e, "faccessat", 4, self._faccessat)
syscall_handler.set_handler(0x14d, "fchmodat", 4, self._fchmodat)
syscall_handler.set_handler(0x8c, "_llseek", 5, self._llseek)
syscall_handler.set_handler(0x13, "lseek", 3, self._lseek)


_handle_open 处理 open syscall


def _handle_open(self, mu, filename_ptr, flags, mode):
"""
int open(const char *pathname, int flags, mode_t mode);
return the new file descriptor, or -1 if an error occurred (in which case, errno is set appropriately).
"""

filename = memory_helpers.read_utf8(mu, filename_ptr)
return self._open_file(filename, flags, mode)


进一步分析_open_file


这个函数先对路径进行一些简单检验,判断是否为特殊文件,如果不是,则调用translate_path转换安全路径,并调用os.open打开本地文件,最后调用_store_fd转换文件句柄。


def _open_file(self, filename, flags, mode):
# Special cases, such as /dev/urandom.
orig_filename = filename

if filename == '/dev/urandom':
logger.info("File opened '%s'" % filename)
return self._store_fd('/dev/urandom', None, 'urandom')

file_path = self.translate_path(filename)

if os.path.isfile(file_path):
logger.info("File opened '%s'" % orig_filename)
flags = os.O_RDWR
if hasattr(os, "O_BINARY"):
flags = os.O_BINARY
return self._store_fd(orig_filename, file_path, os.open(file_path, flags=flags))
else:
logger.warning("File does not exist '%s'" % orig_filename)
return -1


读文件分析


def _handle_read(self, mu, fd, buf_addr, count):
"""
ssize_t read(int fd, void *buf, size_t count);
On files that support seeking, the read operation commences at the current file offset, and the file offset
is incremented by the number of bytes read. If the current file offset is at or past the end of file,
no bytes are read, and read() returns zero.
If count is zero, read() may detect the errors described below. In the absence of any errors, or if read()
does not check for errors, a read() with a count of 0 returns zero and has no other effects.
If count is greater than SSIZE_MAX, the result is unspecified.
"""

if fd <= 2:
raise NotImplementedError("Unsupported read operation for file descriptor %d." % fd)

if fd not in self._file_descriptors:
# TODO: Return valid error.
raise NotImplementedError()
file = self._file_descriptors[fd]
if file.descriptor == 'urandom':
if OVERRIDE_URANDOM:
buf = OVERRIDE_URANDOM_BYTE * count
else:
buf = os.urandom(count)
else:
buf = os.read(file.descriptor, count)
result = len(buf)
mu.mem_write(buf_addr, buf)
return result



Unicorn 调用SO之实现 JNI Functions


调用JNI是工程量最大的一部分,不仅需要实现JNI Functions,还要模拟JNI Env和Java类似的引用管理。


在前面的文章中已经学习了如何在虚拟机中调用主机的Python函数,接下来就学习如何实现JNI Functions里面的所有函数。



>>>>

模拟实现 Jni Function Table


可以参考 Jni Functions


Jni Function Table 是一个函数地址表,里面记录了Jni 函数地址。


(self.address_ptr, self.address) = hooker.write_function_table({
4: self.get_version,
5: self.define_class,
6: self.find_class,
7: self.from_reflected_method,
8: self.from_reflected_field,
9: self.to_reflected_method,
10: self.get_superclass,
11: self.is_assignable_from,
12: self.to_reflected_field,
13: self.throw,
14: self.throw_new,
15: self.exception_occurred,
16: self.exception_describe,
17: self.exception_clear,
18: self.fatal_error,
19: self.push_local_frame,
20: self.pop_local_frame,
21: self.new_global_ref,
22: self.delete_global_ref,
.......................
232: self.get_object_ref_type
})


write_function_table 函数实现了创建一个Function 地址表。


实现如下:


def write_function_table(self, table):
if not isinstance(table, dict):
raise ValueError("Expected a dictionary for the function table.")

index_max = int(max(table, key=int)) + 1

# First, we write every function and store its result address.
hook_map = dict()

for index, func in table.items():
hook_map[index] = self.write_function(func)

# Then we write the function table.
table_bytes = b""
table_address = self._hook_current

for index in range(0, index_max):
address = hook_map[index] if index in hook_map else 0
table_bytes += int(address + 1).to_bytes(4, byteorder='little') # + 1 because THUMB.

self._emu.mu.mem_write(table_address, table_bytes)
self._hook_current += len(table_bytes)

# Then we write the a pointer to the table.
ptr_address = self._hook_current
self._emu.mu.mem_write(ptr_address, table_address.to_bytes(4, byteorder='little'))
self._hook_current += 4

return ptr_address, table_address


Jni Functions Table 中有 200 多个函数,全部实现的话工作量十分巨大。默认实现如下:


@native_method
def call_boolean_method_a(self, mu, env):
raise NotImplementedError()


抛出异常可以保证当Native调用一个没有实现的JNI函数时能够及时发现,并实现它。


Classloader


AndroidNativeEmu 支持使用Python类来代替Jvm中Java类,这是如何实现的呢?


class android_content_context(metaclass=JavaClassDef, jvm_name="android/content/Context",
jvm_fields=[JavaFieldDef("CONNECTIVITY_SERVICE", "Ljava/lang/String;", True, "")]):

def __init__(self):
pass

@java_method_def(native=False, name="getPackageName", signature="()Ljava/lang/String;")
def getPackageName(self, *args, **kwargs):
return "com.test"

emulator.java_classloader.add_class(android_content_context) # load class


jvm_super 指定父类


jvm_name 定义对应的类名


jvm_fields 定义字段,是一个列表,每一项都是由JavaFieldDef定义的字段,JavaFieldDef(name, signature, is_static, static_value=None)

如果不是static字段,则还需要在init 中创建这个这个私有的成员变量。

metaclass=JavaClassDef 是Python的元类机制,参考廖雪峰老师文章
元类可真的是魔法!它可以动态修改类的定义,比如给类增加成员变量,增加方法等。


JavaClassDef 将class 定义指定的jvm_name jvm_fields保存到成员变量,并添加了find_method、find_method_by_id、find_field等函数,用于实现JNI。
 
注册方法


# Register all defined Java methods.
for func in inspect.getmembers(cls, predicate=inspect.isfunction):
if hasattr(func[1], 'jvm_method'):
method
= func[1].jvm_method
method.jvm_id = next(JavaClassDef.next_jvm_method_id)
cls.jvm_methods[method.jvm_id] = method


注册字段


# Register all defined Java fields.
if jvm_fields is not None:
for jvm_field in jvm_fields:
jvm_field.jvm_id = next(JavaClassDef.next_jvm_field_id)
cls.jvm_fields[jvm_field.jvm_id] = jvm_field


查找字段的函数


支持类继承,Java是单继承,查找的时候先从基类开始递归查找。


def find_field_by_id(cls, jvm_id):
try:
if cls.jvm_super is not None:
find = cls.jvm_super.find_field_by_id(jvm_id)
if find is not None:
return find
except KeyError:
pass

return cls.jvm_fields[jvm_id]


metaclass 修饰了定义的java类,隐藏了类解析背后的细节,使得添加一个java类很方便。java类定义后,还需要添加模拟器的class管理。


def add_class(self, clazz):
if not isinstance(clazz, JavaClassDef):
raise ValueError('Expected a JavaClassDef.')

if clazz.jvm_name in self.class_by_name:
raise KeyError('The class \'%s\' is already registered.' % clazz.jvm_name)

self.class_by_id[clazz.jvm_id] = clazz
self.class_by_name[clazz.jvm_name] = clazz


保存的是类的定义而不是实例。



>>>>

引用管理


目前实现两种类型的引用,一种jobject和jclass。jobejct是用来引用实例对象,jclass用来引用类。


Python 实现java的类,如果返回一个String,那么就会自动创建一个String引用,然后把引用id返回给Native 函数。Native 函数再调用GetStringUtfChars获取引用的字符串。 GetStringUtfChars 的实现如下。


@native_method
def get_string_utf_chars(self, mu, env, string, is_copy_ptr):
logger.debug("JNIEnv->GetStringUtfChars(%u, %x) was called" % (string, is_copy_ptr))

if is_copy_ptr != 0:
raise NotImplementedError()

str_ref = self.get_reference(string) #通过引用ID获取引用对象
str_val = str_ref.value # 获取该引用的值
str_ptr = self._emu.native_memory.allocate(len(str_val) + 1)

logger.debug("=> %s" % str_val)

memory_helpers.write_utf8(mu, str_ptr, str_val)
return str_ptr


引用还分为局部引用和全局引用。局部引用的生命周期是进入native 函数到native 函数返回。 全局引用则作用于模拟器整个生命周期。


self._locals = ReferenceTable(start=1, max_entries=2048)
self._globals = ReferenceTable(start=4096, max_entries=512000)



>>>>

实现 GetEnv


class JavaVM:
"""
:type class_loader JavaClassLoader
:type hooker Hooker
"
""
def __init__(self, emu, class_loader, hooker):
(self.address_ptr, self.address) = hooker.write_function_table({
3: self.destroy_java_vm,
4: self.attach_current_thread,
5: self.detach_current_thread,
6: self.get_env,
7: self.attach_current_thread
})

self.jni_env = JNIEnv(emu, class_loader, hooker)

@native_method
def get_env(self, mu, java_vm, env, version):
logger.debug("java_vm: 0x%08x" % java_vm)
logger.debug("env: 0x%08x" % env)
logger.debug("version: 0x%08x" % version)

mu.mem_write(env, self.jni_env.address_ptr.to_bytes(4, byteorder='little'))

logger.debug("JavaVM->GetENV() was called!")

return JNI_OK



>>>>

Native / Java 函数调用


我们目前已经实现了Java 调用native 的过程,native再回调java是否可以实现呢?实际上,在class定义的时候就解析了所有带@java_method_def 装饰器的成员函数。native通过JNI调用java函数,我们可以模拟对应的JNI函数,使其在所有加载的类中查找是否有对应签名的函数,如果有则直接调用。
 
@java_method_def 装饰器不仅描述了函数的签名,还转换函数的参数数据和返回值信息。
 
这个修饰器的定义如下:


def java_method_def(name, signature, native=False, args_list=None, modifier=None, ignore=False):
def java_method_def_real(func):
def native_wrapper(self, emulator, *argv): # native 方法,进入虚拟机
return emulator.call_native(
native_wrapper.jvm_method.native_addr,
emulator.java_vm.jni_env.address_ptr, # JNIEnv*
self, # this
# method has been declared in
*argv # Extra args.
)

def normal_wrapper(*args, **kwargs): # 普通方法,直接调用
result = func(*args, **kwargs)
return result

wrapper = native_wrapper if native else normal_wrapper # 判断是否没native函数,分别进入不同的wrapper
wrapper.jvm_method = JavaMethodDef(func.__name__, wrapper, name, signature, native,
args_list=args_list,
modifier=modifier,
ignore=ignore)
return wrapper

return java_method_def_real



>>>>

参数转换


Java 调用 Native 的时候要把非数字类型转换为对象引用。


Native 调用 Java 或者 返回的时候要把引用转换成对象,转换方法实现如下:


def native_write_args(emu, *argv):
amount = len(argv)

if amount == 0:
return

if amount >= 1:
native_write_arg_register(emu, UC_ARM_REG_R0, argv[0])

if amount >= 2:
native_write_arg_register(emu, UC_ARM_REG_R1, argv[1])

if amount >= 3:
native_write_arg_register(emu, UC_ARM_REG_R2, argv[2])

if amount >= 4:
native_write_arg_register(emu, UC_ARM_REG_R3, argv[3])

if amount >= 5:
sp_start = emu.mu.reg_read(UC_ARM_REG_SP)
sp_current = sp_start - STACK_OFFSET # Need to offset because our hook pushes one register on the stack.

for arg in argv[4:]:
emu.mu.mem_write(sp_current - STACK_OFFSET, native_translate_arg(emu, arg).to_bytes(4, byteorder='little'))
sp_current = sp_current - 4

emu.mu.reg_write(UC_ARM_REG_SP, sp_current)


def native_translate_arg(emu, val):
if isinstance(val, int):
return val
elif isinstance(val, str):
return emu.java_vm.jni_env.add_local_reference(jstring(val))
elif isinstance(val, list):
return emu.java_vm.jni_env.add_local_reference(jobjectArray(val))
elif isinstance(val, bytearray):
return emu.java_vm.jni_env.add_local_reference(jbyteArray(val))
elif isinstance(type(val), JavaClassDef):
# TODO: Look into this, seems wrong..
return emu.java_vm.jni_env.add_local_reference(jobject(val))
elif isinstance(val, JavaClassDef):
return emu.java_vm.jni_env.add_local_reference(jobject(val))
else:
raise NotImplementedError("Unable to write response '%s' type '%s' to emulator." % (str(val), type(val)))


def native_write_arg_register(emu, reg, val):
emu.mu.reg_write(reg, native_translate_arg(emu, val))

## 返回值转换
result_idx = self.mu.reg_read(UC_ARM_REG_R0)
result = self.java_vm.jni_env.get_local_reference(result_idx)



附录


Unicorn 优秀项目:http://www.unicorn-engine.org/showcase/





- End -





看雪ID无名侠

https://bbs.pediy.com/user-617255.htm


*本文由看雪论坛 无名侠原创,转载请注明来自看雪社区






推荐文章++++

Unicorn 在 Android 的应用之Hello World

App免Root加载Xposed插件工具Xpatch源码解析(二)

彩蛋解密之物理内存读写到****的转变

Win10_64 默认应用的UserChoice Hash算法学习





进阶安全圈,不得不读的一本书










“阅读原文”一起来充电吧!

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存