Skip to content

Commit

Permalink
Merge pull request #219 from fireeye/fix-218
Browse files Browse the repository at this point in the history
ida: use a local context for cache instead of global
  • Loading branch information
williballenthin authored Aug 5, 2020
2 parents 1c3da73 + 4bb13d6 commit 9943de0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 77 deletions.
11 changes: 11 additions & 0 deletions capa/features/extractors/ida/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,21 @@ def extract_file_features(self):
def get_functions(self):
    """
    yield the functions of the workspace, skipping the library and thunk
    functions identified by IDA.

    every function yielded by one call carries the same `ctx` dict as an
    attribute, so analysis cached there is visible across all of them.
    """
    import capa.features.extractors.ida.helpers as ida_helpers

    # a single dict per call, attached to each function yielded below.
    shared_ctx = {}

    for func in ida_helpers.get_functions(skip_thunks=True, skip_libs=True):
        func.ctx = shared_ctx
        yield add_ea_int_cast(func)

@staticmethod
def get_function(ea):
    """
    fetch the function at the given address, with a fresh `ctx` dict attached.

    args:
      ea (int): address used to look up the function via `idaapi.get_func`.

    returns:
      the result of `add_ea_int_cast` applied to the IDA function object.

    raises:
      ValueError: if `idaapi.get_func` returns None for `ea`.
    """
    f = idaapi.get_func(ea)
    if f is None:
        # without this guard the attribute assignment below fails with an
        # opaque AttributeError on None.
        raise ValueError("no function found at address 0x%X" % ea)

    # this ctx is private to the returned object; it is not the dict shared
    # by the functions yielded from get_functions().
    f.ctx = {}
    return add_ea_int_cast(f)

def extract_function_features(self, f):
for (feature, ea) in capa.features.extractors.ida.function.extract_features(f):
yield feature, ea
Expand Down
45 changes: 21 additions & 24 deletions capa/features/extractors/ida/insn.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,38 @@
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
from capa.features.insn import Number, Offset, Mnemonic

_file_imports_cache = None


def get_arch(ctx):
    """
    fetch the ARCH_* constant for the currently open workspace.

    the result is stored in `ctx` under the "arch" key, so IDA is only
    queried when that key is missing.

    via Tamir Bahar/@tmr232
    https://reverseengineering.stackexchange.com/a/11398/17194

    args:
      ctx (dict): cache dict; read and updated in place.

    returns:
      the value stored under ctx["arch"]: ARCH_X64 or ARCH_X32 when computed here.

    raises:
      ValueError: if the workspace is neither 64-bit nor 32-bit.
        nothing is stored in `ctx` in that case.
    """
    if "arch" not in ctx:
        info = idaapi.get_inf_structure()
        if info.is_64bit():
            ctx["arch"] = ARCH_X64
        elif info.is_32bit():
            ctx["arch"] = ARCH_X32
        else:
            raise ValueError("unexpected architecture")
    return ctx["arch"]


def get_imports(ctx):
    """
    fetch the file imports, computing them at most once per `ctx`.

    args:
      ctx (dict): cache dict; the imports are stored under the "imports_cache" key.

    returns:
      the value produced by `capa.features.extractors.ida.helpers.get_file_imports()`.
      callers in this file use it like a dict (`.get(ref, ())`, `.keys()`).
    """
    if "imports_cache" not in ctx:
        ctx["imports_cache"] = capa.features.extractors.ida.helpers.get_file_imports()
    return ctx["imports_cache"]


def check_for_api_call(insn):
def check_for_api_call(ctx, insn):
""" check instruction for API call """
if not idaapi.is_call_insn(insn):
return

for ref in idautils.CodeRefsFrom(insn.ea, False):
info = get_imports().get(ref, ())
info = get_imports(ctx).get(ref, ())
if info:
yield "%s.%s" % (info[0], info[1])
else:
Expand All @@ -59,7 +56,7 @@ def check_for_api_call(insn):
if f and (f.flags & idaapi.FUNC_THUNK):
for thunk_ref in idautils.DataRefsFrom(ref):
# TODO: always data ref for thunk??
info = get_imports().get(thunk_ref, ())
info = get_imports(ctx).get(thunk_ref, ())
if info:
yield "%s.%s" % (info[0], info[1])

Expand All @@ -75,7 +72,7 @@ def extract_insn_api_features(f, bb, insn):
example:
call dword [0x00473038]
"""
for api in check_for_api_call(insn):
for api in check_for_api_call(f.ctx, insn):
for (feature, ea) in capa.features.extractors.helpers.generate_api_features(api, insn.ea):
yield feature, ea

Expand Down Expand Up @@ -105,7 +102,7 @@ def extract_insn_number_features(f, bb, insn):
const = capa.features.extractors.ida.helpers.mask_op_val(op)
if not idaapi.is_mapped(const):
yield Number(const), insn.ea
yield Number(const, arch=get_arch()), insn.ea
yield Number(const, arch=get_arch(f.ctx)), insn.ea


def extract_insn_bytes_features(f, bb, insn):
Expand Down Expand Up @@ -173,7 +170,7 @@ def extract_insn_offset_features(f, bb, insn):
op_off = capa.features.extractors.helpers.twos_complement(op_off, 32)

yield Offset(op_off), insn.ea
yield Offset(op_off, arch=get_arch()), insn.ea
yield Offset(op_off, arch=get_arch(f.ctx)), insn.ea


def contains_stack_cookie_keywords(s):
Expand Down Expand Up @@ -322,7 +319,7 @@ def extract_insn_cross_section_cflow(f, bb, insn):
insn (IDA insn_t)
"""
for ref in idautils.CodeRefsFrom(insn.ea, False):
if ref in get_imports().keys():
if ref in get_imports(f.ctx).keys():
# ignore API calls
continue
if not idaapi.getseg(ref):
Expand Down
75 changes: 22 additions & 53 deletions tests/test_ida_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ def extract_basic_block_features(f, bb):

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_api_features():
# have to import this inline so pytest doesn't bail outside of IDA
import idaapi

f = idaapi.get_func(0x403BAC)
f = get_extractor().get_function(0x403BAC)
features = extract_function_features(f)
assert capa.features.insn.API("advapi32.CryptAcquireContextW") in features
assert capa.features.insn.API("advapi32.CryptAcquireContext") in features
Expand All @@ -94,9 +91,7 @@ def test_api_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_string_features():
import idaapi

f = idaapi.get_func(0x40105D)
f = get_extractor().get_function(0x40105D)
features = extract_function_features(f)
assert capa.features.String("SCardControl") in features
assert capa.features.String("SCardTransmit") in features
Expand All @@ -107,9 +102,7 @@ def test_string_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_byte_features():
import idaapi

f = idaapi.get_func(0x40105D)
f = get_extractor().get_function(0x40105D)
features = extract_function_features(f)
wanted = capa.features.Bytes("SCardControl".encode("utf-16le"))
# use `==` rather than `is` because the result is not `True` but a truthy value.
Expand All @@ -118,9 +111,7 @@ def test_byte_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_number_features():
import idaapi

f = idaapi.get_func(0x40105D)
f = get_extractor().get_function(0x40105D)
features = extract_function_features(f)
assert capa.features.insn.Number(0xFF) in features
assert capa.features.insn.Number(0x3136B0) in features
Expand All @@ -131,9 +122,7 @@ def test_number_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_number_arch_features():
import idaapi

f = idaapi.get_func(0x40105D)
f = get_extractor().get_function(0x40105D)
features = extract_function_features(f)
assert capa.features.insn.Number(0xFF) in features
assert capa.features.insn.Number(0xFF, arch=ARCH_X32) in features
Expand All @@ -142,9 +131,7 @@ def test_number_arch_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_offset_features():
import idaapi

f = idaapi.get_func(0x40105D)
f = get_extractor().get_function(0x40105D)
features = extract_function_features(f)
assert capa.features.insn.Offset(0x0) in features
assert capa.features.insn.Offset(0x4) in features
Expand All @@ -156,17 +143,15 @@ def test_offset_features():
# this function has the following negative offsets
# movzx ecx, byte ptr [eax-1]
# movzx eax, byte ptr [eax-2]
f = idaapi.get_func(0x4011FB)
f = get_extractor().get_function(0x4011FB)
features = extract_function_features(f)
assert capa.features.insn.Offset(-0x1) in features
assert capa.features.insn.Offset(-0x2) in features


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_offset_arch_features(mimikatz):
import idaapi

f = idaapi.get_func(0x40105D)
def test_offset_arch_features():
f = get_extractor().get_function(0x40105D)
features = extract_function_features(f)
assert capa.features.insn.Offset(0x0) in features
assert capa.features.insn.Offset(0x0, arch=ARCH_X32) in features
Expand All @@ -175,18 +160,14 @@ def test_offset_arch_features(mimikatz):

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_nzxor_features():
    """the function at 0x410DFC should expose the "nzxor" characteristic."""
    # fetch through the extractor rather than idaapi.get_func directly.
    f = get_extractor().get_function(0x410DFC)
    features = extract_function_features(f)
    assert capa.features.Characteristic("nzxor") in features  # 0x0410F0B


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_mnemonic_features():
import idaapi

f = idaapi.get_func(0x40105D)
f = get_extractor().get_function(0x40105D)
features = extract_function_features(f)
assert capa.features.insn.Mnemonic("push") in features
assert capa.features.insn.Mnemonic("movzx") in features
Expand All @@ -206,10 +187,9 @@ def test_file_section_name_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_tight_loop_features():
import idaapi

extractor = get_extractor()
f = idaapi.get_func(0x402EC4)

f = extractor.get_function(0x402EC4)
for bb in extractor.get_basic_blocks(f):
if bb.__int__() != 0x402F8E:
continue
Expand All @@ -220,10 +200,9 @@ def test_tight_loop_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_tight_loop_bb_features():
import idaapi

extractor = get_extractor()
f = idaapi.get_func(0x402EC4)

f = extractor.get_function(0x402EC4)
for bb in extractor.get_basic_blocks(f):
if bb.__int__() != 0x402F8E:
continue
Expand All @@ -245,56 +224,46 @@ def test_file_import_name_features():

@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_stackstring_features():
    """the function at 0x4556E5 should expose the "stack string" characteristic."""
    # fetch through the extractor rather than idaapi.get_func directly.
    f = get_extractor().get_function(0x4556E5)
    features = extract_function_features(f)
    assert capa.features.Characteristic("stack string") in features


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_switch_features():
    """the "switch" characteristic should be found at 0x409411 but not at 0x409393."""
    # both functions are fetched through the extractor rather than
    # idaapi.get_func directly.
    f = get_extractor().get_function(0x409411)
    features = extract_function_features(f)
    assert capa.features.Characteristic("switch") in features

    f = get_extractor().get_function(0x409393)
    features = extract_function_features(f)
    assert capa.features.Characteristic("switch") not in features


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_function_calls_to():
    """check the "calls to" characteristic for two functions."""
    # this function is used in a function pointer
    f = get_extractor().get_function(0x4011FB)
    features = extract_function_features(f)
    assert capa.features.Characteristic("calls to") not in features

    # __FindPESection is called once
    f = get_extractor().get_function(0x470360)
    features = extract_function_features(f)
    assert len(features[capa.features.Characteristic("calls to")]) == 1


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_function_calls_from():
    """the function at 0x4011FB should record exactly three "calls from" entries."""
    # fetch through the extractor rather than idaapi.get_func directly.
    f = get_extractor().get_function(0x4011FB)
    features = extract_function_features(f)
    assert capa.features.Characteristic("calls from") in features
    assert len(features[capa.features.Characteristic("calls from")]) == 3


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_basic_block_count():
    """the function at 0x4011FB should yield 15 basic block features."""
    # fetch through the extractor rather than idaapi.get_func directly.
    f = get_extractor().get_function(0x4011FB)
    features = extract_function_features(f)
    assert len(features[capa.features.basicblock.BasicBlock()]) == 15

Expand Down

0 comments on commit 9943de0

Please sign in to comment.