..

IMF: Inferred Model-based Fuzzer

Introduction

IMF 是SoftSec-KAIST 2017 年发表在 CCS 上的研究成果,同 Winnie 类似的也是基于动态 trace 自动生成 harness 的工具,不过 IMF 的目标是闭源操作系统 API 模型,实现上以 MacOS 为例。

Methodology

arch

Logger

  1. 首先,我们需要考虑我们应该从记录器记录多少数据。我们仅存储到一个级别的间接指针(按道理存储更多级别的指针更有助于恢复依赖)当调用挂钩的API调用时,日志记录器检查参数的类型,并决定记录哪些值。
    • 数值指针存储指针地址和指向的数值
    • 字符串指针存储指针地址和指向的字符串
  2. 其次,我们应该为GUI应用程序生成输入,以便记录API调用。我们使用 PyUserInput [3] 为正在运行的每个程序构造鼠标和键盘事件。

    这一点工具中并未体现,但也不是本文工作重心

Inferrer

  1. Log Filtering
    1. inferrer 的过滤功能从给定的整个日志集中选择N(N=2)个具有最长公共前缀的日志。
    2. 然后,它收集每个选定日志的前缀部分,以便构造一组具有完全相同顺序和相同数量的 API 调用的调用序列 S。
  2. API Model Inference
    1. 首先,S 中的调用序列已经显示了API 函数调用之间的顺序依赖性。我们可以通过考虑API 调用的顺序应遵循与 S 中的序列之一完全相同的顺序来过度近似顺序依赖关系。
    2. 其次,我们需要计算数值的依赖性。
      1. 首先识别常数参数:在不同的序列中都表现出同样数值
      2. 接下来,我们考虑从输出参数到输入参数的近似数据流:对于某个系统调用的给定参数数值,我们检查是否有任何具有相同输出值的先验函数调用。当输入参数依赖于多个输出参数时,我们采用最近调用的函数中使用的参数。

        上述的方法可能会导致错误的数值依赖,Inferrer 采用两个技术缓解:

        1. 它排除数据流到一个恒定的输入参数
        2. 我们得出N个调用序列中每个序列的值依赖性集,并计算它们的交集。
      3. 最后,对于那些没有任何依赖关系的非恒定输入变量,我们简单地使用日志中出现的相同具体值。由于有N个可能的值可以使用,因此我们随机取其中一个值。

Fuzzer

  • Mutation Strategy
    • Sequence Replication:迭代更多次系统调用序列
    • ParameterMutation:IMF执行基于类型的参数突变。它用对突变函数的调用替换AST中的每个参数。
  • Data Collection

    IMF 在发现错误时可能会触发内核恐慌或系统挂起。因此,它必须在执行程序之前将PRNG 种子的当前值保存到永久存储中。(琐碎的细节)

Implementation

gen-hook (src/hook.py)

对应用的 API 调用进行跟踪,使用的是 MacOS 特有的基于 DYLD_INSERT_LIBRARIES 来 Hook 调用的方式(笔者对 MacOS 不熟悉)。好像是在 __DATA 段加上一个表 __interpose 包含原始的函数名和新的函数名,编译生成动态链接库,并在运行程序前使用 DYLD_INSERT_LIBRARIES 指定,就可以拦截对函数的调用。

IMF 依赖于 src/const.py 中手动对所有需要跟踪的 API 编写描述,包括返回类型、函数名、参数名及类型、参数用作输入输出的信息等。hook.py 中会根据 API Defs 来生成一个替换函数,会在调用原函数前记录所调用的函数名,以及所有输入参数的具体数值,并在对原始函数的调用之后记录返回值和所有输出参数的具体数值。

# API Defs
[('kern_return_t', 'IOConnectSetCFProperty'), [('io_connect_t', 'connect', {}), ('CFStringRef', 'propertyName', {}), ('CFTypeRef', 'property', {})]],
[('kern_return_t', 'IOConnectCallMethod'), [('mach_port_t', 'connection', {}), ('uint32_t', 'selector', {}), ('const uint64_t *', 'input', {'cnt': 'inputCnt', 'IO': 'I', 'size': 8}), ('uint32_t', 'inputCnt', {}), ('const void *', 'inputStruct', {'cnt': 'inputStructCnt', 'IO': 'I', 'size': 1}), ('size_t', 'inputStructCnt', {}), ('uint64_t *', 'output', {'cnt': '*outputCnt', 'IO': 'O', 'size': 8}), ('uint32_t *', 'outputCnt', {'cnt': 1, 'IO': 'IO', 'size': 4,}), ('void *', 'outputStruct', {'cnt': '*outputStructCnt', 'IO': 'O', 'size': 1}), ('size_t *', 'outputStructCnt', {'cnt': 1, 'IO': 'IO', 'size': 4,})]],

# fake API wrapper
kern_return_t fake_IOConnectSetCFProperty(io_connect_t connect,CFStringRef propertyName,CFTypeRef property){
	FILE *fp = fopen(log_path,"a");
	flock(fileno(fp),LOCK_EX);
	fprintf(fp,"IN ['IOConnectSetCFProperty',");
	if(1) fprintf(fp,"{'name':'connect','value': 0x%x,'size' : 0x%lx,'cnt':0x%x, 'data':[",connect, sizeof(io_connect_t),1);
 	else fprintf(fp,"{'name':'connect','value': 0x%x, 'size' : 0x%lx,'cnt':'undefined', 'data':[",connect,sizeof(io_connect_t));
	fprintf(fp,"]},");
 	if(1) fprintf(fp,"{'name':'propertyName','value': 'CFSTR(\"%s\")','size' : 0x%lx,'cnt':0x%x, 'data':[",CFStringGetCStringPtr(propertyName,kCFStringEncodingMacRoman), sizeof(CFStringRef),1);
 	else fprintf(fp,"{'name':'propertyName','value': 'CFSTR(\"%s\")', 'size' : 0x%lx,'cnt':'undefined', 'data':[",CFStringGetCStringPtr(propertyName,kCFStringEncodingMacRoman),sizeof(CFStringRef));
	fprintf(fp,"]},");
 	if(1) fprintf(fp,"{'name':'property','value': 0x%x,'size' : 0x%lx,'cnt':0x%x, 'data':[",property, sizeof(CFTypeRef),1);
 	else fprintf(fp,"{'name':'property','value': 0x%x, 'size' : 0x%lx,'cnt':'undefined', 'data':[",property,sizeof(CFTypeRef));
	fprintf(fp,"],'ori':");
	log_CFTypeRef(fp,property);
	fprintf(fp,"},");
	fprintf(fp,"]\n");
	kern_return_t ret = IOConnectSetCFProperty(connect,propertyName,property);
	fprintf(fp,"OUT ['IOConnectSetCFProperty',");
	if(1) fprintf(fp,"{'name':'ret','value': 0x%x,'size' : 0x%lx,'cnt':0x%x, 'data':[",ret, sizeof(kern_return_t),1);
 	else fprintf(fp,"{'name':'ret','value': 0x%x, 'size' : 0x%lx,'cnt':'undefined', 'data':[",ret,sizeof(kern_return_t));
	fprintf(fp,"]},");
 	fprintf(fp,"]\n");
	fclose(fp);
	return ret;
}

# API Log
IN ['IOBSDNameMatching',{'name':'masterPort','value': 0x0, 'size' : 0x4,'cnt':0x1, 'data':[]},{'name':'options','value': 0x0, 'size' : 0x4,'cnt':0x1, 'data':[]},{'name':'bsdName','value': '"disk1"', 'size' : 0x4,'cnt':0x1, 'data':[]},]
OUT ['IOBSDNameMatching',{'name':'ret','value': 0x797c6610, 'size' : 0x4,'cnt':0x1, 'data':[]},]

filter-log (src/filter.py)

这一步主要的工作是将所有的 log 文件根据最长公共前缀筛选至 N 个(sample 中 N=2)只包含公共前缀的 log 文件。

核心逻辑就在 find_best 函数里,通过一个 while 循环挨个对 API log 进行处理。groups 是个二维数组,第一维的个数代表当前迭代产生了多少个前缀,第二维代表每个前缀被哪些 log 文件所共有。

  • categorize 用于在增加一个 API,即考虑的前缀长度加一之后对 groups 数组进行更新。算法核心是判断每个 group 内部每个 log 新增的 API 是否相同(get 获得基于 API name 计算的 hash 值),将不同的再拆分成为不同的 group。
  • pick_best 判断是否完成 filter 的逻辑是:当某次迭代之后,最后一个被超过 N 个 log 文件所共有的前缀也被拆成了几个都不超过 N 的集合,那这个前缀就是被”最多” N 个 log 文件所共有的最长公共前缀,则在所有拥有这个前缀的 log 文件中选 N 个文件作为结果。
def categorize(groups, idx):
    ret = []
    for group in groups:
        tmp = {}
        for fn, hvals in group:
            hval = get(hvals, idx)
            if hval not in tmp:
                tmp[hval] = []
            tmp[hval].append((fn, hvals))
        for hval in tmp:
            if hval != None :
                ret.append(tmp[hval])
    return ret

def pick_best(groups, n):
    for group in groups:
        if len(group) >= n:
            return group[:n]
    return None

def find_best(groups, n):
    before = None
    idx = 0
    while len(groups) != 0:
        before = groups
        groups = categorize(groups, idx)
        if pick_best(groups, n) == None:
            return pick_best(before, n), idx
        idx += 1
    utils.error('find_best error')

gen-fuzz (src/model.py)

genfuzz 分为两步,首先 Model 的初始化会对 filtered-log 进行解析生成 AST 并进行数据依赖分析,然后调用 fuzz() 将 AST 输出到代码之中。

def make_model(self, fnames, limit, path, core):
        apisets = utils.multiproc(self.load_apilog_multi(limit), fnames, core)
        model = Model(apisets)
        with open(path, 'wb') as f:
            code = model.fuzz(const.CODE_HEAD, const.CODE_TAIL)
            f.write(code)

Model 中的数据依赖分析主要就包含两个,常量分析 check_const 和数据流分析。

class Model:
    def __init__(self, apisets):
        self.mapis = []
        for idx in range(len(apisets[0])):
            apilog = apisets[0][idx]
            self.mapis.append(Mapi(apilog, idx))
        self.check_const(apisets)
        self.add_dataflow(apisets)
  • check_const 的逻辑很简单,就是看在不同 log 之中,对于同一个 API 调用的某个参数的数值是否发生变化。
  • add_dataflow 也不复杂

    before 数组存储了所有已经遍历过的 API 的返回数值以及输出数据信息,每次遍历一个 API 就会调用 update_before 更新 before 数组。

      def update_before(before, apilog, mapi, n):
          # 获得输出数据指针参数
          dic = apilog.get('ol')
          # 获得返回数值
          if not apilog.is_void():
              dic[const.RVAL] = apilog.get('rval_log')
          for name, arglog in dic.iteritems():
              value = arglog.get_log('value')
              key = value
              add_before(before, key, mapi, n, (name,VALUE,0))
        
              if arglog.is_ptr() and value !=0:
                  # 如果数组指针,则把数组中的每一个数组都更新到 before 中
                  data = arglog.get_log('data')
                  for idx in range(len(data)):
                      add_before(before, data[idx], mapi, n, (name,DATA,idx))
        
      #key = apilog,n ; pos = arg,DATA,n
      def add_before(before, value, mapi, n, pos):
          key = mapi,n
          if not value in before:
              before[value] = {}
          if not key in before[value]:
              before[value][key] = {}
          before[value][key][pos] = True
        
      Class Model:
          def add_dataflow(self, apisets):
              for apiset in apisets:
                  before = {}
                  for idx in range(len(apiset)):
                      apilog = apiset[idx]
                      mapi =self.mapis[idx]
                      mapi.add_dataflow(before, apilog)
                      update_before(before, apilog, mapi, idx)
    

    add_dataflow 最终会进到 get_df 或 get_inter_df 中。如果是对第一个 log 进行分析,则进入 get_df,按照 value 在 before 数组中查找所有的依赖关系(因为之后会对不同 log 文件的依赖取交集所以这里需要所有的关系)。如果是在已经分析过一次依赖关系之后,就会进入到 get_inter_df,会对两个 dataflow 取交集。

      def get_df(before, value):
          ret = {}
          if value in before:
              # 如果这个数值在之前的返回数据中出现过,则把所有的位置都作为依赖返回
              df = before[value]
              for key1 in df:
                  ret[key1] = {}
                  for key2 in df[key1]:
                      ret[key1][key2] = df[key1][key2]
          return ret
        
      def get_inter_df(before, value, df):
          ret = {}
          if value in before:
              for key1 in df:
                  if key1 in before[value]:
                      # 必须得是 df(dataflow_other) 里出现过,取交集
                      for key2 in df[key1]:
                          if key2 in before[value][key1]:
                              if not key1 in ret :
                                  ret[key1] = {}
                              ret[key1][key2] = True
          return ret
        
      Class Mapi:
          def add_dataflow(self, before, apilog)
              ilog = apilog.get('il')
              # self.il 代表输入参数的 log
              for name in self.il:
                  self.il[name].add_dataflow(before, ilog[name])
        
      Class Marg:
          def add_dataflow(self, before, arglog):
              # 如果是数组,则对于每个数组的数值进行一次分析
              if self.is_array():
                  cnt = arglog.get_log('cnt')
                  assert(cnt == len(self.data))
                  values = arglog.get_log('data')
                  for i in range(cnt):
                      self.data[i].add_dataflow(before, values[i])
              value = arglog.get_log('value')
              self.value.add_dataflow(before, value)
        
      class Mval:
          def add_dataflow(self, before, value):
              self.raw.append(value)
              if self.is_const() == False:
                  # 不对常数做数据流分析
                  if self.dataflow == None:
                      self.dataflow = get_df(before,value)
                  else: # 说明不是第一个 log 文件,两个 dataflow 取交集
                      self.dataflow = get_inter_df(before,value,self.dataflow)
    

每个 API 都有一个自己的序号,最后生成的 fuzz 的时候在依赖关系中找到 API idx 和对应的变量名拼起来就可以引用了。

mach_port_t masterPort_0 = (mach_port_t) mut_int(0);
uint32_t options_0 = (uint32_t) mut_int(0);
char * bsdName_0 = (char *) mut_ptr("disk1");
CFMutableDictionaryRef ret_0 = IOBSDNameMatching(masterPort_0, options_0, bsdName_0);
mach_port_t masterPort_1 = (mach_port_t) mut_int(0);
CFDictionaryRef matching_1 = (CFDictionaryRef) mut_ptr(ret_0);

Conclusion

毕竟是比较久远的文章,从方法上来讲比较简略。

  • 最长公共前缀的方法感觉比较脆弱,没有强有力的证据说明 API 模型不会间插使用。
  • 数据依赖的分析基于直接的数值分析也很不精准。
  • API 描述依赖于昂贵的人工成本,可扩展性不强。
  • 对参数的变异也没有考虑嵌套的结构化信息,对 API 顺序方面不提供变异策略。