Ocean Lotus Group,也被称之为APT32,这个黑客组织此前主要的攻击目标以越南、老挝和菲律宾等东亚国家为主,虽然私营企业是该组织的主要目标,但外国政府、政治活动家和新闻记者也是他们的攻击目标之一。
APT32的攻击工具非常多样化,从Mimikatz和Cobalt Strike这样的高级定制工具,到ShellCode以及后门等等,应有尽有。而且他们所使用的很多代码都经过了高度模糊处理或混淆处理,并使用了不同的技术来提升检测和分析的难度,导致研究人员更加难以对它们进行逆向分析。
在这篇文章中,我们将介绍该组织所使用的其中一种代码混淆技术,而这种技术也被APT32广泛应用到了他们的后门代码中。反混淆处理的过程中需要使用到Cutter以及官方开源逆向工程框架-radare2,还请各位同学自行搜索下载。
下载和安装Cutter
Cutter目前支持Linux、macOS和Windows。
Cutter下载地址:【点我下载】
Cutter基础教程:【点我获取】
后门分析
我们的样本(486be6b1ec73d98fdd3999abe2fa04368933a2ec)是多级感染链中的一部分,而且APT32在多个活动中都使用到了这个后门,例如恶意文件样本(115f3cb5bdfb2ffe5168ecb36b9aed54)。这个文档声称自己来自于360,但是其中包含了一个恶意VBA宏,这个恶意宏会向rundll32.exe注入恶意Shellcode。Shellcode中包含了解密代码,可以直接对恶意代码进行解密并将相应的DLL加载进内存,而DLL包含的就是后门逻辑。
首先,后门会解密一个配置文件,其中存储的信息包含C2服务器基础信息在内。接下来,代码会尝试使用自定义PE加载器向内存中加载恶意DLL。这个DLL会被HTTPProv.dll调用,并能够与C2服务器通信。后门还可以从C2服务器接收十几种不同的指令,包括Shellcode执行、新进程创建以及文件和目录修改等操作。
该组织所使用的很多混淆技术其目的就是要增加逆向分析的难度,而且其二进制代码中使用了大量的垃圾代码,这些垃圾代码会增加样本的体积和复杂性,以分散研究人员的注意力。而且,其中的代码集经常会与堆栈指针一起使用,而普通的反编译工具无法对这种情况进行有效处理。
混淆技术
APT32在进行代码混淆处理时,大量使用了控制流混淆,并且向函数流中注入了大量垃圾代码块。这些垃圾代码块不会实现任何功能,只是为了混淆视听而已。
大家可以从上图中看到,其中包含了大量垃圾代码块。仔细分析后我们会发现,所有需要跳转到这些代码段的条件判断结果都为False,而且都是以条件跳转结束的,跟之前的条件判断正好相反。比如说,垃圾代码段之前的条件判断为jo <some_addr>,那么垃圾代码很有可能以jno<some_addr>结束。如果之前的代码段以jne <another_addr>结束,那么垃圾代码段就会以je <another_addr>结束。
这样一来,我们就可以对这些垃圾代码段定性了。第一种特性:出现两个连续的垃圾代码块,以相反的条件跳转到相同的目标地址并结束。第二种特性:要求第二个块不包含有意义的指令,如字符串引用或代码调用等等。
当满足这两个特征时,我们可以说第二个块很可能是垃圾代码块。这样,我们就可以将垃圾块从图表中删除了,并使用无条件跳转来修补源代码。
编写核心类
首先,我们要创建一个Python类作为我们的核心类,这个类需要包含查找和移除垃圾代码块的逻辑。先定义__init__函数,该函数可以接收管道消息,可以是来自redare2的r2pipe对象(importr2pipe),也可以是来自Cutter的cutter对象(import cutter)。
class GraphDeobfuscator:
def __init__(self, pipe):
"""an initializationfunction for the class
Arguments:
pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper
"""
self.pipe = pipe
现在我们就可以使用这个管道来执行radare2命令了。这个管道对象包含两种执行r2命令的方式。第一种为pipe.cmd(<command>),它能够以字符串的形式返回命令执行结果。第二种为pipe.cmdj(<command>j),它你能够根据radare2命令的输出结果返回解析后的JSON对象。
接下来就是从当前函数中获取所有的代码块,然后进行迭代。这里可以使用afbj米工龄来获取函数中所有代码块的JSON对象。
def clean_junk_blocks(self):
"""Search a givenfunction for junk blocks, remove them and fix the flow.
"""
# Get all the basic blocks of thefunction
blocks = self.pipe.cmdj("afbj @$F")
if not blocks:
print("[X] No blocks found. Is it afunction?")
return
modified = False
# Iterate over all the basic blocks ofthe function
for block in blocks:
# do something
针对每一个块,根据之前的判断条件进行分析,获取候选垃圾代码块:
def get_fail_block(self, block):
"""Return the block towhich a block branches if the condition is fails
Arguments:
block {block_context} -- A JSONrepresentation of a block
Returns:
block_context -- The block to whichthe branch fails. If not exists, returns None
"""
# Get the address of the"fail" branch
fail_addr = self.get_fail(block)
if not fail_addr:
return None
# Get a block context of the failaddress
fail_block = self.get_block(fail_addr)
return fail_block if fail_block elseNone
def is_successive_fail(self, block_A,block_B):
"""Check if the endaddress of block_A is the start of block_B
Arguments:
block_A {block_context} -- A JSONobject to represent the first block
block_B {block_context} -- A JSONobject to represent the second block
Returns:
bool -- True if block_B comes immediatelyafter block_A, False otherwise
"""
return ((block_A["addr"] +block_A["size"]) == block_B["addr"])
接下来,我们要判断候选垃圾代码段是否包含无效指令:
def contains_meaningful_instructions (self,block):
'''Check if a block contains meaningfulinstructions (references, calls, strings,...)
Arguments:
block {block_context} -- A JSONobject which represents a block
Returns:
bool -- True if the block containsmeaningful instructions, False otherwise
'''
# Get summary of block - strings, calls,references
summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"]))
return summary != ""
最后,枚举出所有对立的跳转条件:
jmp_pairs = [
['jno', 'jo'],
['jnp', 'jp'],
['jb', 'jnb'],
['jl', 'jnl'],
['je', 'jne'],
['jns', 'js'],
['jnz', 'jz'],
['jc', 'jnc'],
['ja', 'jbe'],
['jae', 'jb'],
['je', 'jnz'],
['jg', 'jle'],
['jge', 'jl'],
['jpe', 'jpo'],
['jne', 'jz']]
def is_opposite_conditional(self, cond_A,cond_B):
"""Check if two operandsare opposite conditional jump operands
Arguments:
cond_A {string} -- the conditionaljump operand of the first block
cond_B {string} -- the conditionaljump operand of the second block
Returns:
bool -- True if the operands areopposite, False otherwise
"""
sorted_pair = sorted([cond_A, cond_B])
for pair in self.jmp_pairs:
if sorted_pair == pair:
return True
return False
将上述所有代码整合到clean_junk_blocks()函数中:
def clean_junk_blocks(self):
"""Search a givenfunction for junk blocks, remove them and fix the flow.
"""
# Get all the basic blocks of thefunction
blocks = self.pipe.cmdj("afbj @$F")
if not blocks:
print("[X] No blocks found. Isit a function?")
return
modified = False
# Iterate over all the basic blocks ofthe function
for block in blocks:
fail_block =self.get_fail_block(block)
if not fail_block or \
not self.is_successive_fail(block,fail_block) or \
self.contains_meaningful_instructions(fail_block) or \
notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)):
continue
使用Radare2
if__name__ == "__main__":
graph_deobfuscator = GraphDeobfuscator(pipe)
graph_deobfuscator.clean_graph()
使用Cutter
ifcutter_available:
# This part will be executed only if Cutteris available.
# This will create the cutter plugin and UIobjects for the plugin
classGraphDeobfuscatorCutter(cutter.CutterPlugin):
name = "APT32 GraphDeobfuscator"
description = "Graph Deobfuscatorfor APT32 Samples"
version = "1.0"
author = "Itay Cohen(@Megabeets_)"
def setupPlugin(self):
pass
def setupInterface(self, main):
pass
def create_cutter_plugin():
return GraphDeobfuscatorCutter()
为了保证插件正常运行,我们还需要增加一个菜单入口来触发反混淆功能:
ifcutter_available:
# This part will be executed only if Cutteris available. This will
# create the cutter plugin and UI objectsfor the plugin
classGraphDeobfuscatorCutter(cutter.CutterPlugin):
name = "APT32 GraphDeobfuscator"
description = "Graph Deobfuscatorfor APT32 Samples"
version = "1.0"
author = "Megabeets"
def setupPlugin(self):
pass
def setupInterface(self, main):
# Create a new action (menu item)
action = QAction("APT32 GraphDeobfuscator", main)
action.setCheckable(False)
# Connect the action to a function - cleaner.
# A click on this action willtrigger the function
action.triggered.connect(self.cleaner)
# Add the action to the"Windows -> Plugins" menu
pluginsMenu =main.getMenuByType(main.MenuType.Plugins)
pluginsMenu.addAction(action)
def cleaner(self):
graph_deobfuscator =GraphDeobfuscator(pipe)
graph_deobfuscator.clean_graph()
cutter.refresh()
def create_cutter_plugin():
return GraphDeobfuscatorCutter()
接下来,我们就可以看到图形化的分析结果了:
移除垃圾代码段之后的结果图如下所示:
对比图如下:
样本SHA256值
Be6d5973452248cb18949711645990b6a56e7442dc30cc48a607a2afe7d8ec66
8d74d544396b57e6faa4f8fdf96a1a5e30b196d56c15f7cf05767a406708a6b2
APT32图形化反混淆工具-完整源代码
"""A plugin for Cutter and Radare2 to deobfuscate APT32 flow graphs
Thisis a python plugin for Cutter that is compatible as an r2pipe script for
radare2as well. The plugin will help reverse engineers to deobfuscate and remove
junkblocks from APT32 (Ocean Lotus) samples.
"""
__author__ = "Itay Cohen, aka @megabeets_"
__company__= "Check Point Software Technologies Ltd"
#Check if we're running from cutter
try:
import cutter
from PySide2.QtWidgets import QAction
pipe = cutter
cutter_available = True
# Ifno, assume running from radare2
except:
import r2pipe
pipe = r2pipe.open()
cutter_available = False
classGraphDeobfuscator:
# A list of pairs of opposite conditionaljumps
jmp_pairs = [
['jno', 'jo'],
['jnp', 'jp'],
['jb', 'jnb'],
['jl', 'jnl'],
['je', 'jne'],
['jns', 'js'],
['jnz', 'jz'],
['jc', 'jnc'],
['ja', 'jbe'],
['jae', 'jb'],
['je', 'jnz'],
['jg', 'jle'],
['jge', 'jl'],
['jpe', 'jpo'],
['jne', 'jz']]
def __init__(self, pipe, verbose=False):
"""an initializationfunction for the class
Arguments:
pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper
Keyword Arguments:
verbose {bool} -- if True willprint logs to the screen (default: {False})
"""
self.pipe = pipe
self.verbose = verbose
def is_successive_fail(self, block_A,block_B):
"""Check if the endaddress of block_A is the start of block_B
Arguments:
block_A {block_context} -- A JSONobject to represent the first block
block_B {block_context} -- A JSONobject to represent the second block
Returns:
bool -- True if block_B comesimmediately after block_A, False otherwise
"""
return ((block_A["addr"] +block_A["size"]) == block_B["addr"])
def is_opposite_conditional(self, cond_A,cond_B):
"""Check if two operandsare opposite conditional jump operands
Arguments:
cond_A {string} -- the conditionaljump operand of the first block
cond_B {string} -- the conditionaljump operand of the second block
Returns:
bool -- True if the operands areopposite, False otherwise
"""
sorted_pair = sorted([cond_A, cond_B])
for pair in self.jmp_pairs:
if sorted_pair == pair:
return True
return False
defcontains_meaningful_instructions (self, block):
'''Check if a block contains meaningfulinstructions (references, calls, strings,...)
Arguments:
block {block_context} -- A JSONobject which represents a block
Returns:
bool -- True if the block containsmeaningful instructions, False otherwise
'''
# Get summary of block - strings,calls, references
summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"]))
return summary != ""
def get_block_end(self, block):
"""Get the address ofthe last instruction in a given block
Arguments:
block {block_context} -- A JSONobject which represents a block
Returns:
The address of the last instructionin the block
"""
# save current seek
self.pipe.cmd("s{addr}".format(addr=block['addr']))
# This will return the address of ablock's last instruction
block_end = self.pipe.cmd("?v $@B:-1")
return block_end
def get_last_mnem_of_block(self, block):
"""Get the mnemonic ofthe last instruction in a block
Arguments:
block {block_context} -- A JSONobject which represents a block
Returns:
string -- the mnemonic of the lastinstruction in the given block
"""
inst_info = self.pipe.cmdj("aoj @{addr}".format(addr=self.get_block_end(block)))[0]
return inst_info["mnemonic"]
def get_jump(self, block):
"""Get the address towhich a block jumps
Arguments:
block {block_context} -- A JSONobject which represents a block
Returns:
addr -- the address to which theblock jumps to. If such address doesn't exist, returns False
"""
return block["jump"] if"jump" in block else None
def get_fail_addr(self, block):
"""Get the address towhich a block fails
Arguments:
block {block_context} -- A JSONobject which represents a block
Returns:
addr -- the address to which theblock fail-branches to. If such address doesn't exist, returns False
"""
return block["fail"] if"fail" in block else None
def get_block(self, addr):
"""Get the block contextin a given address
Arguments:
addr {addr} -- An address in ablock
Returns:
block_context -- the block to whichthe address belongs
"""
block = self.pipe.cmdj("abj. @{offset}".format(offset=addr))
return block[0] if block else None
def get_fail_block(self, block):
"""Return the block towhich a block branches if the condition is fails
Arguments:
block {block_context} -- A JSONrepresentation of a block
Returns:
block_context -- The block to whichthe branch fails. If not exists, returns None
"""
# Get the address of the"fail" branch
fail_addr = self.get_fail_addr(block)
if not fail_addr:
return None
# Get a block context of the failaddress
fail_block = self.get_block(fail_addr)
return fail_block if fail_block elseNone
def reanalize_function(self):
"""Re-Analyze a functionat a given address
Arguments:
addr {addr} -- an address of afunction to be re-analyze
"""
# Seek to the function's start
self.pipe.cmd("s $F")
# Undefine the function in this address
self.pipe.cmd("af- $")
# Define and analyze a function in thisaddress
self.pipe.cmd("afr @ $")
def overwrite_instruction(self, addr):
"""Overwrite aconditional jump to an address, with a JMP to it
Arguments:
addr {addr} -- address of aninstruction to be overwritten
"""
jump_destination =self.get_jump(self.pipe.cmdj("aoj @ {addr}".format(addr=addr))[0])
if (jump_destination):
self.pipe.cmd("wai jmp0x{dest:x} @ {addr}".format(dest=jump_destination, addr=addr))
def get_current_function(self):
"""Return the startaddress of the current function
Return Value:
The address of the currentfunction. None if no function found.
"""
function_start =int(self.pipe.cmd("?vi $FB"))
return function_start if function_start!= 0 else None
def clean_junk_blocks(self):
"""Search a givenfunction for junk blocks, remove them and fix the flow.
"""
# Get all the basic blocks of thefunction
blocks = self.pipe.cmdj("afbj @$F")
if not blocks:
print("[X] No blocks found. Isit a function?")
return
# Have we modified any instruction inthe function?
# If so, a reanalyze of the function isrequired
modified = False
# Iterate over all the basic blocks ofthe function
for block in blocks:
fail_block =self.get_fail_block(block)
# Make validation checks
if not fail_block or \
not self.is_successive_fail(block,fail_block) or \
self.contains_meaningful_instructions(fail_block) or \
notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)):
continue
if self.verbose:
print ("Potential junk:0x{junk_block:x}(0x{fix_block:x})".format(junk_block=fail_block["addr"],fix_block=block["addr"]))
self.overwrite_instruction(self.get_block_end(block))
modified = True
if modified:
self.reanalize_function()
def clean_graph(self):
"""the initial functionof the class. Responsible to enable cache and start the cleaning
"""
# Enable cache writing mode. changeswill only take place in the session and
# will not override the binary
self.pipe.cmd("eio.cache=true")
self.clean_junk_blocks()
ifcutter_available:
# This part will be executed only if Cutteris available. This will
# create the cutter plugin and UI objectsfor the plugin
classGraphDeobfuscatorCutter(cutter.CutterPlugin):
name = "APT32 GraphDeobfuscator"
description = "Graph Deobfuscatorfor APT32 Samples"
version = "1.0"
author = "Itay Cohen(@Megabeets_)"
def setupPlugin(self):
pass
def setupInterface(self, main):
# Create a new action (menu item)
action = QAction("APT32 GraphDeobfuscator", main)
action.setCheckable(False)
# Connect the action to a function- cleaner.
# A click on this action willtrigger the function
action.triggered.connect(self.cleaner)
# Add the action to the"Windows -> Plugins" menu
pluginsMenu =main.getMenuByType(main.MenuType.Plugins)
pluginsMenu.addAction(action)
def cleaner(self):
graph_deobfuscator =GraphDeobfuscator(pipe)
graph_deobfuscator.clean_graph()
cutter.refresh()
def create_cutter_plugin():
return GraphDeobfuscatorCutter()
if__name__ == "__main__":
graph_deobfuscator =GraphDeobfuscator(pipe)
graph_deobfuscator.clean_graph()
* 参考来源:checkpoint,FB小编Alpha_h4ck编译,转载请注明来自FreeBuf.COM