Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Volatility
- #
- # Authors:
- # Michael Hale Ligh <michael.ligh@mnin.org>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or (at
- # your option) any later version.
- #
- # This program is distributed in the hope that it will be useful, but
- # WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- #
- import os, sys, string, struct, subprocess, binascii, time, re
- from operator import itemgetter
- from bisect import bisect_right
- import volatility.debug as debug
- import volatility.obj as obj
- import volatility.plugins.procdump as procdump
- import volatility.win32.tasks as tasks
- import volatility.win32.modules as modules
- import volatility.commands as commands
- import volatility.utils as utils
- import volatility.plugins.ssdt as ssdt
- import volatility.plugins.filescan as filescan
- import volatility.scan as scan
- import volatility.plugins.modscan as modscan
- import volatility.plugins.taskmods as taskmods
- import volatility.plugins.overlays.windows.windows as windows
- #--------------------------------------------------------------------------------
- # dependents
- #--------------------------------------------------------------------------------
- try:
- import yara
- except ImportError:
- print "YARA is not installed, see http://code.google.com/p/yara-project/"
- try:
- import distorm3
- except ImportError:
- print 'distorm3 is not installed, see http://code.google.com/p/distorm/'
- #--------------------------------------------------------------------------------
- # memory protection flags from nt!MmProtectToValue
- #--------------------------------------------------------------------------------
# Lookup table of protection names (32 entries).  The table is four runs of
# the same eight base protections; every run starts with a plain
# PAGE_NOACCESS and the remaining seven entries carry the run's modifier.
PROTECT_FLAGS = list(
    'PAGE_NOACCESS' if base == 'PAGE_NOACCESS' else modifier + base
    for modifier in (
        '',
        'PAGE_NOCACHE | ',
        'PAGE_GUARD | ',
        'PAGE_WRITECOMBINE | ',
    )
    for base in (
        'PAGE_NOACCESS',
        'PAGE_READONLY',
        'PAGE_EXECUTE',
        'PAGE_EXECUTE_READ',
        'PAGE_READWRITE',
        'PAGE_WRITECOPY',
        'PAGE_EXECUTE_READWRITE',
        'PAGE_EXECUTE_WRITECOPY',
    )
)
- #--------------------------------------------------------------------------------
- # Special data types not in the public symbol files
- #--------------------------------------------------------------------------------
- malware_types = {
- #
- # Types for the svcscan plugin (XP only)
- #
- '_SERVICE_LIST_ENTRY' : [ 0x8, {
- 'Blink' : [ 0x0, ['pointer', ['_SERVICE_RECORD_LEGACY']]],
- 'Flink' : [ 0x4, ['pointer', ['_SERVICE_RECORD_LEGACY']]],
- } ],
- '_SERVICE_RECORD_LEGACY' : [ 0x70, {
- 'ServiceList' : [ 0x0, ['_SERVICE_LIST_ENTRY']],
- 'ServiceName' : [ 0x8, ['pointer', ['unsigned short']]],
- 'DisplayName' : [ 0xc, ['pointer', ['unsigned short']]],
- 'Order' : [ 0x10, ['int']],
- 'TagSignature' : [ 0x18, ['int']],
- 'Binary1' : [ 0x24, ['pointer', ['unsigned short']]],
- 'Binary2' : [ 0x24, ['pointer', ['_SERVICE_BINARY_LEGACY']]],
- 'Type' : [ 0x28, ['int']],
- 'State' : [ 0x2c, ['int']],
- } ],
- '_SERVICE_BINARY_LEGACY' : [ 0x14, {
- 'ServicePath' : [ 0x8, ['pointer', ['unsigned short']]],
- 'ProcessId' : [ 0xc, ['int']],
- } ],
- #
- # Types for the svcscan plugin (Vista and 7)
- #
- '_SERVICE_HEADER': [ None, {
- # Signature of "serH"
- 'Tag': [ 0x0, ['array', 4, ['unsigned char']]],
- # Pointer to the main record
- 'Ser': [ 0xC, ['pointer', ['_SERVICE_RECORD']]],
- } ],
- '_SERVICE_RECORD': [ None, {
- # Previous entry in the singly linked list
- 'PrevEntry': [ 0x0, ['pointer', ['_SERVICE_RECORD']]],
- # The service's name
- 'ServiceName': [ 0x4, ['pointer', ['unsigned short']]],
- # The service's display name
- 'DisplayName': [ 0x8, ['pointer', ['unsigned short']]],
- # The unique order number of the service
- 'Order': [ 0xC, ['unsigned int']],
- # Union which is LPWSTR for drivers or points to another struct for processes
- 'Binary2': [ 0x1C, ['pointer', ['_SERVICE_BINARY']]],
- 'Binary1': [ 0x1C, ['pointer', ['unsigned char']]],
- # The service type (kernel driver, own process, shared process, etc)
- 'Type': [ 0x20, ['unsigned int']],
- # The service state (running, stopped, paused, etc)
- 'State': [ 0x24, ['unsigned int']],
- } ],
- '_SERVICE_BINARY': [ None, {
- # The load path of the service binary, includes command line arguments
- 'ServicePath': [ 0x8, ['pointer', ['unsigned short']]],
- # Process ID if the service is active
- 'ProcessId': [ 0xC, ['unsigned int']],
- } ],
- #
- # Types for the csrpslist plugin
- #
- '_CSR_PROCESS_POINTER' : [ None, {
- 'CsrRootProcess' : [ 0x0, ['pointer', ['pointer', ['_CSR_PROCESS']]]],
- } ],
- '_CSR_PROCESS' : [ 0x60, {
- 'ClientId' : [ 0x0, ['_CLIENT_ID']],
- 'ListLink' : [ 0x8, ['_LIST_ENTRY']],
- 'ThreadList' : [ 0x10, ['_LIST_ENTRY']],
- 'NtSession' : [ 0x18, ['pointer', ['_CSR_NT_SESSION']]],
- 'ClientPort' : [ 0x1c, ['pointer', ['void']]],
- 'ClientViewBase' : [ 0x20, ['pointer', ['unsigned char']]],
- 'ClientViewBounds' : [ 0x24, ['pointer', ['unsigned char']]],
- 'ProcessHandle' : [ 0x28, ['pointer', ['void']]],
- 'SequenceNumber' : [ 0x2c, ['unsigned long']],
- 'Flags' : [ 0x30, ['unsigned long']],
- 'DebugFlags' : [ 0x34, ['unsigned long']],
- 'ReferenceCount' : [ 0x38, ['unsigned long']],
- 'ProcessGroupId' : [ 0x3c, ['unsigned long']],
- 'ProcessGroupSequence' : [ 0x40, ['unsigned long']],
- 'LastMessageSequence' : [ 0x44, ['unsigned long']],
- 'NumOutstandingMessages' : [ 0x48, ['unsigned long']],
- 'ShutdownLevel' : [ 0x4c, ['unsigned long']],
- 'ShutdownFlags' : [ 0x50, ['unsigned long']],
- 'Luid' : [ 0x54, ['_LUID']],
- 'ServerDllPerProcessData' : [ 0x5c, ['array', 1, ['pointer', ['void']]]],
- } ],
- '_CSR_THREAD' : [ 0x38, {
- 'CreateTime' : [ 0x0, ['_LARGE_INTEGER']],
- 'Link' : [ 0x8, ['_LIST_ENTRY']],
- 'HashLinks' : [ 0x10, ['_LIST_ENTRY']],
- 'ClientId' : [ 0x18, ['_CLIENT_ID']],
- 'Process' : [ 0x20, ['pointer', ['_CSR_PROCESS']]],
- 'ThreadHandle' : [ 0x24, ['pointer', ['void']]],
- 'Flags' : [ 0x28, ['unsigned long']],
- 'ReferenceCount' : [ 0x2c, ['unsigned long']],
- 'ImpersonateCount' : [ 0x30, ['unsigned long']],
- } ],
- '_CSR_NT_SESSION' : [ 0x18, {
- 'SessionLink' : [ 0x0, ['_LIST_ENTRY']],
- 'SessionId' : [ 0x8, ['unsigned long']],
- 'ReferenceCount' : [ 0xc, ['unsigned long']],
- 'RootDirectory' : [ 0x10, ['_STRING']],
- } ],
- #
- # Types for pe file header parsing
- #
- '_IMAGE_EXPORT_DIRECTORY': [ 0x28, {
- 'Base': [ 0x10, ['unsigned int']],
- 'NumberOfFunctions': [ 0x14, ['unsigned int']],
- 'NumberOfNames': [ 0x18, ['unsigned int']],
- 'AddressOfFunctions': [ 0x1C, ['unsigned int']],
- 'AddressOfNames': [ 0x20, ['unsigned int']],
- 'AddressOfNameOrdinals': [ 0x24, ['unsigned int']],
- } ],
- '_IMAGE_IMPORT_DESCRIPTOR': [ 0x14, {
- 'OriginalFirstThunk': [ 0x0, ['unsigned int']],
- 'TimeDateStamp': [ 0x4, ['unsigned int']],
- 'ForwarderChain': [ 0x8, ['unsigned int']],
- 'Name': [ 0xC, ['unsigned int']],
- 'FirstThunk': [ 0x10, ['pointer', ['unsigned int']]],
- } ],
- '_IMAGE_THUNK_DATA' : [ 0x4, {
- 'AddressOfData' : [ 0x0, ['unsigned int']],
- } ],
- '_IMAGE_IMPORT_BY_NAME' : [ None, {
- 'Hint' : [ 0x0, ['unsigned short']],
- 'Name' : [ 0x2, ['array', 20, ['unsigned char']]],
- } ],
- '_IMAGE_SECTION_HEADER' : [ None, {
- 'Name' : [ 0x0, ['String', dict(length = 8)]],
- }],
- #
- # Types for callbacks (Thanks to Frank Boldewin for some clues)
- #
- '_NOTIFICATION_PACKET' : [ 0x10, {
- 'ListEntry' : [ 0x0, ['_LIST_ENTRY']],
- 'DriverObject' : [ 0x8, ['pointer', ['_DRIVER_OBJECT']]],
- 'NotificationRoutine' : [ 0xC, ['unsigned int']],
- } ],
- '_KBUGCHECK_CALLBACK_RECORD' : [ 0x20, {
- 'Entry' : [ 0x0, ['_LIST_ENTRY']],
- 'CallbackRoutine' : [ 0x8, ['unsigned int']],
- 'Buffer' : [ 0xC, ['pointer', ['void']]],
- 'Length' : [ 0x10, ['unsigned int']],
- 'Component' : [ 0x14, ['pointer', ['unsigned char']]],
- 'Checksum' : [ 0x18, ['pointer', ['unsigned int']]],
- 'State' : [ 0x1C, ['unsigned char']],
- } ],
- '_KBUGCHECK_REASON_CALLBACK_RECORD' : [ 0x1C, {
- 'Entry' : [ 0x0, ['_LIST_ENTRY']],
- 'CallbackRoutine' : [ 0x8, ['unsigned int']],
- 'Component' : [ 0xC, ['pointer', ['unsigned char']]],
- 'Checksum' : [ 0x10, ['pointer', ['unsigned int']]],
- 'Reason' : [ 0x14, ['unsigned int']],
- 'State' : [ 0x18, ['unsigned char']],
- } ],
- '_SHUTDOWN_PACKET' : [ 0xC, {
- 'Entry' : [ 0x0, ['_LIST_ENTRY']],
- 'DeviceObject' : [ 0x8, ['pointer', ['_DEVICE_OBJECT']]],
- } ],
- '_EX_CALLBACK_ROUTINE_BLOCK' : [ 0x8, {
- 'RundownProtect' : [ 0x0, ['unsigned int']], \
- 'Function' : [ 0x4, ['unsigned int']], \
- 'Context' : [ 0x8, ['unsigned int']], \
- } ],
- '_GENERIC_CALLBACK' : [ 0xC, {
- 'Callback' : [ 0x4, ['pointer', ['void']]],
- 'Associated' : [ 0x8, ['pointer', ['void']]],
- } ],
- '_REGISTRY_CALLBACK_LEGACY' : [ 0x38, {
- 'CreateTime' : [ 0x0, ['WinTimeStamp', {}]],
- } ],
- '_REGISTRY_CALLBACK' : [ None, {
- 'ListEntry' : [ 0x0, ['_LIST_ENTRY']],
- 'Function' : [ 0x1C, ['pointer', ['void']]],
- } ],
- '_DBGPRINT_CALLBACK' : [ 0x14, {
- 'Function' : [ 0x8, ['pointer', ['void']]],
- } ],
- '_SEG_DESCRIPTOR' : [ 0x8, {
- 'size1': [ 0x0, ['BitField', dict(start_bit = 0, end_bit = 16)]],
- 'baseaddress1': [ 0x2, ['BitField', dict(start_bit = 0, end_bit = 16)]],
- 'baseaddress2': [ 0x4, ['BitField', dict(start_bit = 0, end_bit = 8)]],
- 'type': [ 0x4, ['BitField', dict(start_bit = 8, end_bit = 12)]],
- 'sFlag': [ 0x4, ['BitField', dict(start_bit = 12, end_bit = 13)]],
- 'dpl': [ 0x4, ['BitField', dict(start_bit = 13, end_bit = 15)]],
- 'pFlag': [ 0x4, ['BitField', dict(start_bit = 15, end_bit = 16)]],
- 'size2': [ 0x6, ['BitField', dict(start_bit = 0, end_bit = 4)]],
- 'notused': [ 0x6, ['BitField', dict(start_bit = 4, end_bit = 5)]],
- 'lFlag': [ 0x6, ['BitField', dict(start_bit = 5, end_bit = 6)]],
- 'DB': [ 0x6, ['BitField', dict(start_bit = 6, end_bit = 7)]],
- 'gFlag': [ 0x6, ['BitField', dict(start_bit = 7, end_bit = 8)]],
- 'baseaddress3': [ 0x6, ['BitField', dict(start_bit = 8, end_bit = 16)]],
- } ],
- '_CALL_GATE_DESCRIPTOR' : [ 0x8, {
- 'offset1': [ 0x0, ['BitField', dict(start_bit = 0, end_bit = 16)]],
- 'selector': [ 0x2, ['BitField', dict(start_bit = 0, end_bit = 16)]],
- 'argcount': [ 0x4, ['BitField', dict(start_bit = 0, end_bit = 5)]],
- 'zeroes': [ 0x4, ['BitField', dict(start_bit = 5, end_bit = 8)]],
- 'type': [ 0x4, ['BitField', dict(start_bit = 8, end_bit = 12)]],
- 'sFlag': [ 0x4, ['BitField', dict(start_bit = 12, end_bit = 13)]],
- 'dpl': [ 0x4, ['BitField', dict(start_bit = 13, end_bit = 15)]],
- 'pFlag': [ 0x4, ['BitField', dict(start_bit = 15, end_bit = 16)]],
- 'offset2': [ 0x6, ['BitField', dict(start_bit = 0, end_bit = 16)]],
- } ],
- }
- #--------------------------------------------------------------------------------
- # Symbol classes used with API hooks etc
- #--------------------------------------------------------------------------------
class Symbol:
    """Describes one function that a module imports or exports.

    Attributes:
        owner_mod: the module object the symbol belongs to; get_instruction()
            reads code through its ``obj_vm`` address space.
        ordinal:   the ordinal value, or None.
        funcaddr:  the function address truncated to 32 bits, or None when
            no address was supplied.
        name:      the function name, or None / "" when unnamed.
        forwarder: forwarder information supplied by the caller, or None.
        dll:       the DLL name supplied by the caller, or None.
    """

    def __init__(self, owner_mod=None, ordinal=None, funcaddr=None, name=None, forwarder=None, dll=None):
        self.owner_mod = owner_mod
        self.ordinal = ordinal
        # None is the declared default, but masking it raised TypeError, so
        # only mask when an address was actually given.
        self.funcaddr = None if funcaddr is None else funcaddr & 0xFFFFFFFF
        self.name = name
        self.forwarder = forwarder
        # Previously accepted but silently dropped.
        self.dll = dll

    def get_name(self):
        """Returns the function name, its ordinal value (as hex text), or UNKNOWN"""
        # Equality (not identity) tests are kept on purpose: the values may be
        # Volatility objects rather than plain Python values.
        if self.name != None and self.name != "":
            return self.name
        elif self.ordinal != None:
            return hex(self.ordinal)
        else:
            return 'UNKNOWN'

    def get_instruction(self, instruction_number = 0):
        """Returns the Nth instruction at an API's start address.

        Reads (N + 1) * 12 bytes at funcaddr and decodes them as 32-bit code
        with distorm3.  Returns None when decoding hits an invalid
        instruction, or runs out of data, before reaching instruction N.
        """
        data = self.owner_mod.obj_vm.zread(self.funcaddr, (instruction_number + 1) * 12)
        decoded = distorm3.Decompose(self.funcaddr, data, distorm3.Decode32Bits)
        for index, op in enumerate(decoded):
            if not op.valid:
                break
            if index == instruction_number:
                return op
        return None
class ExportedSymbol(Symbol):
    """A Symbol taken from a module's export table; adds nothing to Symbol."""
class ImportedSymbol(Symbol):
    """A Symbol taken from a module's import table; adds nothing to Symbol."""
- #--------------------------------------------------------------------------------
- # Extensions of Volatility objects - stable on all OS
- #--------------------------------------------------------------------------------
class _IMAGE_IMPORT_DESCRIPTOR(obj.CType):
    """Handles IID entries for imported functions"""

    def __init__(self, theType = None, offset = None, vm = None, parent = None, *args, **kwargs):
        # Assigned before the base initializer runs, exactly as before.
        self.sectoffset = offset
        obj.CType.__init__(self, theType = theType, offset = offset, vm = vm, parent = parent, *args, **kwargs)

    def is_list_end(self):
        """Returns True if we've reached the list end (a struct with all zeroes)"""
        size = self.obj_vm.profile.get_obj_size('_IMAGE_IMPORT_DESCRIPTOR')
        data = self.obj_vm.zread(self.obj_offset, size)
        return data.count(chr(0)) == len(data)

    def get_name(self):
        """Returns the name of the DLL for this IID, or '' when it cannot be read"""
        # This try block is for http://code.google.com/p/volatility/issues/detail?id=57
        try:
            va = self.obj_parent.DllBase + self.Name
        except Exception:
            return ''
        if self.obj_vm.is_valid_address(va):
            return read_asciiz(self.obj_vm, va)
        return ''

    def get_table(self, thunk_va, img_size = None):
        """Generator for ITD structures starting at thunk_va.

        Stops at the first thunk that cannot be read or whose AddressOfData
        is zero.  img_size is unused; it is kept (now optional) so existing
        two-argument callers keep working.  get_imports() always called this
        with a single argument, which used to raise TypeError.
        """
        thunk_size = self.obj_vm.profile.get_obj_size('_IMAGE_THUNK_DATA')
        i = 0
        while True:
            thunk = obj.Object('_IMAGE_THUNK_DATA', offset = thunk_va + (i * thunk_size), vm = self.obj_vm)
            if not thunk or not thunk.AddressOfData:
                break
            yield thunk
            i += 1

    def get_imports(self):
        """Generator for enumerating imported functions.

        Yields (dll_name, ImportedSymbol) pairs.  The DLL name and function
        names are replaced by '*invalid*' when they fail validation.
        """
        mod_base = self.obj_parent.DllBase
        dll = self.get_name()
        if not is_valid_dos_filename(dll):
            dll = '*invalid*'
        try:
            # The lookup table, contains function names
            ilt = list(self.get_table(mod_base + self.OriginalFirstThunk))
            # The address table, contains bound addresses.  Only walked so
            # that an unreadable table aborts the enumeration, as before.
            list(self.get_table(mod_base + self.FirstThunk))
        except Exception:
            # A plain return ends the generator; raising StopIteration by
            # hand is an error under PEP 479.
            return
        for i, thunk in enumerate(ilt):
            imp_ord = imp_name = None
            if thunk.AddressOfData & IMAGE_ORDINAL_FLAG:
                imp_ord = thunk.AddressOfData & 0xffff
            else:
                # Skip the 2-byte Hint that precedes the name
                imp_name = read_asciiz(self.obj_vm, mod_base + thunk.AddressOfData + 2)
                if not is_valid_function_name(imp_name):
                    imp_name = "*invalid*"
            # The bound address sits in the i-th 4-byte slot of the address table
            imp_address = self.FirstThunk + mod_base + i * 4
            imp_bound = obj.Object('unsigned int', offset = imp_address, vm = self.obj_vm)
            if imp_bound:
                imp = ImportedSymbol(
                    owner_mod = self.obj_parent,
                    ordinal = imp_ord,
                    funcaddr = imp_bound,
                    name = imp_name,
                    forwarder = None)
                yield dll, imp
class _IMAGE_EXPORT_DIRECTORY(obj.CType):
    """Handles IED entries for exported functions"""

    def __init__(self, theType = None, offset = None, vm = None, parent = None, *args, **kwargs):
        # Assigned before the base initializer runs, exactly as before.
        self.sectoffset = offset
        obj.CType.__init__(self, theType = theType, offset = offset, vm = vm, parent = parent, *args, **kwargs)

    def get_forwarder(self, mod_base, func_rva):
        """Return the name of the target function for a forwarded export.

        Forwarder resolution is switched off: this always returns None.  The
        statements that used to follow the unconditional return could never
        run (and called a read_asciiz method this class does not have), so
        they have been removed.
        """
        return None

    def get_exports(self):
        """Generator for exported functions.

        Yields an ExportedSymbol for every function exported by name, then
        one for every remaining function exported by ordinal only.
        """
        mod_base = self.obj_parent.DllBase
        # Kept as a list: membership is checked with the same equality
        # semantics the original code relied on.
        seen_ordinals = []
        address_of_functions = obj.Object('Array', vm = self.obj_vm,
            offset = mod_base + self.AddressOfFunctions, targetType = 'unsigned int', count = self.NumberOfFunctions)
        address_of_names = obj.Object('Array', vm = self.obj_vm,
            offset = mod_base + self.AddressOfNames, targetType = 'unsigned int', count = self.NumberOfNames)
        address_of_name_ordinals = obj.Object('Array', vm = self.obj_vm,
            offset = mod_base + self.AddressOfNameOrdinals, targetType = 'unsigned short', count = self.NumberOfNames)

        # Handle functions exported by name *and* ordinal
        for i in range(self.NumberOfNames):
            name_rva = address_of_names[i]
            ordinal = address_of_name_ordinals[i]
            if ordinal == None:
                continue
            if name_rva == None or name_rva == 0:
                continue
            # The name ordinal indexes the function table; skip out-of-range values
            if ordinal >= self.NumberOfFunctions:
                continue
            func_rva = address_of_functions[ordinal]
            if func_rva == None or func_rva == 0:
                continue
            forwarder = self.get_forwarder(mod_base, func_rva)
            name = read_asciiz(self.obj_vm, mod_base + name_rva)
            # Reported ordinals are biased by the directory's Base
            ordinal += self.Base
            seen_ordinals.append(ordinal)
            yield ExportedSymbol(owner_mod = self.obj_parent,
                                 ordinal = ordinal,
                                 funcaddr = mod_base + func_rva,
                                 name = name,
                                 forwarder = forwarder)

        # Handle functions exported by ordinal only
        for i in range(self.NumberOfFunctions):
            ordinal = self.Base + i
            if ordinal not in seen_ordinals:
                func_rva = address_of_functions[i]
                if func_rva == None or func_rva == 0:
                    continue
                forwarder = self.get_forwarder(mod_base, func_rva)
                seen_ordinals.append(ordinal)
                yield ExportedSymbol(owner_mod = self.obj_parent,
                                     ordinal = ordinal,
                                     funcaddr = mod_base + func_rva,
                                     name = None,
                                     forwarder = forwarder)
class _LDR_DATA_TABLE_ENTRY(obj.CType):
    """Extension for PE files to help enumerate IAT and EAT"""

    def __init__(self, theType = None, offset = None, vm = None, parent = None, *args, **kwargs):
        # Assigned before the base initializer runs, exactly as before.
        self.sectoffset = offset
        obj.CType.__init__(self, theType = theType, offset = offset, vm = vm, parent = parent, *args, **kwargs)

    def get_nt_header(self):
        """Return the NT header found via the DOS header at DllBase.

        Returns None when the header lookup raises ValueError.
        """
        try:
            dos_header = obj.Object("_IMAGE_DOS_HEADER", offset = self.DllBase, vm = self.obj_vm)
            return dos_header.get_nt_header()
        except ValueError:
            return None

    def getprocaddress(self, name_to_find):
        """Return the virtual address of a given function exported by a DLL, or None"""
        for exp in self.exports():
            if exp.get_name() == name_to_find:
                return exp.funcaddr
        return None

    def imports(self, addr_space = None):
        """Generator yielding (dll_name, [ImportedSymbol, ...]) per imported DLL.

        addr_space, when given, replaces the address space the module is read
        through.  Nothing is yielded when the NT header is unavailable or the
        import directory entry is empty or lies past SizeOfImage.
        """
        if addr_space != None:
            self.obj_vm = addr_space
        nt_header = self.get_nt_header()
        # Generators end with a plain return; a hand-raised StopIteration is
        # turned into RuntimeError under PEP 479.
        if nt_header == None:
            return
        # Data directory entry 1 is read as the import directory
        imp_addr = nt_header.OptionalHeader.DataDirectory[1].VirtualAddress
        imp_size = nt_header.OptionalHeader.DataDirectory[1].Size
        if imp_addr > nt_header.OptionalHeader.SizeOfImage or imp_size <= 0:
            return
        obj_size = self.obj_vm.profile.get_obj_size('_IMAGE_IMPORT_DESCRIPTOR')
        imps = dict()
        i = 0
        while True:
            desc = obj.Object('_IMAGE_IMPORT_DESCRIPTOR', vm = self.obj_vm,
                              offset = self.DllBase + imp_addr + (i * obj_size),
                              parent = self)
            if not desc or desc.is_list_end():
                break
            try:
                for dll, imp in desc.get_imports():
                    imps.setdefault(dll, []).append(imp)
            except obj.InvalidOffsetError:
                # Best effort: keep what was gathered and try the next descriptor
                pass
            i += 1
        for dll, symbols in imps.items():
            yield dll, symbols

    def exports(self, addr_space = None):
        """Generator yielding an ExportedSymbol per exported function.

        addr_space, when given, replaces the address space the module is read
        through.  Nothing is yielded when the NT header is unavailable or the
        export directory fails the sanity checks below.
        """
        if addr_space != None:
            self.newattr('obj_vm', addr_space)
        nt_header = self.get_nt_header()
        if nt_header == None:
            return
        # Data directory entry 0 is read as the export directory
        exp_addr = nt_header.OptionalHeader.DataDirectory[0].VirtualAddress
        exp_size = nt_header.OptionalHeader.DataDirectory[0].Size
        if exp_addr > nt_header.OptionalHeader.SizeOfImage or exp_size <= 0:
            return
        exp_dir = obj.Object('_IMAGE_EXPORT_DIRECTORY', vm = self.obj_vm, offset = self.DllBase + exp_addr, parent = self)
        if exp_dir != None:
            # Reject tables that point outside the image or claim an
            # implausible number of entries
            if exp_dir.AddressOfFunctions > nt_header.OptionalHeader.SizeOfImage or \
               exp_dir.AddressOfNameOrdinals > nt_header.OptionalHeader.SizeOfImage or \
               exp_dir.AddressOfNames > nt_header.OptionalHeader.SizeOfImage or \
               exp_dir.NumberOfFunctions > 0x7FFF or exp_dir.NumberOfNames > 0x7FFF:
                return
            try:
                for exp in exp_dir.get_exports():
                    yield exp
            except obj.InvalidOffsetError:
                # Best effort: stop at the first unreadable entry
                pass
class _EPROCESS(windows._EPROCESS):
    """_EPROCESS with a helper for walking the process's loaded modules"""

    def list_modules(self):
        """Generator for the _LDR_DATA_TABLE_ENTRY items on the PEB's load-order list.

        Yields nothing when UniqueProcessId or the list head is falsy.
        """
        if not (self.UniqueProcessId and self.Peb.Ldr.InLoadOrderModuleList):
            return
        load_order = self.Peb.Ldr.InLoadOrderModuleList
        for entry in load_order.list_of_type("_LDR_DATA_TABLE_ENTRY", "InLoadOrderLinks"):
            yield entry
class _IMAGE_DOS_HEADER(obj.CType):
    """DOS header"""

    def get_nt_header(self):
        """Locate and return the _IMAGE_NT_HEADERS that follows this DOS header.

        Raises ValueError when e_magic is not 0x5a4d or when the NT
        signature is not 0x4550.
        """
        magic = self.e_magic
        if magic != 0x5a4d:
            raise ValueError('e_magic {0:04X} is not a valid DOS signature.'.format(magic))

        # e_lfanew is relative to the start of the DOS header
        pe_offset = self.e_lfanew + self.obj_offset
        nt_header = obj.Object("_IMAGE_NT_HEADERS", offset = pe_offset, vm = self.obj_vm)

        signature = nt_header.Signature
        if signature != 0x4550:
            raise ValueError('NT header signature {0:04X} is not a valid'.format(signature))
        return nt_header
class _IMAGE_NT_HEADERS(obj.CType):
    """PE header"""

    def get_sections(self, unsafe):
        """Generator for the _IMAGE_SECTION_HEADER entries of this image.

        The section table starts SizeOfOptionalHeader bytes after the start
        of the optional header.  Unless unsafe is true, each section is
        sanity checked before being yielded, which may raise ValueError.
        """
        entry_size = self.obj_vm.profile.get_obj_size("_IMAGE_SECTION_HEADER")
        entry_addr = self.FileHeader.SizeOfOptionalHeader + self.OptionalHeader.obj_offset
        for _ in range(self.FileHeader.NumberOfSections):
            section = obj.Object("_IMAGE_SECTION_HEADER", offset = entry_addr, vm = self.obj_vm, parent = self)
            if not unsafe:
                section.sanity_check_section()
            yield section
            entry_addr += entry_size
class _IMAGE_SECTION_HEADER(obj.CType):
    """PE section"""

    def sanity_check_section(self):
        """Raise ValueError if a section field exceeds the image size.

        VirtualAddress, Misc.VirtualSize and SizeOfRawData are compared, in
        that order, against the parent header's SizeOfImage.
        """
        # Note: all addresses here are RVAs
        limit = self.obj_parent.OptionalHeader.SizeOfImage
        # Fields are read lazily so a failing check stops before the next read
        checks = (
            (lambda: self.VirtualAddress, 'VirtualAddress {0:08x} is past the end of image.'),
            (lambda: self.Misc.VirtualSize, 'VirtualSize {0:08x} is larger than image size.'),
            (lambda: self.SizeOfRawData, 'SizeOfRawData {0:08x} is larger than image size.'),
        )
        for read_field, template in checks:
            value = read_field()
            if value > limit:
                raise ValueError(template.format(value))
- #--------------------------------------------------------------------------------
- # general lib area
- #--------------------------------------------------------------------------------
#### This is from pefile
# string.ascii_* is used instead of string.lowercase/uppercase: the latter are
# locale-dependent and do not exist in Python 3.  Every character code from
# 128 to 255 is allowed explicitly, so the accepted set is unchanged.
allowed_filename = string.ascii_lowercase + string.ascii_uppercase + string.digits + "!#$%&'()-@^_`{}~+,.;=[]" + ''.join( [chr(i) for i in range(128, 256)] )
def is_valid_dos_filename(s):
    """Return True if s is a non-empty str made only of characters in allowed_filename.

    None, the empty string and non-str values all give False.
    """
    if not isinstance(s, str) or s == '':
        return False
    return all(c in allowed_filename for c in s)
#### This is from pefile
# string.ascii_* is used instead of string.lowercase/uppercase: the latter are
# locale-dependent and do not exist in Python 3.
allowed_function_name = string.ascii_lowercase + string.ascii_uppercase + string.digits + '_?@$()'
def is_valid_function_name(s):
    """Return True if s is a non-empty str made only of characters in allowed_function_name.

    None, the empty string and non-str values all give False.
    """
    if not isinstance(s, str) or s == '':
        return False
    return all(c in allowed_function_name for c in s)
#### This is from pefile (no 64-bit support currently)
# Bit 31 of a 32-bit thunk value.  Written without the Python 2-only "L"
# suffix: the integer has the same value and the literal parses everywhere.
IMAGE_ORDINAL_FLAG = 0x80000000
def read_asciiz(addr_space, string_address, max=255):
    """Read a NULL-terminated ASCII string in the object's address space.

    At most max bytes are read with zread().  Returns the text before the
    first NUL byte, or None when the address is invalid, the read fails, or
    no NUL byte occurs within the bytes read.
    """
    if not addr_space.is_valid_address(string_address):
        return None
    try:
        raw = addr_space.zread(string_address, max)
        # index() raises ValueError when there is no terminator in range
        return raw[:raw.index('\x00')]
    except Exception:
        # Deliberately best-effort, but no longer swallows KeyboardInterrupt
        # or SystemExit the way a bare except did.
        return None
def wctomb(ptr, addr_space):
    """Read a variable length Unicode buffer and return it as ASCII.

    Reads 128 bytes at ptr, cuts the data at the first pair of NUL bytes
    (when there is one) and strips the remaining NUL bytes.  Returns None
    when ptr is invalid, the read raises, or the read returns no data.
    """
    if not addr_space.is_valid_address(ptr):
        return None
    try:
        ret = addr_space.read(ptr, 128)
    except Exception:
        return None
    # A read that yields None used to crash on ret.find().  Equality (not
    # identity) is used in case the address space returns a None-like object.
    if ret == None:
        return None
    length = ret.find("\x00\x00")
    # Without a terminator find() gives -1; slicing with that silently
    # dropped the last byte, so keep the whole buffer instead.
    if length != -1:
        ret = ret[0:length]
    return ret.replace('\x00', '')
def can_exec(f):
    """Returns true if a memory segment is executable.

    f is used as an index into PROTECT_FLAGS; an index past the end of the
    table gives False.
    """
    try:
        return "EXECUTE" in PROTECT_FLAGS[f]
    except IndexError:
        return False
def hd(src, start=0, length=16):
    """Hexdump formula seen at http://code.activestate.com/recipes/142812-hex-dumper

    Returns one text line per `length` characters of src: the offset
    (counted from `start`), the hex bytes, then the characters with anything
    non-printable shown as '.'.
    """
    # Printable means 0x20-0x7e except the backslash.  That is what the old
    # len(repr(chr(x))) == 3 test selected, now stated directly so it does
    # not depend on how the interpreter renders repr().
    table = ''.join([chr(x) if (32 <= x < 127 and x != 0x5c) else '.' for x in range(256)])
    lines = []
    for i in range(0, len(src), length):
        chunk = src[i:i+length]
        hexa = ' '.join(["%02x" % ord(x) for x in chunk])
        printable = chunk.translate(table)
        lines.append("0x%08x %-*s %s\n" % (i+start, length*3, hexa, printable))
    return ''.join(lines)
def find_space(addr_space, procs, mod_base):
    """Search for an address space (usually looking for a GUI process).

    Returns addr_space itself if mod_base is valid in it, otherwise the
    address space of the first process in procs where mod_base is valid,
    otherwise None.
    """
    if addr_space.is_valid_address(mod_base):
        return addr_space
    candidates = (proc.get_process_address_space() for proc in procs)
    for candidate in candidates:
        if candidate != None and candidate.is_valid_address(mod_base):
            return candidate
    return None
def get_disasm_text(data, start, count = 10, stoponret=False):
    """Return a string representation of disassembled instructions.

    Decodes data as 32-bit code located at address start and returns up to
    count lines of "address: hexdump instruction".  With stoponret the
    listing ends after the first instruction whose text starts with "RET".
    Returns "" when distorm3 has not been imported.
    """
    # "in" replaces dict.has_key(), which does not exist in Python 3
    if 'distorm3' not in sys.modules:
        return ""
    lines = []
    for (offset, size, instruction, hexdump) in distorm3.DecodeGenerator(start, data, distorm3.Decode32Bits):
        if len(lines) == count:
            break
        lines.append("%.8x: %-32s %s\n" % (offset, hexdump, instruction))
        # Optionally stop when reaching the end of a function
        if stoponret and instruction.startswith("RET"):
            break
    return "".join(lines)
def get_yara_text(data, hits, start):
    """Format YARA hits for viewing. Note: YARA 1.4a matches are different than 1.4

    data is the scanned buffer, hits the matches reported for it and start
    the address data was read from (used for the hexdump offsets).  Returns
    "" when there are no hits.  Formatting stops early, returning what has
    been built so far, at a hit whose strings are neither a list nor a dict.
    """
    parts = []
    if not hits:
        return ""
    for hit in hits:
        parts.append("YARA rule: {0}\n".format(hit.rule))
        # "in" replaces dict.has_key(), which does not exist in Python 3
        if 'description' in hit.meta:
            parts.append("Description: {0}\n".format(hit.meta['description']))
        # Account for the API change from Yara Python 1.4 to 1.4.a
        if isinstance(hit.strings, list):
            hit_strings = [(key, val) for (key, _, val) in hit.strings]
        elif isinstance(hit.strings, dict):
            hit_strings = list(hit.strings.items())
        else:
            return "".join(parts)
        for (key, val) in hit_strings:
            # Show matches containing unprintable characters as hex
            if any(c not in string.printable for c in val):
                val = binascii.hexlify(val)
            # Get a temporary window of the first 128 characters at the match
            window = data[key:key+128]
            if 'null_string' in hit.meta:
                # Rule asks for the NUL-terminated text at the match instead
                nul = window.find("\x00")
                if nul != -1:
                    val = window[0:nul]
            parts.append("Hit: {0}\n".format(val))
            parts.append("{0}\n".format(hd(window, start+key)))
    return "".join(parts)
def get_flags(flags, val):
    """Text representation of flags from a dictionary.

    If val is itself a key of flags, the mapped value is returned.
    Otherwise returns the list of keys whose mapped value has all of its
    bits set in val.
    """
    # "in" replaces dict.has_key(), which does not exist in Python 3
    if val in flags:
        return flags[val]
    return [f for f in flags if (flags[f] | val) == val]
def get_sorted_mods(modlist):
    """Index modules (LDR_MODULEs) by base address.

    Returns (mods, mod_addrs): mods maps each DllBase to a
    (module, lowercased name) pair and mod_addrs is the sorted list of
    those base addresses.
    """
    # ntpath understands backslash-separated Windows paths on every host OS;
    # os.path.basename() on a POSIX host returned the whole path unchanged.
    import ntpath

    mods = {}
    for mod in modlist:
        # Try using the base name first, otherwise strip the file name from the full path
        if not mod.BaseDllName:
            name = ntpath.basename(str(mod.FullDllName))
        else:
            name = str(mod.BaseDllName)
        mods[mod.DllBase] = (mod, name.lower())
    mod_addrs = sorted(mods.keys())
    return mods, mod_addrs
def get_mem_size(p, start):
    """Given a starting address in process memory, return the base and size of the region.

    Walks p.VadRoot and returns (region_start, region_end - region_start)
    for the first VAD with region_start <= start < region_end, where
    region_end is the address of the region's last byte.  Returns None when
    no VAD matches.
    """
    for vad in p.VadRoot.traverse():
        if vad == None:
            continue
        # Page numbers to byte addresses: first byte and last byte of the region
        first_byte = vad.StartingVpn << 12
        last_byte = ((vad.EndingVpn + 1) << 12) - 1
        if not (first_byte <= start and last_byte > start):
            continue
        return (first_byte, (last_byte - first_byte))
    return None
def find_module(modlist, mod_addrs, addr):
    """Modified version of BDG's binary search for modules.

    mod_addrs must be sorted by module base address and modlist must map
    each of those addresses to a (module, name) pair.  Returns the module
    whose [DllBase, DllBase + SizeOfImage) range contains addr, else None.
    """
    # Number of bases <= addr; zero means addr lies below every module
    insert_at = bisect_right(mod_addrs, addr)
    if insert_at == 0:
        return None
    candidate, _name = modlist[mod_addrs[insert_at - 1]]
    inside = (addr >= candidate.DllBase and addr < candidate.DllBase + candidate.SizeOfImage)
    return candidate if inside else None
def find_module_by_name(mods, names):
    """Return the first module whose lowercased BaseDllName is in names, or None"""
    matches = (mod for mod in mods if str(mod.BaseDllName).lower() in names)
    return next(matches, None)
def find_module_by_name_ex(mods, names):
    """Return every module whose BaseDllName matches a name, ignoring case.

    A module appears once for each entry of names that it matches.
    """
    return [
        mod
        for mod in mods
        for name in names
        if str(mod.BaseDllName).lower() == name.lower()
    ]
def get_vad_data(ps_ad, start, end):
    """Returns the data for a given VAD range. Optimizations by Ryan Smith @ hustlelabs.com

    Reads ((end - start + 1) >> 12) pages of 0x1000 bytes beginning at
    start.  A page whose address is invalid, or whose read returns None, is
    replaced by 0x1000 zero bytes.
    """
    page_size = 0x1000
    blank_page = '\x00' * page_size
    # Bound methods are looked up once; this loop can cover many pages
    is_valid = ps_ad.is_valid_address
    read_page = ps_ad.read
    pages = []
    for index in range((end - start + 1) >> 12):
        addr = start + index * page_size
        page = read_page(addr, page_size) if is_valid(addr) else blank_page
        # Fill failed reads in the same pass instead of rescanning the list
        pages.append(page if page != None else blank_page)
    return ''.join(pages)
def get_malware_space(config, astype=None):
    """Returns an address space with malware types.

    Loads the address space (passing astype through only when it is
    truthy), registers this module's object classes on its profile under
    their class names and adds malware_types to the profile.
    """
    if astype:
        addr_space = utils.load_as(config, astype)
    else:
        addr_space = utils.load_as(config)

    overrides = [
        _IMAGE_IMPORT_DESCRIPTOR,
        _IMAGE_EXPORT_DIRECTORY,
        _LDR_DATA_TABLE_ENTRY,
        _PSP_CID_TABLE,
        _EPROCESS,
        _IMAGE_DOS_HEADER,
        _IMAGE_NT_HEADERS,
        _IMAGE_SECTION_HEADER,
    ]
    for override in overrides:
        addr_space.profile.object_classes[override.__name__] = override

    addr_space.profile.add_types(malware_types)
    return addr_space
def get_obj_name(space, object):
    """Return the name from OBJECT_HEADER, or "(unnamed)" when it has none.

    The header is located by stepping back from the object by the offset of
    the header's Body member.
    """
    body_offset = space.profile.get_obj_offset('_OBJECT_HEADER', 'Body')
    header = obj.Object("_OBJECT_HEADER",
                        offset = object.obj_offset - body_offset,
                        vm = space)
    header.kas = space
    name = header.get_object_name()
    return name or "(unnamed)"
- #--------------------------------------------------------------------------------
- # malfind plug-in
- #--------------------------------------------------------------------------------
- class Malfind(procdump.ProcExeDump):
- "[MALWARE] Find hidden and injected code"
def __init__(self, config, *args):
    """Initialise the base plugin, then register malfind's command line options"""
    procdump.ProcExeDump.__init__(self, config, *args)
    option_specs = (
        ('YARA-RULES', dict(short_option='Y', default=None,
                            help='Use YARA rules in addition to finding injected code')),
        ('YARA-RULES-ONLY', dict(short_option='y', default=None,
                                 help='Use YARA rules only')),
        ("KERNEL", dict(short_option='K', default=False,
                        action='store_true', help='Scan kernel modules')),
        ("SCAN", dict(short_option = 'S', default=False,
                      action = 'store_true', help = 'Use PSScan')),
    )
    for option_name, settings in option_specs:
        config.add_option(option_name, **settings)
def rebuild(self, addr_space, start):
    """Use the ProcExeDump's get_image to rebuild a PE in memory.

    Returns the rebuilt image as a string, or '' when start is not a valid
    address in addr_space.
    """
    if not addr_space.is_valid_address(start):
        return ''
    image = ''
    for offset, code in self.get_image(sys.stdout, addr_space, start):
        chunk_end = offset + len(code)
        # Grow the image with zero bytes until it can hold this chunk
        # (a non-positive count adds nothing)
        image += "\x00" * (chunk_end - len(image))
        # Overlay the chunk at its offset, keeping whatever lies after it
        image = image[:offset] + code + image[chunk_end:]
    return image
def get_vads(self, proc):
    """Generator for VADs in a process.

    Yields (address_space, start, end, tag, protection, data) for each VAD
    whose range passes the bounds check and whose data is not empty.
    Yields nothing when the process has no address space.
    """
    ps_ad = proc.get_process_address_space()
    if ps_ad == None:
        return
    for vad in proc.VadRoot.traverse():
        if vad == None:
            continue
        # Page numbers to byte addresses: first byte and last byte of the region
        first_byte = vad.StartingVpn << 12
        last_byte = ((vad.EndingVpn + 1) << 12) - 1
        if first_byte > 0xFFFFFFFF or last_byte > (0xFFFFFFFF << 12):
            continue
        data = get_vad_data(ps_ad, first_byte, last_byte)
        if data == "":
            continue
        yield (ps_ad, first_byte, last_byte, vad.Tag, vad.Flags.Protection >> 24, data)
def calculate(self):
    """Scan kernel modules or process VADs for injected code and YARA hits.

    Yields one 9-tuple per reported region:
        (name, pid, start, end, tag, protection, dump_path, hits, data)

    Kernel mode (--kernel) requires YARA rules; it yields None for pid, tag
    and protection and '-' for dump_path because nothing is written to disk.
    Process mode writes each reported region to --dump-dir before yielding.

    A process region is reported when:
      * --yara-rules-only is set: the region has at least one YARA hit;
      * otherwise: the region is suspicious (executable "VadS" region, or an
        executable PE that is not in the PEB module list) or has a YARA hit.
    """
    rules = None
    config = self._config

    if config.DUMP_DIR == None:
        debug.error("Please specify a dump directory (--dump-dir)")
    if not os.path.isdir(config.DUMP_DIR):
        debug.error(config.DUMP_DIR + " is not a directory")

    if config.YARA_RULES != None or config.YARA_RULES_ONLY != None:
        # The module-level "import yara" is allowed to fail, so confirm it worked
        if 'yara' in sys.modules:
            stuff = config.YARA_RULES if config.YARA_RULES else config.YARA_RULES_ONLY
            if os.path.isfile(stuff):
                # Compile the Yara rules from a specified file on disk
                rules = yara.compile(stuff)
            else:
                # Compile the Yara rules from a string passed on command line.
                # If the input starts with '{' or '/' then it's a byte string
                # or regex, in which case we don't use quotes to wrap it.
                if stuff.startswith("{") or stuff.startswith("/"):
                    s = stuff
                else:
                    s = '"' + stuff + '"'
                rules = yara.compile(sources={
                    'namespace1' : 'rule z1 {strings: $a = ' + s + ' condition: $a}',
                })

    # We require YARA signatures for kernel mode. Checking `rules` itself
    # (instead of only the options) also covers the case where rules were
    # requested but YARA is not installed, which used to crash further down
    # with an AttributeError on None.
    if config.KERNEL and rules == None:
        debug.error("You must use YARA rules for kernel mode!")

    addr_space = get_malware_space(config)

    if config.KERNEL:
        mods = list(modules.lsmod(addr_space))
        procs = list(tasks.pslist(addr_space))
        for mod in mods:
            space = find_space(addr_space, procs, mod.DllBase)
            if space != None:
                data = space.zread(mod.DllBase, mod.SizeOfImage)
                hits = rules.match(data=data)
                if len(hits):
                    yield mod.BaseDllName, None, mod.DllBase, (mod.DllBase + mod.SizeOfImage), None, None, '-', hits, data
    else:
        if self._config.OFFSET != None:
            procs = [self.virtual_process_from_physical_offset(addr_space, self._config.OFFSET)]
        elif self._config.SCAN:
            scan_procs = list(filescan.PSScan(self._config).calculate())
            procs = []
            for task in scan_procs:
                procs.append(self.virtual_process_from_physical_offset(addr_space, task.obj_offset))
        else:
            procs = list(self.filter_tasks(tasks.pslist(addr_space)))

        for proc in procs:
            # get the DLLs so we know what to whitelist
            if proc.Peb.Ldr.InLoadOrderModuleList:
                dll_bases = [l.DllBase for l in proc.Peb.Ldr.InLoadOrderModuleList.list_of_type("_LDR_DATA_TABLE_ENTRY", "InLoadOrderLinks")]
            else:
                dll_bases = []

            image_name = proc.ImageFileName

            if self._config.OFFSET:
                offset = proc.obj_offset
            else:
                offset = addr_space.vtop(proc.obj_offset)

            for ps_ad, start, end, tag, prx, data in self.get_vads(proc):
                # Ignore blocks of all zero
                if data.count(chr(0)) == len(data):
                    continue

                suspicious = False
                hits = []

                # Rebuild the data as PE if necessary
                if data[0:2] == 'MZ':
                    try:
                        pedata = self.rebuild(ps_ad, start)
                    except Exception:
                        # Best effort: keep the raw VAD data if the rebuild fails
                        pass
                    else:
                        data = pedata
                    # flag executable PE's *not* in the DLLs list. Note: sometimes the
                    # DLL list is empty because the PEB areas are paged. in this case, we
                    # would end up flagging all DLLs in the process, so check for that too.
                    if dll_bases and start not in dll_bases and can_exec(prx):
                        suspicious = True

                # Scan the memory with YARA
                if rules != None:
                    hits = rules.match(data=data)

                # Keep executable short vads
                if (tag == "VadS") and can_exec(prx):
                    suspicious = True

                if config.YARA_RULES_ONLY != None:
                    # Rules-only mode: YARA alone decides. The old code dropped
                    # every suspicious region here, even one with YARA hits.
                    if len(hits) == 0:
                        continue
                elif not suspicious and len(hits) == 0:
                    continue

                # Save the data to a file
                fname = "{0}.{1:x}.{2:08x}-{3:08x}.dmp".format(image_name, offset, start, end)
                dump_path = os.path.join(config.DUMP_DIR, fname)
                with open(dump_path, 'wb') as f:
                    f.write(data)

                yield image_name, proc.UniqueProcessId, start, end, tag, prx, dump_path, hits, data
def render_text(self, outfd, data):
    """Write the malfind results as text.

    For each region: a summary row, the dump file path, then either a
    hexdump of the first 128 bytes (when there are no YARA hits) or the
    per-hit text produced by get_yara_text.
    """
    header_fmt = "{0:<20} {1:<6} {2:10} {3:10} {4:<8} {5:<6} {6}\n"
    row_fmt = "{0:<20} {1:<6} {2:#010x} {3:<#010x} {4:<8} {5:<6} {6}\n"

    outfd.write(header_fmt.format(
        'Name', 'Pid', 'Start', 'End', 'Tag', 'Hits', 'Protect'))

    for (name, pid, start, end, tag, prx, fname, hits, chunk) in data:
        hit_count = len(hits)
        protect = PROTECT_FLAGS[prx] if prx else ''
        outfd.write(row_fmt.format(
            name, pid or '', start, end, tag or '', hit_count, protect))
        outfd.write("Dumped to: {0}\n".format(fname))

        # Don't hexdump if we have YARA hits, because individual hexdumps
        # are printed for each hit below
        if hit_count == 0:
            outfd.write("{0}\n".format(hd(chunk[0:128], start)))
            # NOTE(review): the source paste lost its indentation; the
            # disassembly is assumed to belong to the "no hits" branch.
            # Disasm if the code is marked as executable
            if prx and can_exec(prx) and chunk[0:2] != 'MZ':
                outfd.write("Disassembly:\n{0}\n".format(get_disasm_text(chunk, start)))

        outfd.write("{0}\n".format(get_yara_text(chunk, hits, start)))
- #--------------------------------------------------------------------------------
- # svcscan plug-in
- #--------------------------------------------------------------------------------
# Service type names keyed to their numeric values
# (decoded with get_flags in SvcScan.render_text)
SERVICE_TYPES = {
    'SERVICE_KERNEL_DRIVER': 0x01,
    'SERVICE_FILE_SYSTEM_DRIVER': 0x02,
    'SERVICE_WIN32_OWN_PROCESS': 0x10,
    'SERVICE_WIN32_SHARE_PROCESS': 0x20,
    'SERVICE_INTERACTIVE_PROCESS': 0x100,
}

# Service state names keyed to their numeric values
# (used for validation in SvcScan.get_services and for display)
SERVICE_STATES = {
    'SERVICE_STOPPED': 0x01,
    'SERVICE_START_PENDING': 0x02,
    'SERVICE_STOP_PENDING': 0x3,
    'SERVICE_RUNNING': 0x4,
    'SERVICE_CONTINUE_PENDING': 0x5,
    'SERVICE_PAUSE_PENDING': 0x6,
    'SERVICE_PAUSED': 0x7,
}
- class SvcScan(Malfind):
- "[MALWARE] Scan for Windows services"
def __init__(self, config, *args):
    """Initialise via Malfind, then remove the options svcscan does not use."""
    Malfind.__init__(self, config, *args)

    unused_options = (
        "DUMP-DIR", "PID", "UNSAFE", "SCAN",
        "KERNEL", "YARA-RULES", "YARA-RULES-ONLY", "OFFSET",
    )
    for option in unused_options:
        config.remove_option(option)
def get_services(self, addr_space):
    """Generate the service records found in services.exe memory.

    Every VAD of each services.exe process is searched with YARA for the
    "sErv" (legacy) and "serH" (recent) signatures.

    * Legacy hits are cast to _SERVICE_RECORD_LEGACY (the signature sits at
      the TagSignature field) and yielded when Order is in 1..0xFFFE.
    * Recent hits are cast to _SERVICE_HEADER; only the record with the
      highest Order is remembered, and once scanning finishes the list is
      walked backwards from it through PrevEntry.
    """
    tag_offset = addr_space.profile.get_obj_offset('_SERVICE_RECORD_LEGACY', 'TagSignature')
    scm_procs = [p for p in tasks.pslist(addr_space)
                 if str(p.ImageFileName) == "services.exe"]

    rules = yara.compile(sources={
        'namespace1' : 'rule legacy {strings: $a = "sErv" condition: $a}',
        'namespace2' : 'rule recent {strings: $a = "serH" condition: $a}',
    })

    # Service record struct with the highest Order seen so far
    newest = None

    # Cycle through the process memory segments
    for proc in scm_procs:
        for ps_ad, start, _end, _tag, _prx, data in self.get_vads(proc):
            # Find all the service record signatures
            for hit in rules.match(data=data):
                for (offset, _, _) in hit.strings:
                    if hit.rule == "legacy":
                        rec = obj.Object("_SERVICE_RECORD_LEGACY",
                                         start + offset - tag_offset, ps_ad)
                        # Do a sanity check on the order
                        if rec and rec.Order > 0 and rec.Order < 0xFFFF:
                            yield rec
                    elif hit.rule == "recent":
                        rec = obj.Object('_SERVICE_HEADER', start + offset, ps_ad)
                        # Just a few sanity checks....
                        plausible = (rec and
                                     ps_ad.is_valid_address(rec.Ser) and
                                     not rec.Ser.Order > 0xFFFF and
                                     rec.Ser.State in SERVICE_STATES.values())
                        if not plausible:
                            continue
                        # Save the record with the highest Order
                        if not newest or newest.Order < rec.Ser.Order:
                            newest = rec.Ser

    # Walk the list starting at the highest Order on down; the walk stops at
    # the first record whose PrevEntry is empty, and that record is not yielded
    if newest:
        rec = newest
        while rec and rec.PrevEntry:
            yield rec
            rec = rec.PrevEntry
def calculate(self):
    """Yield every service record that get_services locates."""
    space = get_malware_space(self._config)
    for record in self.get_services(space):
        yield record
def render_text(self, outfd, data):
    """Write one table row per service record.

    Kernel and file-system driver services show their Binary1 path and a
    dashed placeholder for the PID; other services show the ServicePath and
    ProcessId from Binary2. A path that cannot be read is shown as dashes.
    """
    heading = "{0:<12} {1:<8} {2:<8} {3:<16} {4:<40} {5:<30} {6:<20} {7}\n"
    row = "{0:<#12x} {1:<#8x} {2:<8} {3:16} {4:40} {5:<30} {6:<20} {7}\n"
    placeholder = '--------'

    outfd.write(heading.format(
        "Record", "Order", "Pid", "Name", "DisplayName", "Type", "State", "Path"))

    for rec in data:
        type_text = '|'.join(get_flags(SERVICE_TYPES, rec.Type))
        state_text = '|'.join(get_flags(SERVICE_STATES, rec.State))

        is_driver = ('SERVICE_KERNEL_DRIVER' in type_text or
                     'SERVICE_FILE_SYSTEM_DRIVER' in type_text)
        if is_driver:
            binary = wctomb(rec.Binary1, rec.obj_vm)
            pid = placeholder
        else:
            binary = wctomb(rec.Binary2.ServicePath, rec.obj_vm)
            pid = rec.Binary2.ProcessId

        if binary == None:
            binary = placeholder

        outfd.write(row.format(
            rec.v(),
            rec.Order,
            pid,
            repr(wctomb(rec.ServiceName, rec.obj_vm)),
            repr(wctomb(rec.DisplayName, rec.obj_vm)),
            type_text,
            state_text,
            binary,
        ))
- #--------------------------------------------------------------------------------
- # ldr_modules plug-in
- #--------------------------------------------------------------------------------
- class LdrModules(taskmods.DllList):
- "[MALWARE] Detect unlinked DLLs"
def __init__(self, config, *args):
    """Initialise exactly like DllList; no options are added or removed."""
    taskmods.DllList.__init__(self, config, *args)
def list_load_modules(self, task):
    """Yield the task's _LDR_DATA_TABLE_ENTRY records in load order.

    Nothing is yielded when the task has no PID or the PEB's
    InLoadOrderModuleList is empty/unavailable.
    """
    if not task.UniqueProcessId:
        return
    head = task.Peb.Ldr.InLoadOrderModuleList
    if not head:
        return
    for entry in head.list_of_type("_LDR_DATA_TABLE_ENTRY", "InLoadOrderLinks"):
        yield entry
def list_init_modules(self, task):
    """Yield the task's _LDR_DATA_TABLE_ENTRY records in initialization order.

    Nothing is yielded when the task has no PID or the PEB's
    InInitializationOrderModuleList is empty/unavailable.
    """
    if not task.UniqueProcessId:
        return
    head = task.Peb.Ldr.InInitializationOrderModuleList
    if not head:
        return
    for entry in head.list_of_type("_LDR_DATA_TABLE_ENTRY", "InInitializationOrderLinks"):
        yield entry
def list_mem_modules(self, task):
    """Yield the task's _LDR_DATA_TABLE_ENTRY records in memory order.

    Nothing is yielded when the task has no PID or the PEB's
    InMemoryOrderModuleList is empty/unavailable.
    """
    if not task.UniqueProcessId:
        return
    head = task.Peb.Ldr.InMemoryOrderModuleList
    if not head:
        return
    for entry in head.list_of_type("_LDR_DATA_TABLE_ENTRY", "InMemoryOrderLinks"):
        yield entry
def list_mapped_files(self, p, pe_only=True, get_data=False):
    """Return the files mapped into a process, keyed by base address.

    p        -- the process whose VAD tree is walked
    pe_only  -- when True (default) keep only mappings that start with 'MZ'
    get_data -- when True store the region's data (via get_vad_data) as the
                second tuple element, otherwise store None

    Returns {base_address: (file_name, data_or_None)}. VADs without a file
    object and VADs starting above 0x7FFFFFFF are skipped. An empty dict is
    returned when the process address space is unavailable (this used to
    raise AttributeError).
    """
    mapped_files = {}

    ps_ad = p.get_process_address_space()
    # "== None" on purpose, matching get_vads, so that an object which only
    # compares equal to None is treated as missing too
    if ps_ad == None:
        return mapped_files
    ps_ad_read = ps_ad.read

    # Enumerate the VAD entries
    for vad in p.VadRoot.traverse():
        if vad == None:
            continue

        # We're only looking for VADs with mapped files
        try:
            file_object = vad.get_file_object()
        except AttributeError:
            continue

        # Get the start and end address (end is inclusive)
        start = vad.StartingVpn << 12
        end = ((vad.EndingVpn + 1) << 12) - 1
        if start > 0x7FFFFFFF:
            continue

        # Return only PE files or all files?
        if pe_only and ps_ad_read(start, 2) != 'MZ':
            continue

        # Return the file data also?
        contents = get_vad_data(ps_ad, start, end) if get_data else None
        mapped_files[start] = (file_object.FileName, contents)

    return mapped_files
def calculate(self):
    """Cross-reference each process's mapped PE files with its PEB lists.

    Yields (process, base, mapped_file_name, load_entry, init_entry,
    mem_entry); an entry is None when no module with that base address is
    in the corresponding PEB list.
    """
    space = get_malware_space(self._config)

    if self._config.OFFSET != None:
        targets = [self.virtual_process_from_physical_offset(space, self._config.OFFSET)]
    else:
        targets = self.filter_tasks(tasks.pslist(space))

    for proc in targets:
        # One lookup table per PEB list: base address -> module entry
        by_load = dict((m.DllBase.v(), m) for m in list(self.list_load_modules(proc)))
        by_init = dict((m.DllBase.v(), m) for m in list(self.list_init_modules(proc)))
        by_mem = dict((m.DllBase.v(), m) for m in list(self.list_mem_modules(proc)))

        # For each of the memory mapped files, check if it is unlinked
        files = self.list_mapped_files(proc)
        for base in list(files.keys()):
            yield (proc, base, files[base][0],
                   by_load.get(base), by_init.get(base), by_mem.get(base))
def render_text(self, outfd, data):
    """Write one row per mapped file showing which PEB lists contain it.

    With the verbose option set, the full and base DLL names recorded in
    each list that does contain the module are printed underneath.
    """
    outfd.write("{0:<8} {1:<20} {2:<12} {3:<8} {4:<8} {5:<8} {6}\n".format(
        'Pid', 'Process', 'Base', 'InLoad', 'InInit', 'InMem', 'MappedPath'))

    row = "{0:<8} {1:<20} {2:<#12x} {3:<8} {4:<8} {5:<8} {6}\n"
    for proc, base, mapped_path, load_path, init_path, mem_path in data:
        in_load = bool(load_path != None)
        in_init = bool(init_path != None)
        in_mem = bool(mem_path != None)
        outfd.write(row.format(proc.UniqueProcessId, proc.ImageFileName, base,
                               in_load, in_init, in_mem, mapped_path))

        if not self._config.verbose:
            continue
        if load_path != None:
            outfd.write(" Load Path: {0} : {1}\n".format(load_path.FullDllName, load_path.BaseDllName))
        if init_path != None:
            outfd.write(" Init Path: {0} : {1}\n".format(init_path.FullDllName, init_path.BaseDllName))
        if mem_path != None:
            outfd.write(" Mem Path: {0} : {1}\n".format(mem_path.FullDllName, mem_path.BaseDllName))
- #--------------------------------------------------------------------------------
- # impscan plug-in
- #--------------------------------------------------------------------------------
- class ImpScan(Malfind):
- "[MALWARE] Scan a module for imports (API calls)"
def __init__(self, config, *args):
    """Initialise via Malfind, add --addr/--size and drop unused options."""
    Malfind.__init__(self, config, *args)

    # Both new options are integers that default to "not given"
    int_option = dict(default=None, action='store', type='int')
    config.add_option('ADDR', short_option='a',
                      help='Address to begin scanning (hex)', **int_option)
    config.add_option('SIZE', short_option='s',
                      help='Size of memory to scan from base address', **int_option)

    for option in ("OFFSET", "YARA-RULES", "YARA-RULES-ONLY", "KERNEL"):
        config.remove_option(option)
def automate_ida(self, ofname, names, funcs, hooks):
    """Write an IDC script for the dumped image and run IDA on it.

    ofname -- path of the dumped binary; the script is written to
              "<cwd>/<ofname>.idc" (an absolute ofname is used as-is)
    names  -- {IAT address: function name} to label, or None
    funcs  -- iterable of function start addresses, or None
    hooks  -- {address: function name} to label as "Hook<name>", or None

    IDA ("idag" on Windows, "idal" elsewhere) must be in PATH. If it cannot
    be launched a message is printed and the IDC file is left in place.
    Sets TVHEADLESS=1 in this process's environment as a side effect.
    """
    # Build a full path to the IDC file (required by idag.exe on Windows)
    idcfile = os.path.join(os.getcwd(), ofname) + ".idc"

    print("Dumping IDC file to {0}".format(idcfile))

    # "with" closes the file even when a write fails; open() raises rather
    # than returning None, so no None check is needed
    with open(idcfile, "w") as f:
        # This is the IDC header
        f.write("#include <idc.idc>\nstatic main(void) {\n")

        # This is one line of IDC for each function
        if funcs != None:
            for addr in funcs:
                f.write(" MakeFunction(0x{0:08X}, BADADDR);\n".format(addr))

        # Write the IAT labels
        if names != None:
            for addr, func_name in names.items():
                f.write(" MakeDword(0x{0:08X});\n".format(addr))
                f.write(" MakeName(0x{0:08X}, \"{1}\");\n".format(addr, func_name))

        # Write the hook labels (if any)
        if hooks != None:
            for addr, func_name in hooks.items():
                f.write(" MakeName(0x{0:08X}, \"Hook{1}\");\n".format(addr, func_name))
                f.write(" MakeFunction(0x{0:08X}, BADADDR);\n".format(addr))

        # This is the IDC trailer
        f.write("Exit(0);}")

    # Set the tvision environment variable so no interaction is needed
    os.environ['TVHEADLESS'] = '1'

    # If we're on Windows, use idag.exe, otherwise use idal
    ida = "idag" if sys.platform == "win32" else "idal"

    try:
        proc = subprocess.Popen([ida, '-c', '-A', "-S{0}".format(idcfile), ofname],
                                stdout=subprocess.PIPE)
    except OSError as e:
        print("Cannot launch IDA: " + str(e))
        print("Please make sure idal or idag.exe is in your PATH")
    else:
        print(proc.communicate()[0])
def revert_and_split(self, mod_name, func_name):
    """Map a forwarded import back to the name a program imports it by.

    Some kernel32 exports are forwarded to ntdll (e.g. HeapAlloc ->
    RtlAllocateHeap). When "<mod_name>!<func_name>" is one of the known
    forward targets, the (module, function) pair of the original export is
    returned as strings; otherwise the arguments are returned unchanged.
    The lookup is case-sensitive.
    """
    forwarded_imports = {
        "ntdll.dll!RtlGetLastWin32Error" : "kernel32.dll!GetLastError",
        "ntdll.dll!RtlSetLastWin32Error" : "kernel32.dll!SetLastError",
        "ntdll.dll!RtlRestoreLastWin32Error" : "kernel32.dll!SetLastError",
        "ntdll.dll!RtlAllocateHeap" : "kernel32.dll!HeapAlloc",
        "ntdll.dll!RtlReAllocateHeap" : "kernel32.dll!HeapReAlloc",
        "ntdll.dll!RtlFreeHeap" : "kernel32.dll!HeapFree",
        "ntdll.dll!RtlEnterCriticalSection" : "kernel32.dll!EnterCriticalSection",
        "ntdll.dll!RtlLeaveCriticalSection" : "kernel32.dll!LeaveCriticalSection",
        "ntdll.dll!RtlDeleteCriticalSection" : "kernel32.dll!DeleteCriticalSection",
        "ntdll.dll!RtlZeroMemory" : "kernel32.dll!ZeroMemory",
        "ntdll.dll!RtlSizeHeap" : "kernel32.dll!HeapSize",
        "ntdll.dll!RtlUnwind" : "kernel32.dll!RtlUnwind",
    }

    fullname = "%s!%s" % (mod_name, func_name)

    # One lookup and one split instead of has_key() plus two splits
    original = forwarded_imports.get(fullname)
    if original is not None:
        mod_name, func_name = original.split('!', 1)

    return mod_name, func_name
def call_scan(self, ps_ad, data, base):
    """Disassemble a block of data and yield possible calls to imported functions.

    We're looking for instructions such as these:

        CALL [0x1000400]
        JMP [0x1000400]

    The 0x1000400 address is an entry in the IAT. It stores a DWORD which
    is the location of the API function being called.

    ps_ad -- address space used to read the IAT entry
    data  -- the bytes to disassemble (32-bit code)
    base  -- virtual address of data[0]

    Yields (instruction_address, iat_entry_address, call_destination).
    Instructions whose IAT entry lies outside [base, base + len(data)) or
    cannot be read are skipped.
    """
    ps_ad_read = ps_ad.read
    end = base + len(data)

    for op in distorm3.Decompose(base, data, distorm3.Decode32Bits):
        if not op.valid:
            continue

        is_call = op.flowControl == 'FC_CALL' and op.mnemonic == "CALL"
        is_jmp = op.flowControl == 'FC_UNC_BRANCH' and op.mnemonic == "JMP"
        if not (is_call or is_jmp):
            continue
        if op.operands[0].type != 'AbsoluteMemoryAddress':
            continue

        # Chop it to 32 bits, this will need changing for 64 bit support
        const = (op.operands[0].disp) & 0xFFFFFFFF

        # The memory location must be inside the range or it's an error.
        # "end" itself is the first address past the data, so it is excluded
        # (the old check used ">" and let it through).
        if const < base or const >= end:
            continue

        try:
            call_dest = struct.unpack("=I", ps_ad_read(const, 4))[0]
        except Exception:
            # Unreadable IAT slot: best effort, move on. "Exception" rather
            # than a bare except so Ctrl-C and generator shutdown still work.
            continue

        yield op.address, const, call_dest
def vicinity_scan(self, ps_ad, imports, exports, base, end_addr, forward=True):
    """Look for IAT entries next to the ones already found.

    The generic call scanner may not pick up all calls to imported
    functions. However, the IAT is usually grouped together, so this walks
    DWORD by DWORD forward from the lowest known IAT entry (forward=True)
    or backward from the highest one (forward=False). New entries are added
    to `imports` in place through check_import.

    The walk stops after 5 consecutive slots that yield nothing new, or
    after 0x2000 slots. A slot that cannot be read counts as a miss; it
    used to crash struct.unpack.

    ps_ad          -- address space to read from
    imports        -- {IAT address: (module, function, destination)}, updated
    exports        -- {function address: (module, function)}
    base, end_addr -- bounds of the scanned module; destinations inside it
                      are not imports
    """
    if not imports:
        return

    keys = sorted(imports)

    # Scan forward from the lowest IAT entry or backward from the highest IAT entry
    starting_point = keys[0] if forward else keys[-1]
    step = 4 if forward else -4

    stopcount = 5
    i = 0
    while (stopcount > 0) and (i < 0x2000):
        const = starting_point + (i * step)
        i += 1

        # Don't walk below address zero, and tolerate unreadable memory
        raw = ps_ad.read(const, 4) if const >= 0 else None
        if not raw or len(raw) != 4:
            stopcount -= 1
            continue

        call_dest = struct.unpack("=I", raw)[0]
        outside_module = (call_dest < base) or (call_dest > end_addr)
        if outside_module and self.check_import(imports, exports, call_dest, const):
            # Found a new entry: reset the miss counter
            stopcount = 5
        else:
            stopcount -= 1
def check_import(self, imports, exports, call_dest, const):
    """Record an IAT entry if its destination is an exported function.

    imports   -- {IAT address: (module, function, destination)}, updated
    exports   -- {function address: (module, function)}
    call_dest -- the address stored in the IAT slot
    const     -- the address of the IAT slot itself

    Returns True when a new entry was added to `imports`. Returns False
    when `call_dest` is not an export or the slot is already recorded.
    """
    # Find out if the destination leads to an API address
    if call_dest not in exports:
        return False

    # Obviously some APIs are called from multiple locations in a binary.
    # Here, we skip the duplicate calls (we only need 1 to build the IAT).
    if const in imports:
        return False

    # Revert any forwarded exports
    mod_name, func_name = exports[call_dest]
    mod_name, func_name = self.revert_and_split(mod_name, func_name)

    imports[const] = (mod_name, func_name, call_dest)
    return True
def find_functions(self, data, addr):
    """Scan a data buffer for 32-bit x86 function prologues.

    data -- the bytes to search
    addr -- virtual address of data[0]

    Returns the sorted list of unique addresses (wrapped to 32 bits) where
    a prologue starts.

    Every prologue is searched over the whole buffer. The previous version
    truncated the buffer while searching for the first pattern, so later
    patterns only saw what was left after the first pattern's last match.
    Longer prologues are matched first and own their bytes, so a hot-patch
    prologue (MOV EDI, EDI; PUSH EBP; MOV EBP, ESP) is reported once, at
    its MOV EDI, EDI, and not again at the embedded PUSH EBP.
    """
    prologues = (
        "\x55\x89\xE5",          # PUSH EBP; MOV EBP, ESP
        "\x55\x8B\xEC",          # PUSH EBP; MOV EBP, ESP
        "\x8B\xFF\x55\x8B\xEC",  # MOV EDI, EDI; PUSH EBP; MOV EBP, ESP
    )

    found = set()
    claimed = set()  # buffer offsets that already belong to a matched prologue

    for prologue in sorted(prologues, key=len, reverse=True):
        size = len(prologue)
        offset = data.find(prologue)
        while offset != -1:
            if offset not in claimed:
                found.add((addr + offset) & 0xffffffff)
                claimed.update(range(offset, offset + size))
            # Continue after this match (matches never overlap themselves)
            offset = data.find(prologue, offset + size)

    return sorted(found)
def get_all_imports(self, addr_space, data, base, exports):
    """Build the import table for a block of code.

    Runs the call scanner first, then widens the result with a forward and
    a backward vicinity scan.

    Returns a dictionary keyed by IAT entry address; the values are
    whatever check_import stored for that entry.
    """
    found = {}

    for (_insn, slot, target) in self.call_scan(addr_space, data, base):
        self.check_import(found, exports, target, slot)

    # Do the forward and reverse vicinity scans
    limit = base + len(data)
    for go_forward in (True, False):
        self.vicinity_scan(addr_space, found, exports, base, limit, go_forward)

    return found
def rebase(self, addr_space, base, data):
    """Return a copy of a PE image with its ImageBase field set to `base`.

    The field is located by reading the DOS and NT headers from
    `addr_space` at `base`; the four bytes at that position in `data` are
    replaced with `base` packed as an unsigned 32-bit integer.
    """
    dos = obj.Object("_IMAGE_DOS_HEADER", offset = base, vm = addr_space)
    nt = dos.get_nt_header()

    # Position of OptionalHeader.ImageBase relative to the start of the image
    opt_header_pos = nt.OptionalHeader.obj_offset - base
    field_pos = opt_header_pos + addr_space.profile.get_obj_offset(
        '_IMAGE_OPTIONAL_HEADER', 'ImageBase')

    patched = struct.pack("=I", base)
    return data[0:field_pos] + patched + data[field_pos + 4:]
def get_kernel_exports(self, addr_space, procs, mods):
    """Collect the exports of kernel modules.

    Returns {function address: (module base name, export name)}. Modules
    for which find_space cannot locate an address space are skipped.
    """
    table = dict()
    for module in mods:
        space = find_space(addr_space, procs, module.DllBase)
        if space != None:
            for export in module.exports(space):
                table[export.funcaddr] = (module.BaseDllName, export.get_name())
    return table
def get_process_exports(self, addr_space, mods):
    """Collect the exports of a process's loaded modules.

    Returns {function address: (module base name, export name)}. When two
    exports share an address, the one seen last wins. `addr_space` is not
    used.
    """
    table = dict()
    for module in mods:
        for export in module.exports():
            table[export.funcaddr] = (module.BaseDllName, export.get_name())
    return table
def calculate(self):
    """Scan one module for imports, dump it and hand it to IDA.

    Without --pid the target is a kernel module at --addr (its size comes
    from the loaded-module list, or from --size when the address is not a
    loaded module). With --pid the target is either the range that
    get_mem_size reports for --addr, or the first module in the process's
    module list.

    All the work happens here: the import list is printed, the image is
    written to --dump-dir and automate_ida is invoked. Nothing is yielded
    or returned.

    NOTE(review): the source paste lost its indentation; the nesting below
    was reconstructed from the control flow.
    """
    t0 = time.time()
    config = self._config
    if config.DUMP_DIR == None:
        debug.error("Please specify a dump directory (--dump-dir)")
    if not os.path.isdir(config.DUMP_DIR):
        debug.error(config.DUMP_DIR + " is not a directory")
    addr_space = get_malware_space(config)
    apis = dict()
    procs = list(tasks.pslist(addr_space))
    # If the user didn't specify a process with --pid, they must want kernel mode...
    if not config.PID:
        if config.ADDR == None:
            debug.error("Please specify a module address in hex (--addr)")
        base = config.ADDR
        # Get a list of loaded kernel modules
        drivers = list(modules.lsmod(addr_space))
        mods, mod_addrs = get_sorted_mods(drivers)
        # If the base maps to a loaded module, get the size from the LDR_MODULE.
        # Otherwise the user MUST provide the size or we won't know when to stop.
        try:
            size = [mods[addr][0].SizeOfImage for addr in mod_addrs if mods[addr][0].DllBase.v() == base][0]
        except:
            # Reached when no loaded module starts at `base` (IndexError on the
            # empty list); the bare except also hides any other lookup failure
            if config.SIZE == None:
                debug.error("You must supply a size (--size)")
            else:
                size = config.SIZE
        # Find a space for the address
        space = find_space(addr_space, procs, base)
        if space == None:
            debug.error("The supplied address is not valid")
        # Read the data at the supplied address. If rebuild doesn't work, just zread.
        data = self.rebuild(space, base)
        if not data:
            data = space.zread(base, size)
        apis = self.get_kernel_exports(addr_space, procs, drivers)
        # The kernel dump is named after the base address in decimal
        ofname = os.path.join(config.DUMP_DIR, "{0}.bin".format(base))
        # From here on, read through the space that actually maps `base`
        addr_space = space
    else:
        # Currently we only support scanning one process at a time
        # NOTE(review): indexing assumes filter_tasks returns a non-empty
        # sequence — confirm
        p = self.filter_tasks(tasks.pslist(addr_space))[0]
        # Discontinue if the process address space isn't valid
        ps_ad = p.get_process_address_space()
        if ps_ad == None:
            debug.error("The process address space is invalid")
        if not p.Peb:
            debug.error("The PEB is not accessible")
        # Get the loaded DLLs into a list
        mods = list(p.list_modules())
        # Now figure out the range of memory to scan for imports
        # This may be specified by the user with --ADDR and --SIZE or
        # alternately those opts can be left off, in which case we'll
        # scan the main module (*.exe) for imports
        if config.ADDR != None:
            ret = get_mem_size(p, config.ADDR)
            if not isinstance(ret, tuple):
                debug.error("Cannot find the desired memory range")
            base, size = ret
        # If the user didn't specify an address, they must want to analyze the main EXE
        else:
            if len(mods) == 0:
                debug.error("Cannot read DLL list")
            base = mods[0].DllBase
            size = mods[0].SizeOfImage
        if not ps_ad.is_valid_address(base):
            debug.error("The desired base address {0:#x} is invalid!".format(base))
        # Can't continue if the data isn't available
        data = get_vad_data(ps_ad, base, base+size)
        if len(data) == 0:
            debug.error("Cannot acquire the data!")
        apis = self.get_process_exports(ps_ad, mods)
        # The process dump is named "<pid>-<start>-<end>.bin" with hex addresses
        ofname = os.path.join(config.DUMP_DIR, "{0}-{1:x}-{2:x}.bin".format(p.UniqueProcessId, base, base+size))
        addr_space = ps_ad
    imports = self.get_all_imports(addr_space, data, base, apis)
    # Build the data for IDC statements
    funcs = self.find_functions(data, base)
    # Build a dictionary of imports for IDC statements
    names = dict((const, vals[1]) for (const, vals) in imports.items())
    # Print "<IAT address> <module> <function> <destination>" for every import
    for const, vals in imports.items():
        (mod_name, func_name, call_dest) = vals
        print "%x" % const, mod_name, func_name, "%x" % call_dest
    # If there's a PE at the base address, fix its PE header
    if data[0:2] == 'MZ':
        data = self.rebuild(addr_space, base)
        data = self.rebase(addr_space, base, data)
    # NOTE(review): the handle is not closed if write() raises
    f = open(ofname, "wb")
    f.write(data)
    f.close()
    self.automate_ida(ofname, names, funcs, None)
    print 'Finished after', time.time() - t0, 'seconds'
def render_text(self, outfd, data):
    """Do nothing: calculate() already printed the results and ran IDA."""
    return None
- #--------------------------------------------------------------------------------
- # apihooks plug-in
- #--------------------------------------------------------------------------------
- # The value of each dictionary item is a list of tuples of regexes in the
- # format (process, src_mod, dst_mod, function). If you specify
- # (".*", ".*", ".*", ".*") then you essentially whitelist all possible hooks
- # of the given type.
ignore_rules = {
    # Usermode IAT hook rules
    'iat': [
        # Ignore hooks that point inside C runtime libraries
        (".*", ".*", r"(msvcr|msvcp).+\.dll", ".*"),
        # Ignore hooks of WMI that point inside advapi32.dll
        (".*", "wmi.dll", "advapi32.dll", ".*"),
        # Ignore hooks of winsock that point inside ws2 and mswsock
        (".*", "WSOCK32.dll", r"(WS2_32|MSWSOCK)\.dll", ".*"),
        # Ignore hooks of SCHANNEL* that point inside secur32.dll
        (".*", "schannel.dll", "secur32.dll", ".*"),
        # Ignore hooks that point inside known modules
        (".*", ".*", "(kernel32|ntdll|shimeng|kernelbase|shlwapi)", ".*"),
        # Handle some known forwarded imports
        (".*", ".*", ".*", "((Enter|Delete|Leave)CriticalSection|(Get|Set)LastError|Heap(ReAlloc|Free|Size|Alloc)|Rtl(Unwind|MoveMemory))"),
    ],
    # Usermode EAT hook rules
    'eat': [
        # These modules have so many hooks it's really not useful to check
        (".*", "(msvcp|msvcr|mfc|wbemcomn|fastprox)", ".*", ".*"),
    ],
    # Usermode Inline hook rules
    'inline': [
        # Ignore hooks in the pywin32 service process
        ("pythonservice", ".*", ".*", ".*"),
        # Many legit hooks land inside these modules
        (".*", ".*", "(msvcr|wbemcomn|ntdll|kernel32|ole32|shlwapi|user32|gdi32|ws2_32|shell32)", ".*"),
        # Ignore hooks of the msvcrt DLLs
        (".*", r"(msvc(p|r)\d{2}|mfc\d{2})\.dll", ".*", ".*"),
        # Ignore hooks of MD5Final, MD5Init, MD5Update that point inside advapi32
        (".*", ".*", "advapi32.dll", "MD5.+"),
        # Ignore hooks of common firefox components
        (r"firefox\.exe", ".*", "(xul|mozcrt|nspr4)", ".*"),
        # Ignore hooks created by Parallels VM software
        (".*", "user32.dll", "prl_hook.dll", ".*"),
        # Ignore DLL registration functions
        (".*", ".*", ".*", "(DllCanUnloadNow|DllRegisterServer|DllUnregisterServer)"),
    ],
    # Kernel mode IAT hook rules
    'iatk': [
        (".*", ".*", r"(win32k\.sys|hal\.dll|dump_wmilib\.sys|ntkrnlpa\.exe|ntoskrnl\.exe)", ".*"),
        # Ignore hooks of the SCSI module which point inside the dump_scsiport module
        (".*", r"scsiport\.sys", r"dump_scsiport\.sys", ".*"),
    ],
    # Kernel mode EAT hook rules (none yet)
    'eatk': [],
    # Kernel mode Inline hook rules
    'inlinek': [
        # Ignore kernel hooks that point inside these modules
        (".*", ".*", "(hal.dll|ndis.sys|ntkrnlpa.exe|ntoskrnl.exe)", ".*"),
    ],
}
- class ApiHooks(procdump.ProcExeDump):
- "[MALWARE] Find API hooks"
def __init__(self, config, *args):
    """Set up options and precompile the hook whitelist.

    Drops the inherited --dump-dir and --unsafe options and adds --kernel/-K.
    """
    procdump.ProcExeDump.__init__(self, config, *args)

    for option in ("DUMP-DIR", "UNSAFE"):
        config.remove_option(option)
    config.add_option("KERNEL", short_option='K', default=False,
                      action='store_true', help='Scan kernel modules')

    # Whitelist to ignore API hooks
    self.compiled_rules = self.compile()
    # Container for driver objects
    self.driver_objects = []
def compile(self):
    """Precompile the ignore_rules regexes (case-insensitive) to save time.

    Returns {hook type: [4-tuple of compiled patterns, ...]}. A hook type
    whose rule list is empty gets no key in the result.
    """
    compiled = dict()
    for hook_type, patterns in ignore_rules.items():
        for pattern in patterns:
            ruleset = tuple(re.compile(pattern[idx], re.I) for idx in range(4))
            compiled.setdefault(hook_type, []).append(ruleset)
    return compiled
def ignore(self, rule_key, process, src_mod, dst_mod, function):
    """Tell whether a hook is whitelisted by the precompiled regexes.

    rule_key -- hook type ('iat', 'eat', 'inline', 'iatk', 'eatk', 'inlinek')
    process, src_mod, dst_mod, function -- strings searched by the four
        patterns of each rule, in that order

    Returns True when all four patterns of some rule match. A hook type
    with no compiled rules matches nothing and returns False; looking one
    up used to raise KeyError (compile() creates no entry for an empty
    rule list such as 'eatk').
    """
    candidates = (process, src_mod, dst_mod, function)
    for rule in self.compiled_rules.get(rule_key, ()):
        if all(regex.search(text) is not None
               for regex, text in zip(rule, candidates)):
            return True
    return False
def get_section_from_mod(self, addr_space, mod, addr):
    """Return the name of the PE section in which a given VA resides.

    addr_space -- address space the module's headers are read from
    mod        -- module whose DllBase locates the PE header
    addr       -- virtual address to look up

    Returns the section name as a string, or '' when no section contains
    the address. A section covers [start, start + VirtualSize); the first
    byte used to be excluded by a strict ">" comparison.
    """
    dos_header = obj.Object("_IMAGE_DOS_HEADER", offset = mod.DllBase, vm = addr_space)
    nt_header = dos_header.get_nt_header()

    for sect in nt_header.get_sections(False):
        start_va = mod.DllBase + sect.VirtualAddress
        end_va = start_va + sect.Misc.VirtualSize
        if addr >= start_va and addr < end_va:
            return str(sect.Name)

    return ''
def get_ucp_hooks(self, mods, mod_addrs, mod, src_mod_name, space):
    """Scan a module's executable sections for calls to unknown code pages.

    Looks for CALL [ADDR] / JMP [ADDR] instructions where ADDR, or the
    DWORD stored at ADDR, does not fall inside any module known to
    find_module(mods, mod_addrs, ...).

    Yields 9-tuples:
        (None, 'ucpcall', None, src_mod_name, '', instruction_address,
         call_destination, None, "<mnemonic> [<destination>]")

    Nothing is yielded when the module's NT header cannot be read.
    """
    # Abort if we can't get the NT header
    dos_header = obj.Object("_IMAGE_DOS_HEADER", offset = mod.DllBase, vm = space)
    nt_header = dos_header.get_nt_header()
    if nt_header == None:
        # A plain return ends the generator; "raise StopIteration" inside a
        # generator becomes a RuntimeError under PEP 479
        return

    # Parse the PE sections for this driver
    for sec in nt_header.get_sections(False):
        # Only check executable sections
        if not (sec.Characteristics & 0x20000000):
            continue

        # Calculate the virtual address of this PE section in memory
        sec_va = mod.DllBase + sec.VirtualAddress

        # Extract the section's data and make sure we get it all
        data = space.zread(sec_va, sec.Misc.VirtualSize)
        if len(data) != sec.Misc.VirtualSize:
            continue

        # Disassemble instructions in the section
        for op in distorm3.Decompose(sec_va, data, distorm3.Decode32Bits):
            if op.valid and ((op.flowControl == 'FC_CALL' and op.mnemonic == "CALL") or (op.flowControl == 'FC_UNC_BRANCH' and op.mnemonic == "JMP")) and op.operands[0].type == 'AbsoluteMemoryAddress':
                # This is ADDR, which is the IAT location
                const = op.operands[0].disp & 0xFFFFFFFF

                # Skip if ADDR is not a valid address
                if not space.is_valid_address(const):
                    continue

                # This is what [ADDR] points to - the absolute call destination
                call_dest = struct.unpack("=I", space.read(const, 4))[0]

                # Skip if [ADDR] is not a valid address
                if not space.is_valid_address(call_dest):
                    continue

                # If ADDR or [ADDR] point to an unknown code page, then we've likely found a hook
                if find_module(mods, mod_addrs, const) == None or \
                        find_module(mods, mod_addrs, call_dest) == None:
                    # Text form of the instruction which calls the hook
                    s = "{0} [{1:#x}]".format(op.mnemonic, call_dest)
                    # The old code also read and disassembled 32 bytes at
                    # call_dest here but never used the result; that dead
                    # work is gone.
                    yield (None, 'ucpcall', None, src_mod_name, '', op.address, call_dest, None, s)
def get_hooks(self, proc, addr_space, mods, mod_addrs, mod, src_mod_name):
    """Find IAT, EAT and Inline hooks in one module.

    proc          -- owning process, or None when scanning kernel modules
    addr_space    -- address space used to validate and read addresses
    mods          -- {key: (module, value)} map; mod_addrs is its companion
                     argument for find_module
    mod           -- the module whose imports and exports are checked
    src_mod_name  -- display name of `mod`

    Yields 9-tuples whose second element is the hook type ('iat', 'eat',
    'inline', or the same with a 'k' suffix in kernel mode):
      IAT:    (proc, type, src_mod_name, imported dll, function, 0,
               destination, destination module name, None)
      EAT:    (proc, type, None, src_mod_name, "name[addr]", 0,
               destination, destination module name, None)
      Inline: (proc, type, None, src_mod_name, "name[addr]", function
               address, destination, destination module name, instruction)

    NOTE(review): the source paste lost its indentation; the nesting below
    was reconstructed from the control flow.
    """
    # Declare this variable so we don't constantly look it up
    addr_space_is_valid_address = addr_space.is_valid_address

    # If the process object is None, then we're working in kernel mode
    if proc != None:
        process_name = str(proc.ImageFileName).lower()
        eat_hook_type = 'eat'
        iat_hook_type = 'iat'
        inline_hook_type = 'inline'
    else:
        process_name = ''
        eat_hook_type = 'eatk'
        iat_hook_type = 'iatk'
        inline_hook_type = 'inlinek'

    modlist = [m for k, (m, v) in mods.items()]

    for dll, symbols in mod.imports():
        # Get a list of the DLLs in the process space that match the name of the imported DLL.
        # This is important, for example, because a process might load comctl32.dll from both
        # the system32 directory and the WinSxS directory for app compatibility
        possible_mods = find_module_by_name_ex(modlist, [dll])
        if not possible_mods:
            continue

        for imp in symbols:
            if not addr_space_is_valid_address(imp.funcaddr):
                continue

            dst_mod = find_module(mods, mod_addrs, imp.funcaddr)
            if dst_mod not in possible_mods:
                try:
                    dst_mod_name = str(dst_mod.BaseDllName)
                except Exception:
                    # No module found at the destination (dst_mod is None)
                    dst_mod_name = "UNKNOWN"

                if self.ignore(iat_hook_type, process_name, dll, dst_mod_name, imp.get_name()):
                    continue

                yield (proc, iat_hook_type, src_mod_name, dll, imp.get_name(), 0, imp.funcaddr, dst_mod_name, None)

    # Check this whitelist before scanning, it can save some time
    if self.ignore(inline_hook_type, "", src_mod_name, "", ""):
        # A plain return ends the generator; "raise StopIteration" inside a
        # generator becomes a RuntimeError under PEP 479
        return

    for exp in mod.exports():
        if not addr_space_is_valid_address(exp.funcaddr):
            continue

        # Get the module containing the function
        dst_mod = find_module(mods, mod_addrs, exp.funcaddr)

        # This is a check for EAT hooks
        if dst_mod != exp.owner_mod:
            try:
                dst_mod_name = str(dst_mod.BaseDllName)
            except Exception:
                dst_mod_name = "UNKNOWN"

            yield (proc, eat_hook_type, None, src_mod_name, exp.get_name() + '[' + hex(exp.funcaddr) + ']', 0, exp.funcaddr, dst_mod_name, None)
            # No need to check for inline hooks in the case of EAT hooks
            continue

        # Check for inline hooks
        ret = self.check_inline(exp.funcaddr, addr_space, dst_mod.DllBase, dst_mod.DllBase + dst_mod.SizeOfImage)
        if not isinstance(ret, tuple):
            continue

        (dst, instr) = ret

        # The hook destination must exist in valid memory for the process
        if not addr_space_is_valid_address(dst):
            continue

        dst_mod = find_module(mods, mod_addrs, dst)

        if dst_mod != exp.owner_mod:
            try:
                dst_mod_name = str(dst_mod.BaseDllName)
            except Exception:
                dst_mod_name = "UNKNOWN"

            if self.ignore(inline_hook_type, process_name, src_mod_name, dst_mod_name, exp.get_name()):
                continue

            # ntoskrnl exports symbols such as SePublicDefaultDacl, SeTokenObjectType that
            # are not functions. in some cases, the symbols are DWORDs that store addresses
            # like 823C45E8 (where E8 is a CALL) and it looks like redirection according to the
            # disasm. we could create a whitelist entry with the names of the symbols, but there
            # may be other symbols in the future, so instead we check the section where the
            # exported symbol exists - if it's .text the export is probably a function. if
            # it's .data or PAGEDATA the export is probably a variable...
            if proc == None and "data" in self.get_section_from_mod(addr_space, mod, exp.funcaddr).lower():
                continue

            yield (proc, inline_hook_type, None, src_mod_name, exp.get_name() + '[' + hex(exp.funcaddr) + ']', exp.funcaddr, dst, dst_mod_name, instr)
def check_inline(self, va, ps_ad, mem_start=None, mem_end=None):
    """Check for inline hooks with static disassembly.

    va        -- virtual address of the function prologue to inspect
    ps_ad     -- address space providing zread()
    mem_start -- first address of the module that owns `va`
    mem_end   -- address just past the end of that module (callers pass
                 base + size)

    Up to three instructions are decoded. A CALL, JMP or PUSH/RET whose
    destination lies outside [mem_start, mem_end) ends the scan at once.
    When either bound is omitted, every destination counts as outside.

    Returns (destination, description) for the last redirection seen, or
    None when the bytes cannot be read, no redirection was found, or the
    destination is on the other side of the 0x80000000 boundary from `va`.
    """
    try:
        code = ps_ad.zread(va, 24)
    except Exception:
        return None

    if mem_start is None or mem_end is None:
        outside = lambda dest: True
    else:
        # mem_end is already an absolute address, so it must not be added
        # to mem_start again
        outside = lambda dest: dest < mem_start or dest >= mem_end

    # If the API is in kernel memory, then the hook destination should also
    # be in kernel memory (and same with user mode) or maybe there's a disasm
    # error or other corruption happening....
    kernel_memory = va > 0x80000000

    n = 0
    call_dest = mnemonic = push_val = None

    for op in distorm3.Decompose(va, code, distorm3.Decode32Bits):
        # Increasing n can lead to false positives as we scan further into
        # a function. However, it is necessary to detect some hooks. The
        # alternate option is to use emulation...
        if not op.valid or n == 3:
            break

        if op.flowControl == 'FC_CALL':
            # Check for CALL [ADDR]
            if op.mnemonic == "CALL" and op.operands[0].type == 'AbsoluteMemoryAddress':
                const = op.operands[0].disp & 0xFFFFFFFF
                call_dest = struct.unpack("=I", ps_ad.zread(const, 4))[0]
                mnemonic = 'CALL [{0:#x}] =>> {1:#x}'.format(const, call_dest)
                if outside(call_dest):
                    break
            # Check for CALL ADDR
            elif op.operands[0].type == 'Immediate':
                call_dest = op.operands[0].value & 0xFFFFFFFF
                mnemonic = 'CALL {0:#x}'.format(call_dest)
                if outside(call_dest):
                    break
        elif op.flowControl == 'FC_UNC_BRANCH' and op.mnemonic == "JMP" and op.size > 2:
            # Check for JMP [ADDR]
            if op.operands[0].type == 'AbsoluteMemoryAddress':
                const = op.operands[0].disp & 0xFFFFFFFF
                call_dest = struct.unpack("=I", ps_ad.zread(const, 4))[0]
                mnemonic = 'JMP [{0:#x}] =>> {1:#x}'.format(const, call_dest)
                if outside(call_dest):
                    break
            # Check for JMP ADDR
            elif op.operands[0].type == 'Immediate':
                call_dest = op.operands[0].value & 0xFFFFFFFF
                mnemonic = 'JMP {0:#x}'.format(call_dest)
                if outside(call_dest):
                    break
        # Check for PUSH followed by a RET
        elif op.flowControl == 'FC_NONE':
            if op.mnemonic == "PUSH" and op.operands[0].type == 'Immediate' and op.size == 5:
                push_val = op.operands[0].value & 0xFFFFFFFF
        elif op.flowControl == 'FC_RET':
            if push_val:
                call_dest = push_val
                mnemonic = "PUSH {0:#x}; {1}".format(call_dest, op.mnemonic)
                if outside(call_dest):
                    break
        n += 1

    # Nothing redirected execution. Answer explicitly instead of relying on
    # how the interpreter orders None against integers.
    if call_dest is None:
        return None

    # Check that we're still in the correct ring that we started in
    if (kernel_memory and call_dest < 0x80000000) or \
       (not kernel_memory and call_dest > 0x80000000):
        return None

    return call_dest, mnemonic
def get_syscall_hooks(self, proc, addr_space, mods, mod_addrs, mod):
    """Enumerate syscall hooks, as installed by Carberp etc.

    An export is treated as an NT syscall stub when its first two
    instructions are `MOV EAX, imm` and `MOV EDX, imm`. The EDX immediate
    is read as a pointer; if the value stored there is not the address of
    the module's own KiFastSystemCall export, a 'syscall' hook is yielded.

    proc, mods and mod_addrs are only carried for signature parity with
    the other hook scanners; proc is echoed in the yielded tuples.
    Yields nothing when `mod` does not export KiFastSystemCall.
    """
    # Enumerate exported functions by this module
    exports = list(mod.exports())

    # Resolve KiFastSystemCall API
    candidates = [exp.funcaddr for exp in exports if exp.get_name() == "KiFastSystemCall"]
    if not candidates:
        # Cannot find the address of KiFastSystemCall. Returning ends the
        # generator; raising StopIteration here is a RuntimeError under
        # PEP 479.
        return
    kifastsystemcall = candidates[0]

    is_valid = addr_space.is_valid_address

    # Check each exported function if its an NT syscall
    for exp in exports:
        if not is_valid(exp.funcaddr):
            continue

        # Disassemble the first two instructions
        try:
            i1 = exp.get_instruction(0)
            i2 = exp.get_instruction(1)
        except Exception:
            continue

        # They both must be valid and have two operands
        if not i1 or not i1.valid or len(i1.operands) != 2 or \
           not i2 or not i2.valid or len(i2.operands) != 2:
            continue

        # Now check the instruction and operand types. The source operand
        # of *each* MOV must be an immediate.
        loads_eax = (i1.mnemonic == "MOV" and
                     i1.operands[0].type == 'Register' and
                     i1.operands[0].name == 'EAX' and
                     i1.operands[1].type == 'Immediate')
        loads_edx = (i2.mnemonic == "MOV" and
                     i2.operands[0].type == 'Register' and
                     i2.operands[0].name == 'EDX' and
                     i2.operands[1].type == 'Immediate')
        if not (loads_eax and loads_edx):
            continue

        # This is a pointer to where KiFastSystemCall is stored
        call_dest = i2.operands[1].value
        # Cheap disassembly of the instruction
        str2 = "MOV EDX, {0:#x}".format(call_dest)
        # Dereference to find the KiFastSystemCall address
        deref_pointer = obj.Object('unsigned long', offset = call_dest, vm = addr_space)
        if deref_pointer != kifastsystemcall:
            yield (proc, 'syscall', None, 'ntdll.dll',
                   exp.get_name() + '[' + hex(exp.funcaddr) + ']',
                   call_dest, deref_pointer, "UNKNOWN", str2)
def get_all_hooks(self, proc, addr_space, procs, mods, mod_addrs):
    """Dispatcher for hook detection.

    Runs get_hooks() on every entry of `mods`, get_syscall_hooks() on
    ntdll.dll and get_ucp_hooks() on a fixed list of kernel modules, and
    yields whatever those generators yield.

    proc       -- process being scanned, or None for kernel modules
    addr_space -- process space, or the kernel space when proc is None
    procs      -- process list handed to find_space() for kernel modules
    mods       -- dict mapping a base address to a (module, name) pair
    mod_addrs  -- passed through unchanged to the individual scanners
    """
    # For the sake of time, we only check the modules in this list for calls
    # to unknown code pages. Expand it however you please.
    ucp_modules = ("tcpip.sys", "ntfs.sys", "fastfat.sys", "wanarp.sys",
                   "ndis.sys", "atapi.sys", "ntoskrnl.exe", "ntkrnlpa.exe",
                   "ntkrnlmp.exe")

    for base, (mod, name) in mods.items():
        # If the process object is None, then we're dealing with kernel
        # modules. In this case, we need to find a valid space, because some
        # modules are only accessible from the context of a process with at
        # least one GUI thread.
        if proc == None:
            space = find_space(addr_space, procs, base)
        else:
            space = addr_space

        if space == None or not space.vtop(base):
            continue

        for val in self.get_hooks(proc, space, mods, mod_addrs, mod, name):
            yield val

        # Check for NT syscall hooks if the module is ntdll
        if name == "ntdll.dll":
            for val in self.get_syscall_hooks(proc, space, mods, mod_addrs, mod):
                yield val

        if name in ucp_modules:
            for val in self.get_ucp_hooks(mods, mod_addrs, mod, name, space):
                yield val
def get_name_from_driver_object(self, addr):
    """Name the driver whose DRIVER_OBJECT range contains `addr`.

    This is useful in cases like Rustock.B which unlink and overwrite data
    in the LDR_MODULES list. The DriverScan results are computed on first
    use and kept on self.driver_objects. Returns 'UNKNOWN' when no driver
    range contains the address.
    """
    scanner = filescan.DriverScan(self._config)
    scanner.kernel_address_space = utils.load_as(self._config)

    # Scan once and reuse the result on later calls
    if not self.driver_objects:
        self.driver_objects = list(scanner.calculate())

    for _object_hdr, driver, _extension, _name_info in self.driver_objects:
        if addr >= driver.DriverStart and addr < (driver.DriverStart + driver.DriverSize) & 0xFFFFFFFF:
            return scanner.parse_string(driver.DriverName)

    return 'UNKNOWN'
- def calculate(self):
- t0 = time.time()
- addr_space = get_malware_space(self._config)
- procs = list(tasks.pslist(addr_space))
- # Perform a user space scan
- if not self._config.KERNEL:
- # Scan all processes, unless the user specified a PID list
- for p in self.filter_tasks(procs):
- ps_ad = p.get_process_address_space()
- if ps_ad == None:
- continue
- # The PEB must be valid to get a list of loaded DLLs
- if not p.Peb:
- continue
- mods, mod_addrs = get_sorted_mods(p.list_modules())
- for val in self.get_all_hooks(p, ps_ad, procs, mods, mod_addrs):
- yield val
- # Perform a kernel space scan
- else:
- mods, mod_addrs = get_sorted_mods(modules.lsmod(addr_space))
- for val in self.get_all_hooks(None, addr_space, procs, mods, mod_addrs):
- (proc, type, current_mod, mod, func, src, dst, hooker, instruction) = val
- if hooker == "UNKNOWN":
- hooker = self.get_name_from_driver_object(dst)
- yield (proc, type, current_mod, mod, func, src, dst, hooker, instruction)
- print 'Finished after', time.time() - t0, 'seconds'
def render_text(self, outfd, data):
    """Write one text row per hook tuple produced by calculate().

    The Name column is '-' for hooks without a process, otherwise
    image[pid], followed by @module when a current module is given. The
    Value column holds the source address, then the instruction (or the
    destination address when there is none) and the hook owner in
    parentheses when known.
    """
    outfd.write("{0:<32} {1:<8} {2:40} {3}\n".format('Name', 'Type', 'Target', 'Value'))

    for (proc, hook_type, current_mod, mod, func, src, dst, hooker, instruction) in data:
        name = '-'
        if proc:
            name = "{0}[{1}]".format(proc.ImageFileName, proc.UniqueProcessId)
            if current_mod:
                name += '@' + current_mod

        pieces = ["{0:#x}".format(src)]
        if instruction:
            pieces.append("{0}".format(instruction))
        else:
            pieces.append("{0:#x}".format(dst))
        if hooker:
            pieces.append("({0})".format(hooker))
        value = " ".join(pieces)

        target = mod + "!" + func if func else mod

        outfd.write("{0:<32} {1:<8} {2:<40} {3}\n".format(name, hook_type, target, value))
- #--------------------------------------------------------------------------------
- # idt plug-in
- # IDT info from http://www.logix.cz/michal/doc/i386/chp09-00.htm & Rootkit Arsenal
- #--------------------------------------------------------------------------------
# Maps an IDT index to (expected handler name, description). Indexes that
# are missing here are named by IDT.calculate() itself.
idt_info = dict([
    # Processor exceptions
    (0x00, ('KiTrap00', 'Divide error')),
    (0x01, ('KiTrap01', 'Debug')),
    (0x02, ('KiTrap02', 'Nonmaskable interrupt')),
    (0x03, ('KiTrap03', 'Breakpoint')),
    (0x04, ('KiTrap04', 'Overflow')),
    (0x05, ('KiTrap05', 'Bounds check')),
    (0x06, ('KiTrap06', 'Invalid opcode')),
    (0x07, ('KiTrap07', 'Coprocessor not avail.')),
    (0x08, ('KiTrap08', 'Double fault')),
    (0x09, ('KiTrap09', '')),
    (0x0A, ('KiTrap0A', 'Invalid TSS')),
    (0x0B, ('KiTrap0B', 'Segment not present')),
    (0x0C, ('KiTrap0C', 'Stack exception')),
    (0x0D, ('KiTrap0D', 'General protection')),
    (0x0E, ('KiTrap0E', 'Page fault')),
    (0x0F, ('KiTrap0F', '')),
    (0x10, ('KiTrap10', '')),
    (0x11, ('KiTrap11', '')),
    (0x12, ('KiTrap12', '')),
    (0x13, ('KiTrap13', '')),
    # Software interrupt services
    (0x2A, ('KiGetTickCount', '')),
    (0x2B, ('KiCallbackReturn', '')),
    (0x2C, ('KiSetLowWaitHighThread', '')),
    (0x2D, ('KiDebugService', '')),
    (0x2E, ('KiSystemService', '')),
])
class IDT(ApiHooks):
    "[MALWARE] Display Interrupt Descriptor Table"

    def __init__(self, config, *args):
        """Initialise like ApiHooks, then drop the PID, KERNEL and OFFSET options."""
        ApiHooks.__init__(self, config, *args)
        config.remove_option("PID")
        config.remove_option("KERNEL")
        config.remove_option("OFFSET")

    def calculate(self):
        """Walk the 256 IDT entries referenced by the KPCR.

        Yields (index, selector, function name, handler address, details).
        `details` names the owning module and section when the handler lies
        inside a loaded module, followed by ' => <instruction>' when
        check_inline() reports a redirection in the handler's prologue.
        """
        addr_space = get_malware_space(self._config)
        mods, mod_addrs = get_sorted_mods(modules.lsmod(addr_space))

        volmagic = obj.Object('VOLATILITY_MAGIC', 0x0, addr_space)
        kpcra = volmagic.KPCR.v()
        kpcrval = obj.Object("_KPCR", offset = kpcra, vm = addr_space)
        entries = obj.Object("Array", offset = kpcrval.IDT, vm = addr_space,
                             count = 256, targetType = '_KIDTENTRY')

        for i, entry in enumerate(entries):
            # `in` replaces dict.has_key(), which no longer exists in Python 3
            if i in idt_info:
                func_name = idt_info[i][0]
            elif i >= 0x30:
                func_name = 'KiUnexpectedInterrupt{0}'.format(i - 0x30)
            else:
                func_name = '-'

            details = ''
            if entry.ExtendedOffset == 0:
                # Entries without a high word are reported with a null address
                addr = 0
            else:
                # Handler address = high 16 bits | low 16 bits
                addr = (entry.ExtendedOffset.v() << 16) | entry.Offset.v()
                mod = find_module(mods, mod_addrs, addr)
                if mod != None:
                    secname = self.get_section_from_mod(addr_space, mod, addr)
                    details = "{0} {1}".format(mod.BaseDllName, secname)
                    ret = self.check_inline(addr, addr_space, mod.DllBase,
                                            mod.DllBase + mod.SizeOfImage)
                    if isinstance(ret, tuple):
                        details += ' => ' + ret[1]

            yield i, entry.Selector, func_name, addr, details

    def render_text(self, outfd, data):
        """Write the table produced by calculate() as aligned text columns."""
        outfd.write("{0:<8} {1:<8} {2:<26} {3:<12} {4}\n".format(
            "Index", "Selector", "Function", "Value", "Details"))
        for i, selector, func_name, addr, details in data:
            outfd.write("{0:<8X} {1:<8X} {2:<26} {3:<#12x} {4}\n".format(
                i, selector, func_name, addr, details))
- #--------------------------------------------------------------------------------
- # gdt plugin
- #
- # This plugin was built using information in Reverend Bill Blunden's book
- # "The Rootkit Arsenal" and Matthew "j00ru" Jurczyk and Gynvael Coldwind's article
- # "GDT and LDT in Windows kernel vulnerability exploitation" which can be found
- # at http://dl.packetstormsecurity.net/papers/attack/call_gate_exploitation.pdf.
- #--------------------------------------------------------------------------------
# Descriptor type names, indexed as GDT.render_text() does: the descriptor's
# 4-bit type selects one of the first 16 names when its sFlag is non-zero
# and one of the last 16 (type + 16) when sFlag is 0.
# NOTE(review): this name shadows the builtin type() for the rest of the
# module; it is kept because GDT.render_text() looks it up by this name.
type = [
    # index 0-15
    "Data RO", "Data RO Ac", "Data RW", "Data RW Ac",
    "Data RO E", "Data RO EA", "Data RW E", "Data RW EA",
    "Code EO", "Code EO Ac", "Code RE", "Code RE Ac",
    "Code EO C", "Code EO CA", "Code RE C", "Code RE CA",
    # index 16-31
    "<Reserved>", "TSS16 Avl", "LDT", "TSS16 Busy",
    "CallGate16", "TaskGate", "Int Gate16", "TrapGate16",
    "<Reserved>", "TSS32 Avl", "<Reserved>", "TSS32 Busy",
    "CallGate32", "<Reserved>", "Int Gate32", "TrapGate32",
]
class GDT(commands.command):
    "[MALWARE] Display Global Descriptor Table"

    def __init__(self, config, *args):
        commands.command.__init__(self, config, *args)

    def calculate(self):
        """Yield (index, descriptor) for 128 GDT entries referenced by the KPCR."""
        addr_space = get_malware_space(self._config)
        volmagic = obj.Object('VOLATILITY_MAGIC', 0x0, addr_space)
        kpcrval = obj.Object("_KPCR", offset = volmagic.KPCR.v(), vm = addr_space)
        # Since the real GDT size is read from a register, we'll just assume
        # that there are 128 entries (which is normal for most OS)
        segdesc = obj.Object('Array', targetType = '_SEG_DESCRIPTOR', count = 128,
                             offset = kpcrval.GDT, vm = addr_space)
        for i, sd in enumerate(segdesc):
            yield i, sd

    def render_text(self, outfd, data):
        """Write one row per descriptor.

        32-bit call gates are re-read as _CALL_GATE_DESCRIPTOR and show the
        gate's target address; when that target is a valid kernel address a
        short disassembly follows the row. All other descriptors show base,
        limit and granularity.
        """
        # Seven columns: the selector is the entry index times eight
        outfd.write("{0:<6} {1:<12} {2:<12} {3:<14} {4:<6} {5:6} {6:6}\n".format(
            "Sel", "Base", "Limit", "Type", "DPL", "Gr", "Pr"))

        for i, sd in data:
            # Do this for seg descriptors *and* call gates: descriptors with
            # sFlag == 0 use the second half of the name table
            index = sd.type + 16 if sd.sFlag == 0 else sd.type
            present = "P" if sd.pFlag == 1 else "Np"

            # Do this only for 32-bit call gates
            if type[index] == "CallGate32":
                cg = obj.Object('_CALL_GATE_DESCRIPTOR', sd.obj_offset, sd.obj_vm)
                calladdr = cg.offset1 + (cg.offset2 << 16)
                outfd.write("{0:<#6x} {1:<#12x} {2:<12} {3:<14} {4:<6} {5:<6} {6:6}\n".format(
                    i*8, calladdr, '-', type[index], sd.dpl, '-', present))
                if calladdr > 0x80000000 and cg.obj_vm.is_valid_address(calladdr):
                    # Best effort: a failed disassembly only omits the listing
                    try:
                        disasm = get_disasm_text(cg.obj_vm.zread(calladdr, 12),
                                                 calladdr, stoponret=True)
                    except Exception:
                        disasm = None
                    if disasm:
                        outfd.write("\n{0}\n".format(disasm))
            # Do this for all other types of seg descriptors
            else:
                granularity = "Pg" if sd.gFlag == 1 else "By"
                baseaddress = sd.baseaddress1 + ((sd.baseaddress2 + (sd.baseaddress3 << 8)) << 16)
                limit = sd.size1 + (sd.size2 << 16)
                if sd.gFlag == 1:
                    # Page granularity: convert the page count to a byte limit
                    limit = (limit + 1) * 0x1000
                    limit -= 1
                outfd.write("{0:<#6x} {1:<#12x} {2:<#12x} {3:<14} {4:<6} {5:<6} {6:6}\n".format(
                    i*8, baseaddress, limit, type[index], sd.dpl, granularity, present))
- #--------------------------------------------------------------------------------
- # callbacks plug-in
- #--------------------------------------------------------------------------------
class PoolScanFSCallback(scan.PoolScanner):
    """PoolScanner for File System Callbacks"""
    # Constraints: tag "IoFs", size exactly 0x18, any pool type
    checks = [
        ('PoolTagCheck', {'tag': "IoFs"}),
        ('CheckPoolSize', {'condition': lambda x: x == 0x18}),
        ('CheckPoolType', {'non_paged': True, 'paged': True, 'free': True}),
        #('CheckPoolIndex', dict(value = 4)),
    ]
class PoolScanShutdownCallback(scan.PoolScanner):
    """PoolScanner for Shutdown Callbacks"""
    # Constraints: tag "IoSh", size exactly 0x18, any pool type, pool index 0
    checks = [
        ('PoolTagCheck', {'tag': "IoSh"}),
        ('CheckPoolSize', {'condition': lambda x: x == 0x18}),
        ('CheckPoolType', {'non_paged': True, 'paged': True, 'free': True}),
        ('CheckPoolIndex', {'value': 0}),
    ]
class PoolScanGenericCallback(scan.PoolScanner):
    """PoolScanner for Generic Callbacks"""
    # Constraints: tag "Cbrb", size exactly 0x18, any pool type
    checks = [
        ('PoolTagCheck', {'tag': "Cbrb"}),
        ('CheckPoolSize', {'condition': lambda x: x == 0x18}),
        ('CheckPoolType', {'non_paged': True, 'paged': True, 'free': True}),
        # This is a good constraint for all images except Frank's rustock-c.vmem
        #('CheckPoolIndex', dict(value = 1)),
    ]
class PoolScanDbgPrintCallback(scan.PoolScanner):
    """PoolScanner for DebugPrint Callbacks on Vista and 7"""
    # Constraints: tag "DbCb", size exactly 0x20, any pool type
    checks = [
        ('PoolTagCheck', {'tag': "DbCb"}),
        ('CheckPoolSize', {'condition': lambda x: x == 0x20}),
        ('CheckPoolType', {'non_paged': True, 'paged': True, 'free': True}),
        #('CheckPoolIndex', dict(value = 0)),
    ]
class PoolScanRegistryCallback(scan.PoolScanner):
    """PoolScanner for Registry Callbacks on Vista and 7"""
    # Constraints: tag "CMcb", size of at least 0x38, any pool type, pool index 4
    checks = [
        ('PoolTagCheck', {'tag': "CMcb"}),
        # Seen as 0x38 on Vista SP2 and 0x30 on 7 SP0
        # NOTE(review): a size of 0x30 would not pass the >= 0x38 test
        # below - confirm how CheckPoolSize measures the allocation.
        ('CheckPoolSize', {'condition': lambda x: x >= 0x38}),
        ('CheckPoolType', {'non_paged': True, 'paged': True, 'free': True}),
        ('CheckPoolIndex', {'value': 4}),
    ]
class Callbacks(ApiHooks):
    "[MALWARE] Print system-wide notification routines"

    def __init__(self, config, *args):
        """Initialise like ApiHooks, then drop the PID, KERNEL and OFFSET options."""
        ApiHooks.__init__(self, config, *args)
        config.remove_option("PID")
        config.remove_option("KERNEL")
        config.remove_option("OFFSET")

    def get_kernel_callbacks(self, addr_space, ntmod):
        """
        Enumerate the Create Process, Create Thread, and Image Load callbacks.

        On some systems, the byte sequences will be inaccurate or the exported
        function will not be found. In these cases, the PoolScanGenericCallback
        scanner will pick up the pool associated with the callbacks.

        Yields (symbol, callback address, None).
        """
        routines = [
            # push esi; mov esi, offset _PspLoadImageNotifyRoutine
            ('PsSetLoadImageNotifyRoutine', "\x56\xbe"),
            # push esi; mov esi, offset _PspCreateThreadNotifyRoutine
            ('PsSetCreateThreadNotifyRoutine', "\x56\xbe"),
            # mov edi, offset _PspCreateProcessNotifyRoutine
            ('PsSetCreateProcessNotifyRoutine', "\xbf"),
        ]

        for symbol, hexbytes in routines:
            # Locate the exported symbol in the NT module
            symaddr = ntmod.getprocaddress(symbol)
            if symaddr == None:
                continue

            # Find the global variable referenced by the exported symbol:
            # its address is the operand that follows the opcode bytes
            data = addr_space.zread(symaddr, 100)
            offset = data.find(hexbytes)
            if offset == -1:
                continue

            array_base = obj.Object('unsigned int',
                                    offset = symaddr + offset + len(hexbytes),
                                    vm = addr_space)
            addrs = obj.Object('Array', offset = array_base, vm = addr_space,
                               targetType = 'unsigned long', count = 8)
            for addr in addrs:
                if addr == 0:
                    continue
                # The stored value is 7 past the start of the callback block
                callback = obj.Object('_GENERIC_CALLBACK', offset = addr - 0x7, vm = addr_space)
                yield symbol, callback.Callback, None

    def get_generic_callbacks(self, addr_space):
        """
        Enumerate generic callbacks of the following types:

        * PsSetCreateProcessNotifyRoutine
        * PsSetThreadCreateNotifyRoutine
        * PsSetLoadImageNotifyRoutine
        * CmRegisterCallback (on XP only)
        * DbgkLkmdRegisterCallback (on Windows 7 only)

        The only issue is that you can't distinguish between the types by just
        finding the generic callback structure - but its better than nothing ;-)

        The scan runs over self.pspace; `addr_space` is not used.
        """
        for offset in PoolScanGenericCallback().scan(self.pspace):
            callback = obj.Object('_GENERIC_CALLBACK', offset, self.pspace)
            #if callback.Associated != 0:
            #    print callback.Associated.dereference_as('_REGISTRY_CALLBACK_LEGACY').CreateTime
            #    component = callback.Associated.dereference_as('_REGISTRY_CALLBACK').CreateTime
            #else:
            component = None
            yield "GenericKernelCallback", callback.Callback, component

    def get_fs_callbacks(self):
        """Enumerate the File System change callbacks"""
        for offset in PoolScanFSCallback().scan(self.pspace):
            callback = obj.Object('_NOTIFICATION_PACKET', offset, self.pspace)
            yield "IoRegisterFsRegistrationChange", callback.NotificationRoutine, None

    def get_bugcheck_callbacks(self, addr_space):
        """
        Enumerate generic Bugcheck callbacks.

        Note: These structures don't exist in tagged pools, but you can find
        them via KDDEBUGGER_DATA64 on all versions of Windows.
        """
        list_head = tasks.get_kdbg(addr_space).KeBugCheckCallbackListHead.dereference_as('_KBUGCHECK_CALLBACK_RECORD')
        for l in list_head.Entry.list_of_type("_KBUGCHECK_CALLBACK_RECORD", "Entry"):
            component = read_asciiz(addr_space, l.Component)
            yield "KeBugCheckCallbackListHead", l.CallbackRoutine, component

    def get_bugcheck_reason_callbacks(self, addr_space, ntmod):
        """
        Enumerate Bugcheck Reason callbacks.

        Note: These structures don't exist in tagged pools, so we find them by
        locating the list head which is a non-exported NT symbol. The method
        works on all versions of Windows.

        Yields nothing when the export or the byte pattern cannot be found.
        """
        symbol = "KeRegisterBugCheckReasonCallback"
        # mov [eax+KBUGCHECK_REASON_CALLBACK_RECORD.Entry.Blink], offset _KeBugCheckReasonCallbackListHead
        hexbytes = "\xC7\x40\x04"

        symaddr = ntmod.getprocaddress(symbol)
        if symaddr == None:
            # A plain return ends the generator (PEP 479 safe)
            return

        data = addr_space.zread(symaddr, 100)
        offset = data.find(hexbytes)
        if offset == -1:
            return

        bugs = obj.Object('Pointer', offset = symaddr + offset + len(hexbytes), vm = addr_space)
        bugs = bugs.dereference_as('_KBUGCHECK_REASON_CALLBACK_RECORD')
        for l in bugs.Entry.list_of_type("_KBUGCHECK_REASON_CALLBACK_RECORD", "Entry"):
            component = read_asciiz(addr_space, l.Component)
            yield symbol, l.CallbackRoutine, component

    def get_registry_callbacks_legacy(self, addr_space, ntmod):
        """
        Enumerate registry change callbacks.

        This method of finding a global variable via disassembly of the
        CmRegisterCallback function is only for XP systems. If it fails on
        XP you can still find the callbacks using PoolScanGenericCallback.

        On Vista and Windows 7, these callbacks are registered using the
        CmRegisterCallbackEx function.
        """
        symbol = "CmRegisterCallback"

        symaddr = ntmod.getprocaddress(symbol)
        if symaddr == None:
            # A plain return ends the generator (PEP 479 safe)
            return

        data = addr_space.zread(symaddr, 200)
        c = 0
        vector = None

        # Looking for MOV EBX, CmpCallBackVector
        # This may be the first or second MOV EBX instruction, so the second
        # match wins when there is one
        for op in distorm3.Decompose(symaddr, data, distorm3.Decode32Bits):
            if op.valid and op.mnemonic == "MOV" and len(op.operands) == 2 and op.operands[0].name == 'EBX':
                vector = op.operands[1].value
                if c == 1:
                    break
                c += 1

        # Can't find the global variable
        if vector == None:
            return

        # We could do an array of Pointers here and then dereference_as, but
        # the value of 7 must be subtracted from the object offset before use
        callbacks = obj.Object("Array", targetType = "unsigned int", count = 100,
                               vm = addr_space, offset = vector)
        for cb in callbacks:
            if cb != 0:
                cbblock = obj.Object("_EX_CALLBACK_ROUTINE_BLOCK", vm = addr_space, offset = cb - 7)
                yield symbol, cbblock.Function, '--'

    def get_registry_callbacks(self):
        """
        Enumerate registry callbacks on Vista and 7.

        These callbacks are installed via CmRegisterCallback or CmRegisterCallbackEx.
        """
        for offset in PoolScanRegistryCallback().scan(self.pspace):
            callback = obj.Object('_REGISTRY_CALLBACK', offset, self.pspace)
            yield "CmRegisterCallback", callback.Function, None

    def get_shutdown_callbacks(self, addr_space):
        """Enumerate shutdown notification callbacks.

        Packets are found in self.pspace; their list links, device object
        and driver object must all be valid addresses in `addr_space`.
        Yields the driver's IRP_MJ_SHUTDOWN handler and its driver name.
        """
        valid = addr_space.is_valid_address
        for offset in PoolScanShutdownCallback().scan(self.pspace):
            callback = obj.Object('_SHUTDOWN_PACKET', offset, self.pspace)
            if not valid(callback.Entry.Flink) or \
               not valid(callback.Entry.Blink) or \
               not valid(callback.DeviceObject):
                continue
            device = obj.Object('_DEVICE_OBJECT', callback.DeviceObject, addr_space)
            if not valid(device.DriverObject):
                continue
            address = device.DriverObject.MajorFunction[irpcodes.index('IRP_MJ_SHUTDOWN')]
            details = str(device.DriverObject.DriverName)
            yield "IoRegisterShutdownNotification", address, details

    def get_dbgprint_callbacks(self):
        """Enumerate DebugPrint callbacks on Vista and 7"""
        for offset in PoolScanDbgPrintCallback().scan(self.pspace):
            callback = obj.Object('_DBGPRINT_CALLBACK', offset, self.pspace)
            yield "DbgSetDebugPrintCallback", callback.Function, None

    def calculate(self):
        """Collect every callback type and yield (type, address, owner name)."""
        addr_space = get_malware_space(self._config)

        # Get one physical space that all scanners can use
        self.pspace = utils.load_as(self._config, astype = 'physical')

        # Get the loaded modules and sort them by base address
        drivers = list(modules.lsmod(addr_space))
        mods, mod_addrs = get_sorted_mods(drivers)

        # Find the NT executive module
        ntmod = find_module_by_name(drivers, ["ntoskrnl.exe", "ntkrnlpa.exe"])
        if ntmod == None:
            debug.error('Cannot find executive module!')

        # The sources are consumed in this order, one after the other
        sources = [
            self.get_kernel_callbacks(addr_space, ntmod),
            self.get_fs_callbacks(),
            self.get_bugcheck_callbacks(addr_space),
            self.get_bugcheck_reason_callbacks(addr_space, ntmod),
            self.get_shutdown_callbacks(addr_space),
            self.get_registry_callbacks_legacy(addr_space, ntmod),
            self.get_registry_callbacks(),
            self.get_generic_callbacks(addr_space),
            self.get_dbgprint_callbacks(),
        ]
        callbacks = []
        for source in sources:
            callbacks.extend(source)

        for symbol, callback, component in callbacks:
            # Lookup the name of the owning module using the loaded module
            # list, falling back to the driver objects when that fails
            mod = find_module(mods, mod_addrs, callback)
            try:
                name = str(mod.BaseDllName)
            except Exception:
                name = self.get_name_from_driver_object(callback)

            # Append the optional component name which further describes the owner
            if component != None:
                name += ' (' + component + ')'

            yield symbol, callback, name

    def render_text(self, outfd, data):
        """Write the callbacks as a three column text table."""
        outfd.write("{0:<36} {1:<10} {2}\n".format('Type', 'Callback', 'Owner'))
        for symbol, callback, owner in data:
            outfd.write("{0:<36} {1:<#8x} {2}\n".format(symbol, callback, owner))
- #--------------------------------------------------------------------------------
- # driverirp plug-in, based on AS's DriverScan
- #--------------------------------------------------------------------------------
# IRP major function names. The position of a name is the index used into a
# driver object's MajorFunction table.
irpcodes = [
    'IRP_MJ_CREATE',                    # 0
    'IRP_MJ_CREATE_NAMED_PIPE',         # 1
    'IRP_MJ_CLOSE',                     # 2
    'IRP_MJ_READ',                      # 3
    'IRP_MJ_WRITE',                     # 4
    'IRP_MJ_QUERY_INFORMATION',         # 5
    'IRP_MJ_SET_INFORMATION',           # 6
    'IRP_MJ_QUERY_EA',                  # 7
    'IRP_MJ_SET_EA',                    # 8
    'IRP_MJ_FLUSH_BUFFERS',             # 9
    'IRP_MJ_QUERY_VOLUME_INFORMATION',  # 10
    'IRP_MJ_SET_VOLUME_INFORMATION',    # 11
    'IRP_MJ_DIRECTORY_CONTROL',         # 12
    'IRP_MJ_FILE_SYSTEM_CONTROL',       # 13
    'IRP_MJ_DEVICE_CONTROL',            # 14
    'IRP_MJ_INTERNAL_DEVICE_CONTROL',   # 15
    'IRP_MJ_SHUTDOWN',                  # 16
    'IRP_MJ_LOCK_CONTROL',              # 17
    'IRP_MJ_CLEANUP',                   # 18
    'IRP_MJ_CREATE_MAILSLOT',           # 19
    'IRP_MJ_QUERY_SECURITY',            # 20
    'IRP_MJ_SET_SECURITY',              # 21
    'IRP_MJ_POWER',                     # 22
    'IRP_MJ_SYSTEM_CONTROL',            # 23
    'IRP_MJ_DEVICE_CHANGE',             # 24
    'IRP_MJ_QUERY_QUOTA',               # 25
    'IRP_MJ_SET_QUOTA',                 # 26
    'IRP_MJ_PNP',                       # 27
]
class DriverIrp(filescan.DriverScan, ApiHooks):
    "[MALWARE] Driver IRP hook detection"

    def __init__(self, config, args=None):
        """Initialise the DriverScan base and add the REGEX (-r) option."""
        filescan.DriverScan.__init__(self, config, args)
        config.add_option("REGEX", short_option='r', type='str', action='store',
                          help='Analyze drivers matching REGEX')

    def get_irps(self, driver_obj, kas_zread, mods, mod_addrs):
        """Describe every entry of a driver's MajorFunction table.

        driver_obj -- driver object whose IRP handlers are examined
        kas_zread  -- callable (address, length) used to read handler code
        mods, mod_addrs -- module lookup data handed to find_module()

        Yields (irpcode, irpaddr, irpowner, disasm, hookdest, hooker), where
        irpowner/disasm/hookdest/hooker are None when unknown.
        """
        mem_start = driver_obj.DriverStart
        mem_end = driver_obj.DriverStart + driver_obj.DriverSize

        for irpcode, _irpname in enumerate(irpcodes):
            # Get the address of the IRP from the driver object's MajorFunction table
            irpaddr = driver_obj.MajorFunction[irpcode]
            # Try to get the owner of the IRP routine
            mod = find_module(mods, mod_addrs, irpaddr)

            # Does the IRP point inside the driver object or elsewhere?
            points_inside = False
            # Owner of the IRP routine (or None)
            irpowner = None
            # Disassembly of the instructions at the IRP entry point
            disasm = None
            # Destination address of an inline hook (or None)
            hookdest = None
            # Owner of the hook address if applicable
            hooker = None

            if mod != None:
                if mod.DllBase == driver_obj.DriverStart:
                    points_inside = True
                irpowner = str(mod.BaseDllName)

            # Get a disassembly of the IRP routine's prologue. Best effort:
            # an unreadable handler simply has no listing.
            try:
                code = kas_zread(irpaddr, 100)
                disasm = get_disasm_text(code, irpaddr.v())
            except Exception:
                pass

            # Check for inline hooks if the IRP itself is not hooked
            if points_inside:
                ret = self.check_inline(irpaddr.v(), self.kernel_address_space, mem_start, mem_end)
                if isinstance(ret, tuple):
                    hookdest = ret[0]

            # Try to get the owner of the hooked address
            if hookdest != None:
                hookmod = find_module(mods, mod_addrs, hookdest)
                if hookmod != None:
                    hooker = str(hookmod.BaseDllName)
                else:
                    hooker = 'UNKNOWN'

            yield irpcode, irpaddr, irpowner, disasm, hookdest, hooker

    def render_text(self, outfd, data):
        """Renders the text-based output"""
        row_format = "{0:<#12x} {1:<16} {2:<36} {3:<#12x} {4:<16} {5:<12} {6}\n"

        outfd.write("{0:<12} {1:<16} {2:<36} {3:<12} {4:<16} {5:<12} {6}\n".format(
            "DriverStart", "Name", "IRP", "IrpAddr", "IrpOwner", "HookAddr", "HookOwner"))

        addr_space = utils.load_as(self._config)

        # Get a sorted list of module addresses
        mods, mod_addrs = get_sorted_mods(modules.lsmod(addr_space))

        # Compile the regular expression for filtering by driver name
        if self._config.regex != None:
            mod_re = re.compile(self._config.regex, re.I)
        else:
            mod_re = None

        kas_zread = self.kernel_address_space.zread

        # Handle the objects found by the DriverScan plugin
        for object_obj, driver_obj, extension_obj, ObjectNameString in data:
            # Continue if a regex was supplied and it doesn't match the
            # current driver name
            if mod_re != None and not mod_re.search(ObjectNameString):
                continue

            for vals in self.get_irps(driver_obj, kas_zread, mods, mod_addrs):
                # Unpack the return values from get_irps
                irpcode, irpaddr, irpowner, disasm, hookdest, hooker = vals

                irpowner = '-' if not irpowner else irpowner
                hookdest = '-' if not hookdest else hex(hookdest)
                hooker = '-' if not hooker else hooker

                outfd.write(row_format.format(
                    driver_obj.DriverStart, ObjectNameString,
                    irpcodes[irpcode], irpaddr, irpowner, hookdest, hooker))

                # Skip the listing when the handler could not be
                # disassembled instead of printing the text "None"
                if self._config.VERBOSE and disasm:
                    outfd.write("{0}\n".format(disasm))

            # Check DriverStartIO per Frank B. and TDL4
            DriverStartIo = driver_obj.DriverStartIo
            if DriverStartIo != 0:
                if DriverStartIo < driver_obj.DriverStart or \
                   DriverStartIo > (driver_obj.DriverStart + driver_obj.DriverSize):
                    mod = find_module(mods, mod_addrs, DriverStartIo)
                    if mod != None:
                        owner = str(mod.BaseDllName)
                    else:
                        owner = "UNKNOWN"
                else:
                    owner = ObjectNameString

                outfd.write(row_format.format(
                    driver_obj.DriverStart, ObjectNameString,
                    'DriverStartIo', DriverStartIo, owner, '', ''))

                if self._config.VERBOSE:
                    try:
                        code = kas_zread(DriverStartIo, 100)
                        disasm = get_disasm_text(code, DriverStartIo.v())
                    except Exception:
                        continue
                    outfd.write("{0}\n".format(disasm))
- #--------------------------------------------------------------------------------
- # psxview plug-in (used to be csrpslist)
- #--------------------------------------------------------------------------------
class _PSP_CID_TABLE(windows._HANDLE_TABLE):
    """Subclass the Windows handle table object for parsing PspCidTable"""

    def get_item(self, offset):
        """Return the _OBJECT_HEADER of the object whose body is at `offset`."""
        # Step back from the body to the start of the header
        body_offset = self.obj_vm.profile.get_obj_offset('_OBJECT_HEADER', 'Body')
        return obj.Object("_OBJECT_HEADER", offset - body_offset, self.obj_vm)
class PsXview(commands.command):
    "[MALWARE] Find hidden processes with various process listings"

    # The plug-in builds six independent pid -> _EPROCESS dictionaries
    # (pslist, psscan, thrdproc, pspcid, csr_handles, csr_links) and prints,
    # for every pid seen anywhere, which of the listings contain it.
    #
    # NOTE: comparisons against None deliberately use ==/!= (not "is") to
    # match the rest of this file; the objects compared may overload equality.

    def __init__(self, config, *args):
        commands.command.__init__(self, config, *args)
        config.add_option("PHYSICAL-OFFSET", short_option = 'P', default = False,
                          help = "Physical Offset", action = "store_true")

    def get_proc_list(self, csr_obj):
        """Iterator that walks the linked list of processes in csrss.exe"""
        for l in csr_obj.ListLink.list_of_type("_CSR_PROCESS", "ListLink"):
            yield l

    def check_pslist(self, all_tasks):
        """Enumerate processes from PsActiveProcessHead

        Returns a dict keyed by pid whose values are the given task objects.
        """
        return dict((p.UniqueProcessId.v(), p) for p in all_tasks)

    def check_psscan(self):
        """Enumerate processes with pool tag scanning

        Only processes whose ExitTime is zero are kept.
        """
        return dict((p.UniqueProcessId.v(), p)
                    for p in filescan.PSScan(self._config).calculate()
                    if p.ExitTime == 0)

    def _thread_owner(self, ethread):
        """Return the owning-process pointer stored in an _ETHREAD.

        The field is Tcb.Process when the profile has it, otherwise
        ThreadsProcess; None is returned when neither member exists.
        """
        if hasattr(ethread.Tcb, 'Process'):
            return ethread.Tcb.Process
        if hasattr(ethread, 'ThreadsProcess'):
            return ethread.ThreadsProcess
        return None

    def check_thrdproc(self, addr_space):
        """Enumerate processes indirectly by ETHREAD scanning

        For every scanned thread that has not exited, the owning _EPROCESS
        is instantiated in `addr_space`; it is recorded when the process
        has not exited either and its pid is below 65535.
        """
        threads = dict()
        for ethread in modscan.ThrdScan(self._config).calculate():
            if ethread.ExitTime == 0:
                owner = self._thread_owner(ethread)
                if owner is None:
                    # neither owner field exists in this profile
                    continue
                proc = obj.Object("_EPROCESS", offset = owner, vm = addr_space)
                if proc.ExitTime == 0:
                    pid = proc.UniqueProcessId.v()
                    if pid < 65535:
                        threads[pid] = proc
        return threads

    def is_process_object(self, objhdr, obj_vm):
        """Determine if an OBJECT_HEADER is for an EPROCESS, taking into
        account the changes to the object header for Windows 7"""
        volmagic = obj.Object("VOLATILITY_MAGIC", 0x0, obj_vm)
        try:
            # New object header: compare the type index with the profile's map
            if objhdr.TypeIndex == volmagic.TypeIndexMap.v()['Process']:
                return True
        except AttributeError:
            # Old object header: compare the type's name
            if (objhdr.Type.Name):
                if str(objhdr.Type.Name) == 'Process':
                    return True
        return False

    def check_csrss_handles(self, csrs):
        """Enumerate processes using the csrss.exe handle table"""
        handles = dict()
        for p in csrs:
            # Gather the handles to process objects
            for h in p.ObjectTable.handles():
                if self.is_process_object(h, p.obj_vm):
                    proc_obj = obj.Object('_EPROCESS', h.Body.obj_offset, p.obj_vm, parent = p)
                    handles[proc_obj.UniqueProcessId.v()] = proc_obj
        return handles

    def check_csrss_links(self, csrs):
        """Enumerate processes using the CsrRootProcess linked list

        The returned dict maps each pid to None: this listing only yields
        pids, not _EPROCESS objects.
        """
        links = dict()
        for p in csrs:
            # Get the address space
            ps_ad = p.get_process_address_space()
            if ps_ad == None:
                continue
            # Find the DLL which contains the special linked list info
            mods = list(p.list_modules())
            mod = find_module_by_name(mods, ["csrsrv.dll"])
            if mod == None:
                continue
            # Find the exported function
            CsrLockProcessByClientId = mod.getprocaddress("CsrLockProcessByClientId")
            if CsrLockProcessByClientId == None:
                continue
            # Get the global variable referenced by the exported function
            prologue = ps_ad.zread(CsrLockProcessByClientId, 100)
            offset = prologue.find("\x8b\x35")
            if offset == -1:
                # pattern for windows 7
                offset = prologue.find("\x8b\x1d")
                if offset == -1:
                    continue
            # The operand follows the two opcode bytes that were matched
            csrpp = obj.Object('_CSR_PROCESS_POINTER',
                               offset = CsrLockProcessByClientId + offset + 2,
                               vm = ps_ad)
            # Locate the linked list by following pointers in the global variable
            csr_obj = obj.Object("_CSR_PROCESS",
                                 offset = csrpp.CsrRootProcess.dereference().dereference().v(),
                                 vm = ps_ad)
            # Save the pids
            for csr in self.get_proc_list(csr_obj):
                links[csr.ClientId.UniqueProcess.v()] = None
        return links

    def check_pspcid(self, addr_space):
        """Enumerate processes by walking the PspCidTable"""
        pspcid = dict()
        # Copy the _HANDLE_TABLE type into _PSP_CID_TABLE
        addr_space.profile.add_types(
            {"_PSP_CID_TABLE" : addr_space.profile.abstract_types["_HANDLE_TABLE"]},
            )
        # Tell Volatility that KDBG.PspCidTable is a **_PSP_CID_TABLE
        addr_space.profile.add_types(
            {"_KDDEBUGGER_DATA64" : [None,
                {'PspCidTable': [addr_space.profile.get_obj_offset("_KDDEBUGGER_DATA64", "PspCidTable"),
                    ["pointer", ["pointer", ['_PSP_CID_TABLE']]]],
                }] },
            )
        # Follow the pointers to the table base
        PspCidTable = tasks.get_kdbg(addr_space).PspCidTable.dereference().dereference()
        # Walk the handle table
        for h in PspCidTable.handles():
            if self.is_process_object(h, addr_space):
                proc_obj = obj.Object('_EPROCESS', h.Body.obj_offset, addr_space)
                pspcid[proc_obj.UniqueProcessId.v()] = proc_obj
        return pspcid

    def calculate(self):
        """Yield (pid, eproc, ps_sources) for every pid found by any listing.

        `eproc` is the first non-None object found for the pid among the
        listings, or None when only csr_links knows the pid. `ps_sources`
        is the dict of all six pid -> object dictionaries.
        """
        addr_space = get_malware_space(self._config)
        all_tasks = list(tasks.pslist(addr_space))
        # The CSR process is necessary for two of the process listings
        csrexe = [p for p in all_tasks if str(p.ImageFileName).lower() == "csrss.exe"]
        # For the following dictionaries, the keys are pids and the values are eproc objects
        ps_sources = dict(
            pslist = self.check_pslist(all_tasks),
            psscan = self.check_psscan(),
            thrdproc = self.check_thrdproc(addr_space),
            pspcid = self.check_pspcid(addr_space),
            csr_handles = self.check_csrss_handles(csrexe) if csrexe else dict(),
            csr_links = self.check_csrss_links(csrexe) if csrexe else dict(),
            )
        # Build the set of pids from all sources
        unique_pids = set()
        for source in ps_sources.values():
            unique_pids.update(source.keys())
        # For each unique pid, pick the first listing that has an object for it
        for pid in unique_pids:
            eproc = None
            for source in ps_sources.values():
                if pid in source and source[pid] != None:
                    eproc = source[pid]
                    break
            yield pid, eproc, ps_sources

    def render_text(self, outfd, data):
        """Write one row per pid with a True/False column per listing."""
        row_fmt = "{0:<12} {1:<20} {2:<8} {3:<10} {4:<10} {5:<10} {6:<10} {7:<10} {8:<10}\n"
        outfd.write(row_fmt.format(
            'Offset', 'Name', 'Pid', 'pslist', 'psscan', 'thrdproc', 'pspcid', 'csr_hnds', 'csr_list'))
        for pid, eproc, ps_sources in data:
            # csrss links dont have an eprocess
            if eproc == None:
                offset = "----"
            else:
                # force physical offsets for any EPROCESS not already
                # instantiated from a file address space
                if self._config.PHYSICAL_OFFSET and not eproc.obj_vm.__class__.__name__.endswith("FileAddressSpace"):
                    paddr = eproc.obj_vm.vtop(eproc.obj_offset)
                    # hex() cannot format a failed translation; show the
                    # same placeholder used for a missing EPROCESS
                    if paddr == None:
                        offset = "----"
                    else:
                        offset = hex(paddr)
                else:
                    offset = hex(eproc.obj_offset)
            outfd.write(row_fmt.format(
                offset,
                eproc.ImageFileName if eproc != None else "----",
                pid,
                pid in ps_sources['pslist'],
                pid in ps_sources['psscan'],
                pid in ps_sources['thrdproc'],
                pid in ps_sources['pspcid'],
                pid in ps_sources['csr_handles'],
                pid in ps_sources['csr_links'],
                ))
- #--------------------------------------------------------------------------------
- # ssdt_ex plugin
- #--------------------------------------------------------------------------------
- class SSDT_EX(ImpScan):
- "[MALWARE] SSDT Hook Explorer for IDA Pro (and SSDT by thread)"
- def __init__(self, config, *args):
- ImpScan.__init__(self, config, *args)
- config.remove_option("PID")
- config.remove_option("ADDR")
- config.remove_option("SIZE")
- config.remove_option("SCAN")
- config.remove_option("UNSAFE")
- config.add_option('DUMP-DIR', short_option = 'D', default = None,
- help = 'Directory in which to dump the files')
- # Names of the legit executive modules for SSDT tables
- self.executive_modules = [
- ["ntoskrnl.exe", "ntkrnlpa.exe", "ntkrnlmp.exe", "ntkrpamp.exe"], # SSDT 0
- ["win32k.sys"], # SSDT 1
- ["spud.sys"], # SSDT 2
- []] # SSDT 3
- def get_ssdt(self, addr_space, procs, mods, mod_addrs):
- # Gather up all SSDTs referenced by threads
- ssdts = set()
- for proc in procs:
- for thread in proc.ThreadListHead.list_of_type("_ETHREAD", "ThreadListEntry"):
- ssdt_obj = thread.Tcb.ServiceTable.dereference_as('_SERVICE_DESCRIPTOR_TABLE')
- ssdts.add(ssdt_obj)
- # Get a list of *unique* SSDT entries. Typically we see only two.
- tables = set()
- for ssdt_obj in ssdts:
- for i, desc in enumerate(ssdt_obj.Descriptors):
- if desc.is_valid() and desc.ServiceLimit > 0 and desc.ServiceLimit < 0xFFFF and desc.KiServiceTable > 0x80000000:
- tables.add((i, desc.KiServiceTable.v(), desc.ServiceLimit.v()))
- tables_with_vm = []
- for idx, table, n in tables:
- found = False
- for p in procs:
- ## This is the process address space
- ps_ad = p.get_process_address_space()
- ## Is the table accessible from the process AS?
- if ps_ad.is_valid_address(table):
- tables_with_vm.append((idx, table, n, ps_ad))
- found = True
- break
- ## If not we use the kernel address space
- if not found:
- # Any VM is equally bad...
- tables_with_vm.append((idx, table, n, addr_space))
- syscalls = addr_space.profile.syscalls
- for idx, table, n, vm in sorted(tables_with_vm, key = itemgetter(0)):
- if not vm.is_valid_address(table):
- continue
- for i in range(n):
- func_addr = obj.Object('unsigned long', table + (i * 4), vm).v()
- try:
- func_name = syscalls[idx][i]
- except IndexError:
- func_name = "Unknown"
- mod = find_module(mods, mod_addrs, func_addr)
- try:
- mod_base = mod.DllBase
- mod_name = mod.BaseDllName
- except:
- mod_base = None
- mod_name = ""
- yield table, idx, i, func_addr, func_name, mod_base, mod_name
- def calculate(self):
- config = self._config
- # If we're dumping drivers, we need an output path
- if config.DUMP_DIR == None:
- debug.error("Please specify a dump directory (--dump-dir)")
- if not os.path.isdir(config.DUMP_DIR):
- debug.error(config.DUMP_DIR + " is not a directory")
- addr_space = get_malware_space(config)
- # Apply the right SSDT structs depending on OS version
- addr_space.profile.add_types(ssdt.sde_types)
- if addr_space.profile.metadata.get('major', 0) == 5 and addr_space.profile.metadata.get('minor',0) == 2:
- addr_space.profile.add_types(ssdt.sdt_types_2k3)
- else:
- addr_space.profile.add_types(ssdt.sdt_types)
- # Get a sorted list of module addresses
- drivers = list(modules.lsmod(addr_space))
- mods, mod_addrs = get_sorted_mods(drivers)
- procs = list(tasks.pslist(addr_space))
- dump_info = []
- for info in self.get_ssdt(addr_space, procs, mods, mod_addrs):
- (table, idx, i, func_addr, func_name, mod_base, mod_name) = info
- # If the module containing the syscall is not the nt executive, then its hooked
- if mod_name not in self.executive_modules[idx]:
- dump_info.append((mod_base, func_addr, func_name))
- print " Entry {0:#06x}: {1:#x} ({2}) owned by {3}".format(idx * 0x1000 + i, func_addr, func_name, mod_name)
- exports = self.get_kernel_exports(addr_space, procs, drivers)
- # The hooks may be spread across multiple drivers, so handle each one separately
- bases = list(set([mod_base for (mod_base, _, _) in dump_info]))
- for base in bases:
- # no PE for the rootkit driver
- if base == None: continue
- # Dump and rebuild the rootkit driver
- pedata = self.rebuild(addr_space, base)
- pedata = self.rebase(addr_space, base, pedata)
- fname = os.path.join(config.DUMP_DIR, "driver.{0:x}.sys".format(base))
- f = open(fname, "wb")
- f.write(pedata)
- f.close()
- # Create a dictionary of the hooked SSDT functions
- hooks = dict((func_addr, func_name) for (mod_base, func_addr, func_name) in dump_info if mod_base == base)
- imports = self.get_all_imports(addr_space, pedata, base, exports)
- # Build the data for IDC statements
- funcs = self.find_functions(pedata, base)
- # Build a dictionary of imports for IDC statements
- names = dict((const, vals[1]) for (const, vals) in imports.items())
- self.automate_ida(fname, names, funcs, hooks)
- def render_text(self, outfd, data):
- pass
- #--------------------------------------------------------------------------------
- # threads plugin - replaces orphanthreads and ssdt_by_threads
- #--------------------------------------------------------------------------------
# Thread state names; the list index is the value compared against
# thread.Tcb.State (Threads.render_text indexes this list with it).
KTHREAD_STATE = [
    'Initialized',    # 0
    'Ready',          # 1
    'Running',        # 2
    'Standby',        # 3
    'Terminated',     # 4
    'Waiting',        # 5
    'Transition',     # 6
    'DeferredReady',  # 7
    'GateWait',       # 8
]
# Wait reason names; the list index is the value of thread.Tcb.WaitReason
# (Threads.render_text appends the name to the 'Waiting' state).
KWAIT_REASONS = [
    'Executive',          # 0
    'FreePage',           # 1
    'PageIn',             # 2
    'PoolAllocation',     # 3
    'DelayExecution',     # 4
    'Suspended',          # 5
    'UserRequest',        # 6
    'WrExecutive',        # 7
    'WrFreePage',         # 8
    'WrPageIn',           # 9
    'WrPoolAllocation',   # 10
    'WrDelayExecution',   # 11
    'WrSuspended',        # 12
    'WrUserRequest',      # 13
    'WrEventPair',        # 14
    'WrQueue',            # 15
    'WrLpcReceive',       # 16
    'WrLpcReply',         # 17
    'WrVirtualMemory',    # 18
    'WrPageOut',          # 19
    'WrRendezvous',       # 20
    'Spare2',             # 21
    'Spare3',             # 22
    'Spare4',             # 23
    'Spare5',             # 24
    'Spare6',             # 25
    'WrKernel',           # 26
    'WrResource',         # 27
    'WrPushLock',         # 28
    'WrMutex',            # 29
    'WrQuantumEnd',       # 30
    'WrDispatchInt',      # 31
    'WrPreempted',        # 32
    'WrYieldExecution',   # 33
    'WrFastMutex',        # 34
    'WrGuardedMutex',     # 35
    'WrRundown',          # 36
    'MaximumWaitReason',  # 37
]
# Bit masks tested against _ETHREAD.CrossThreadFlags, keyed by flag name.
PS_CROSS_FLAGS = {
    'PS_CROSS_THREAD_FLAGS_TERMINATED': 0x00000001,
    'PS_CROSS_THREAD_FLAGS_DEADTHREAD': 0x00000002,
    'PS_CROSS_THREAD_FLAGS_HIDEFROMDBG': 0x00000004,
    'PS_CROSS_THREAD_FLAGS_IMPERSONATING': 0x00000008,
    'PS_CROSS_THREAD_FLAGS_SYSTEM': 0x00000010,
    'PS_CROSS_THREAD_FLAGS_HARD_ERRORS_DISABLED': 0x00000020,
    'PS_CROSS_THREAD_FLAGS_BREAK_ON_TERMINATION': 0x00000040,
    'PS_CROSS_THREAD_FLAGS_SKIP_CREATION_MSG': 0x00000080,
    'PS_CROSS_THREAD_FLAGS_SKIP_TERMINATION_MSG': 0x00000100,
}


def IS_SET(x, y):
    """Return x.CrossThreadFlags masked with the PS_CROSS_FLAGS bit named y.

    The result is the raw masked value (non-zero when the flag is set),
    not a bool; an unknown flag name raises KeyError.
    """
    mask = PS_CROSS_FLAGS[y]
    return x.CrossThreadFlags & mask
- class Threads(SSDT_EX):
- "[MALWARE] Investigate _ETHREAD and _KTHREADs"
- def __init__(self, config, *args):
- SSDT_EX.__init__(self, config, *args)
- config.remove_option("DUMP-DIR")
- config.add_option("FILTER", short_option='F', default=None,
- help='Tags to filter (comma-separated)')
- config.add_option("ALLOW-HOOK", short_option='A', default=None,
- help='Allow SSDT hooks from these mods (comma-separated)')
- config.add_option('PID', short_option = 'p', default = None,
- help = 'Operate on these Process IDs (comma-separated)',
- action = 'store', type = 'str')
- config.add_option("LISTTAGS", short_option='L', default=False,
- action='store_true', help='List all available tags')
- self.kernel = 0x80000000
- self.kernel_space = None
- self.kernel_mods = None
- self.kernel_mod_addrs = None
- self.hooked_tables = dict()
- def check_hooked_ssdt(self, thread, hooked_tables):
- """Check if a thread is using a hooked SSDT"""
- ssdt_obj = thread.Tcb.ServiceTable.dereference_as('_SERVICE_DESCRIPTOR_TABLE')
- for i, desc in enumerate(ssdt_obj.Descriptors):
- tbl = desc.KiServiceTable.v()
- if tbl in hooked_tables.keys():
- return True
- return False
- def check_orphan_thread(self, thread, mods, mod_addrs):
- """Check if a system thread has been abandoned"""
- if IS_SET(thread, 'PS_CROSS_THREAD_FLAGS_SYSTEM'):
- mod = find_module(mods, mod_addrs, thread.StartAddress)
- if not mod:
- return True
- return False
- def check_hw_breakpoints(self, thread):
- """Check active threads for Dr* registers"""
- if not IS_SET(thread, 'PS_CROSS_THREAD_FLAGS_TERMINATED'):
- trapframe = thread.Tcb.TrapFrame
- if thread.obj_vm.is_valid_address(trapframe):
- if (trapframe.Dr0 != 0 or trapframe.Dr1 != 0 or trapframe.Dr2 != 0 or trapframe.Dr3 != 0) and \
- (trapframe.Dr6 != 0 and trapframe.Dr7 != 0):
- return True
- return False
- def get_owner(self, address):
- """Return the owning module for an address"""
- if address > self.kernel:
- return find_module(self.kernel_mods, self.kernel_mod_addrs, address)
- def get_image_name(self, proc_offset):
- """Safely read a process's name, assuming it could be invalid"""
- data = self.kernel_space.zread(proc_offset + self.kernel_space.profile.get_obj_offset("_EPROCESS", "ImageFileName"), 16)
- if data:
- if data.find("\x00") != -1:
- data = data[:data.find("\x00")]
- return repr(data)
- return ""
- def calculate(self):
- checks = dict(
- OrphanThread = 'Detect orphan threads',
- SystemThread = 'Detect system threads',
- HookedSSDT = 'Detect threads using a hooked SSDT',
- ScannerOnly = 'Detect threads no longer in a linked list',
- DkomExit = 'Detect inconsistencies wrt exit times and termination',
- HideFromDebug = 'Detect threads hidden from debuggers',
- HwBreakpoints = 'Detect threads with hardware breakpoints',
- AttachedProcess = 'Detect threads attached to another process',
- )
- if self._config.LISTTAGS:
- print "{0:<20} {1}".format("Tag", "Description")
- print "{0:<20} {1}".format("-" * 14, "-" * 14)
- for k, v in checks.items():
- print "{0:<20} {1}".format(k, v)
- return
- addr_space = utils.load_as(self._config)
- # Apply the right SSDT structs depending on OS version
- addr_space.profile.add_types(ssdt.sde_types)
- if addr_space.profile.metadata.get('major', 0) == 5 and addr_space.profile.metadata.get('minor',0) == 2:
- addr_space.profile.add_types(ssdt.sdt_types_2k3)
- else:
- addr_space.profile.add_types(ssdt.sdt_types)
- # save the space so it can be used in render_text
- self.kernel_space = addr_space
- # ignore ssdt hooks if they point to user-specified "allowed" modules
- if self._config.ALLOW_HOOK:
- exlist = self._config.ALLOW_HOOK.split(",")
- self.executive_modules[0].extend(exlist)
- self.executive_modules[1].extend(exlist)
- # only show threads owned by particular processes
- if self._config.PID:
- pidlist = [int(p) for p in self._config.PID.split(',')]
- else:
- pidlist = []
- # get sorted list of kernel modules
- (kernel_mods, \
- kernel_mod_addrs) = get_sorted_mods(modules.lsmod(addr_space))
- # gather processes
- procs = list(tasks.pslist(addr_space))
- seen = dict()
- # gather up the SSDTs with hooks in them
- hooked_tables = dict()
- for info in self.get_ssdt(addr_space, procs, kernel_mods, kernel_mod_addrs):
- (table, idx, i, func_addr, func_name, mod_base, mod_name) = info
- if mod_name not in self.executive_modules[idx]:
- if hooked_tables.has_key(table):
- hooked_tables[table].append((i, func_name, func_addr, mod_name))
- else:
- hooked_tables[table] = [(i, func_name, func_addr, mod_name)]
- # save the lists so they can be used in render_text
- self.kernel_mods = kernel_mods
- self.kernel_mod_addrs = kernel_mod_addrs
- self.hooked_tables = hooked_tables
- # walk the thread lists
- for proc in self.filter_tasks(procs):
- for thread in proc.ThreadListHead.list_of_type("_ETHREAD", "ThreadListEntry"):
- # the False here means its not found by the scanner
- seen[thread.obj_vm.vtop(thread.obj_offset)] = (False, thread)
- # scan for thread objects, save them if not already seen
- for thread in modscan.ThrdScan(self._config).calculate():
- if not seen.has_key(thread.obj_offset):
- seen[thread.obj_offset] = (True, thread)
- for offset, (found_by_scanner, thread) in seen.items():
- # skip processes the user doesn't want to see
- if pidlist and thread.Cid.UniqueProcess not in pidlist:
- continue
- # get the thread's owner, taking into account the windows 7 changes
- if hasattr(thread.Tcb, 'Process'):
- proc_offset = thread.Tcb.Process
- elif hasattr(thread, 'ThreadsProcess'):
- proc_offset = thread.ThreadsProcess
- # the keys are "tags" which can be filtered on command-line with the --filter
- # parameter. each tag is True or False depending on the specified criteria.
- checks['DkomExit'] = thread.ExitTime != 0 and thread.Tcb.State != 4 and not IS_SET(thread, 'PS_CROSS_THREAD_FLAGS_TERMINATED')
- checks['HideFromDebug'] = IS_SET(thread, 'PS_CROSS_THREAD_FLAGS_HIDEFROMDBG')
- checks['SystemThread'] = IS_SET(thread, 'PS_CROSS_THREAD_FLAGS_SYSTEM')
- checks['Impersonation'] = IS_SET(thread, 'PS_CROSS_THREAD_FLAGS_IMPERSONATING')
- checks['ScannerOnly'] = found_by_scanner
- checks['HookedSSDT'] = self.check_hooked_ssdt(thread, hooked_tables)
- checks['OrphanThread'] = self.check_orphan_thread(thread, kernel_mods, kernel_mod_addrs)
- checks['HwBreakpoints'] = self.check_hw_breakpoints(thread) if not found_by_scanner else False
- checks['AttachedProcess'] = thread.ExitTime == 0 and proc_offset != thread.Tcb.ApcState.Process
- yield thread, proc_offset, checks
- def render_text(self, outfd, data):
- # determine which filters the user wants to see
- if self._config.FILTER:
- filters = set(self._config.FILTER.split(','))
- else:
- filters = set()
- for thread, proc_offset, checks in data:
- # if the user didn't set filters, display all results. if the user
- # set one or more filters, only show threads with matching results.
- tags = set([t for t, v in checks.items() if v])
- if filters and not filters & tags:
- continue
- s = "------\n"
- s += "ETHREAD: {0:#010x} Pid: {1} Tid: {2}\n".format(thread.obj_offset, \
- thread.Cid.UniqueProcess, thread.Cid.UniqueThread)
- s += "Tags: {0}\n".format(','.join(tags))
- s += "Created: {0}\n".format(thread.CreateTime or '-')
- s += "Exited: {0}\n".format(thread.ExitTime or '-')
- owner_proc = self.get_image_name(proc_offset)
- attach_proc = self.get_image_name(thread.Tcb.ApcState.Process)
- s += "Owning Process: {0:#x} {1}\n".format(proc_offset, \
- owner_proc)
- s += "Attached Process: {0:#x} {1}\n".format(thread.Tcb.ApcState.Process, \
- attach_proc)
- # lookup the thread's state
- try:
- state = KTHREAD_STATE[thread.Tcb.State]
- except IndexError:
- state = hex(thread.Tcb.State)
- # append the wait reason
- if state == 'Waiting':
- state = state + ':' + KWAIT_REASONS[thread.Tcb.WaitReason]
- s += "State: {0}\n".format(state)
- s += "BasePriority: {0:#x}\n".format(thread.Tcb.BasePriority)
- s += "Priority: {0:#x}\n".format(thread.Tcb.Priority)
- s += "TEB: {0:#010x}\n".format(thread.Tcb.Teb)
- # lookup the owning module according to start address
- owner = self.get_owner(thread.StartAddress)
- s += "StartAddress: {0:#010x} {1}\n".format(thread.StartAddress, \
- owner.BaseDllName if owner else '')
- # check the flag which indicates whether Win32StartAddress is valid
- if thread.SameThreadApcFlags & 1:
- s += "Win32StartAddress: {0:#010x}\n".format(thread.Win32StartAddress)
- # pretty print the ssdt's
- s += "ServiceTable: {0:#010x}\n".format(thread.Tcb.ServiceTable)
- ssdt_obj = obj.Object("_SERVICE_DESCRIPTOR_TABLE", \
- offset = thread.Tcb.ServiceTable, vm = self.kernel_space)
- if ssdt_obj != None:
- for i, desc in enumerate(ssdt_obj.Descriptors):
- if desc.is_valid() and desc.ServiceLimit > 0 and \
- desc.ServiceLimit < 0xFFFF and desc.KiServiceTable > self.kernel:
- s += " [{0}] {1:#010x}\n".format(i, desc.KiServiceTable.v())
- else:
- s += " [{0}] -\n".format(i)
- # show exactly which functions are hooked
- tbl = desc.KiServiceTable.v()
- if tbl in self.hooked_tables.keys():
- for (i, func_name, func_addr, mod_name) in self.hooked_tables[tbl]:
- s += " [{0:#x}] {1} {2:#x} {3}\n".format(i, func_name, func_addr, mod_name)
- s += "Win32Thread: {0:#010x}\n".format(thread.Tcb.Win32Thread)
- s += "CrossThreadFlags: {0}\n".format(\
- '|'.join(get_flags(PS_CROSS_FLAGS, thread.CrossThreadFlags)))
- # print the registers if possible
- if self.kernel_space.is_valid_address(thread.Tcb.TrapFrame):
- trapframe = obj.Object("_KTRAP_FRAME", vm = self.kernel_space, \
- offset = thread.Tcb.TrapFrame)
- owner = self.get_owner(trapframe.Eip)
- s += "Eip: {0:#10x} {1}\n".format(trapframe.Eip, owner.BaseDllName if owner else '')
- s += " eax={0:#010x} ebx={1:#010x} ecx={2:#010x} edx={3:#010x} esi={4:#010x} edi={5:#010x}\n".format(
- trapframe.Eax, trapframe.Ebx, trapframe.Ecx, \
- trapframe.Edx, trapframe.Esi, trapframe.Edi)
- s += " eip={0:#010x} esp={1:#010x} ebp={2:#010x} err={3:#010x}\n".format(
- trapframe.Eip, trapframe.HardwareEsp, \
- trapframe.Ebp, trapframe.ErrCode)
- s += " cs={0:#04x} ss={1:#04x} ds={2:#04x} es={3:#04x} gs={4:#04x} efl={5:#010x}\n".format(
- trapframe.SegCs, trapframe.HardwareSegSs, trapframe.SegDs, \
- trapframe.SegEs, trapframe.SegGs, trapframe.EFlags)
- s += " dr0={0:#010x} dr1={1:#010x} dr2={2:#010x} dr3={3:#010x} dr6={4:#010x} dr7={5:#010x}\n".format(
- trapframe.Dr0, trapframe.Dr1, trapframe.Dr2, \
- trapframe.Dr3, trapframe.Dr6, trapframe.Dr7)
- # disasemble the start address if possible
- if self.kernel_space.is_valid_address(thread.StartAddress):
- buf = self.kernel_space.zread(thread.StartAddress, 24)
- dis = get_disasm_text(buf, thread.StartAddress.v())
- if dis:
- s += "{0}\n".format(dis)
- outfd.write("{0}\n".format(s))
- #--------------------------------------------------------------------------------
- # devicetree plugin
- #--------------------------------------------------------------------------------
# Maps a _DEVICE_OBJECT.DeviceType value to its FILE_DEVICE_* name
# (looked up by DeviceTree.render_text). Entries are listed in numeric order.
device_types = {
    0x00000001: 'FILE_DEVICE_BEEP',
    0x00000002: 'FILE_DEVICE_CD_ROM',
    0x00000003: 'FILE_DEVICE_CD_ROM_FILE_SYSTEM',
    0x00000004: 'FILE_DEVICE_CONTROLLER',
    0x00000005: 'FILE_DEVICE_DATALINK',
    0x00000006: 'FILE_DEVICE_DFS',
    0x00000007: 'FILE_DEVICE_DISK',
    0x00000008: 'FILE_DEVICE_DISK_FILE_SYSTEM',
    0x00000009: 'FILE_DEVICE_FILE_SYSTEM',
    0x0000000a: 'FILE_DEVICE_INPORT_PORT',
    0x0000000b: 'FILE_DEVICE_KEYBOARD',
    0x0000000c: 'FILE_DEVICE_MAILSLOT',
    0x0000000d: 'FILE_DEVICE_MIDI_IN',
    0x0000000e: 'FILE_DEVICE_MIDI_OUT',
    0x0000000f: 'FILE_DEVICE_MOUSE',
    0x00000010: 'FILE_DEVICE_MULTI_UNC_PROVIDER',
    0x00000011: 'FILE_DEVICE_NAMED_PIPE',
    0x00000012: 'FILE_DEVICE_NETWORK',
    0x00000013: 'FILE_DEVICE_NETWORK_BROWSER',
    0x00000014: 'FILE_DEVICE_NETWORK_FILE_SYSTEM',
    0x00000015: 'FILE_DEVICE_NULL',
    0x00000016: 'FILE_DEVICE_PARALLEL_PORT',
    0x00000017: 'FILE_DEVICE_PHYSICAL_NETCARD',
    0x00000018: 'FILE_DEVICE_PRINTER',
    0x00000019: 'FILE_DEVICE_SCANNER',
    0x0000001a: 'FILE_DEVICE_SERIAL_MOUSE_PORT',
    0x0000001b: 'FILE_DEVICE_SERIAL_PORT',
    0x0000001c: 'FILE_DEVICE_SCREEN',
    0x0000001d: 'FILE_DEVICE_SOUND',
    0x0000001e: 'FILE_DEVICE_STREAMS',
    0x0000001f: 'FILE_DEVICE_TAPE',
    0x00000020: 'FILE_DEVICE_TAPE_FILE_SYSTEM',
    0x00000021: 'FILE_DEVICE_TRANSPORT',
    0x00000022: 'FILE_DEVICE_UNKNOWN',
    0x00000023: 'FILE_DEVICE_VIDEO',
    0x00000024: 'FILE_DEVICE_VIRTUAL_DISK',
    0x00000025: 'FILE_DEVICE_WAVE_IN',
    0x00000026: 'FILE_DEVICE_WAVE_OUT',
    0x00000027: 'FILE_DEVICE_8042_PORT',
    0x00000028: 'FILE_DEVICE_NETWORK_REDIRECTOR',
    0x00000029: 'FILE_DEVICE_BATTERY',
    0x0000002a: 'FILE_DEVICE_BUS_EXTENDER',
    0x0000002b: 'FILE_DEVICE_MODEM',
    0x0000002c: 'FILE_DEVICE_VDM',
    0x0000002d: 'FILE_DEVICE_MASS_STORAGE',
    0x0000002e: 'FILE_DEVICE_SMB',
    0x0000002f: 'FILE_DEVICE_KS',
    0x00000030: 'FILE_DEVICE_CHANGER',
    0x00000031: 'FILE_DEVICE_SMARTCARD',
    0x00000032: 'FILE_DEVICE_ACPI',
    0x00000033: 'FILE_DEVICE_DVD',
    0x00000034: 'FILE_DEVICE_FULLSCREEN_VIDEO',
    0x00000035: 'FILE_DEVICE_DFS_FILE_SYSTEM',
    0x00000036: 'FILE_DEVICE_DFS_VOLUME',
    0x00000037: 'FILE_DEVICE_SERENUM',
    0x00000038: 'FILE_DEVICE_TERMSRV',
    0x00000039: 'FILE_DEVICE_KSEC',
    0x0000003a: 'FILE_DEVICE_FIPS',
}
class DeviceTree(filescan.DriverScan):
    "[MALWARE] Show device tree"

    def render_text(self, outfd, data):
        """Print each driver, its device chain and every attached device.

        Per driver a "DRV" line is written, then a "DEV" line for each
        device reached through NextDevice, and under each device an "ATT"
        line for each device reached through AttachedDevice, drawn three
        dashes wider per attachment level.
        """
        for _object_obj, driver_obj, _extension_obj, _object_name in data:
            driver_name = self.parse_string(driver_obj.DriverName)
            outfd.write("DRV 0x{0:08x} {1}\n".format(driver_obj.obj_offset, driver_name))

            device = obj.Object("_DEVICE_OBJECT",
                                driver_obj.DeviceObject,
                                self.kernel_address_space)
            while device:
                device_name = get_obj_name(self.kernel_address_space, device)
                device_kind = device_types.get(device.DeviceType.v(), "FILE_DEVICE_UNKNOWN")
                outfd.write("---| DEV {0:#x} {1} {2}\n".format(
                    device.obj_offset, device_name, device_kind))

                # follow the chain of devices attached on top of this one
                depth = 0
                attached = device.AttachedDevice.dereference()
                while attached:
                    label = get_obj_name(self.kernel_address_space, attached)
                    label = label + " - " + self.parse_string(attached.DriverObject.DriverName)
                    attached_kind = device_types.get(attached.DeviceType.v(), "FILE_DEVICE_UNKNOWN")
                    outfd.write("------{0}| ATT {1:#x} {2} {3}\n".format(
                        "---" * depth, attached.obj_offset, label, attached_kind))
                    attached = attached.AttachedDevice.dereference()
                    depth += 1

                device = device.NextDevice.dereference()
class Timers(commands.command):
    """Print kernel timers and associated module DPCs"""

    def __init__(self, config, *args):
        commands.command.__init__(self, config, *args)

    def find_list_head(self, mods, func, sig):
        """Find the KiTimerTableListHead given an exported
        function as a starting point and a small signature

        Reads 200 bytes at the export `func` of the nt module, searches
        them for `sig` and returns a Pointer object placed right after the
        signature. Returns None when the module, the export, the data or
        the signature cannot be found.
        """
        ntmod = find_module_by_name(mods, ["ntoskrnl.exe", "ntkrnlpa.exe"])
        if not ntmod:
            return None
        func_addr = ntmod.getprocaddress(func)
        if not func_addr:
            return None
        data = ntmod.obj_vm.zread(func_addr, 200)
        if not data:
            return None
        n = data.find(sig)
        if n == -1:
            return None
        return obj.Object('Pointer', vm = ntmod.obj_vm, offset = func_addr + n + len(sig))

    def _require_list_head(self, mods, func, sig):
        """Return find_list_head()'s result, reporting an error if it is missing."""
        list_head = self.find_list_head(mods, func, sig)
        if list_head == None:
            # without the list head there is no valid offset for the array
            debug.error("Unable to locate KiTimerTableListHead via " + func)
        return list_head

    def calculate(self):
        """Yield (timer, module_name) for every _KTIMER that has a valid DPC.

        `module_name` is the BaseDllName of the module found for the DPC
        routine, or "UNKNOWN" when no module is found.
        """
        addr_space = get_malware_space(self._config)
        # get the OS version we're analyzing
        version = (addr_space.profile.metadata.get('major', 0),
                   addr_space.profile.metadata.get('minor', 0))
        # get a list of drivers and sort them for easy lookups
        drivers = list(modules.lsmod(addr_space))
        mods, mod_addrs = get_sorted_mods(drivers)
        # KTIMERs collected
        timers = []
        # valid KTIMER.Header.Type values
        TimerNotificationObject = 8
        TimerSynchronizationObject = 9
        valid_types = (TimerNotificationObject, TimerSynchronizationObject)
        if version == (5,1) or (version == (5,2) and addr_space.profile.__class__.__name__ == "Win2K3SP0x86"):
            # On XP SP0-SP3 x86 and Windows 2003 SP0, KiTimerTableListHead
            # is an array of 256 _LIST_ENTRY for _KTIMERs.
            KiTimerTableListHead = self._require_list_head(drivers,
                                        "KeUpdateSystemTime",
                                        "\x25\xFF\x00\x00\x00\x8D\x0C\xC5")
            lists = obj.Object("Array", offset = KiTimerTableListHead,
                               vm = addr_space,
                               targetType = '_LIST_ENTRY',
                               count = 256)
            for l in lists:
                for t in l.list_of_type("_KTIMER", "TimerListEntry"):
                    timers.append(t)
        elif version == (5,2) or version == (6,0):
            # On XP x64, Windows 2003 SP1-SP2, and Vista SP0-SP2, KiTimerTableListHead
            # is an array of 512 _KTIMER_TABLE_ENTRY structs.
            addr_space.profile.add_types({
                '_KTIMER_TABLE_ENTRY' : [ 0x10, {
                    'Entry' : [ 0x0, ['_LIST_ENTRY']],
                    'Time' : [ 0x8, ['_ULARGE_INTEGER']],
                }]})
            KiTimerTableListHead = self._require_list_head(drivers,
                                        "KeCancelTimer",
                                        "\xC1\xE7\x04\x81\xC7")
            lists = obj.Object("Array", offset = KiTimerTableListHead,
                               vm = addr_space,
                               targetType = '_KTIMER_TABLE_ENTRY',
                               count = 512)
            for l in lists:
                for t in l.Entry.list_of_type("_KTIMER", "TimerListEntry"):
                    timers.append(t)
        elif version == (6,1):
            # On Windows 7, there is no more KiTimerTableListHead. The list is
            # at _KPCR.PrcbData.TimerTable.TimerEntries (credits to Matt Suiche
            # for this one. See http://pastebin.com/FiRsGW3f).
            volmagic = obj.Object('VOLATILITY_MAGIC', 0x0, addr_space)
            kpcr = obj.Object("_KPCR", offset = volmagic.KPCR.v(), vm = addr_space)
            for table in kpcr.PrcbData.TimerTable.TimerEntries:
                for t in table.Entry.list_of_type("_KTIMER", "TimerListEntry"):
                    timers.append(t)
        for timer in timers:
            # sanity check on the timer type
            if timer.Header.Type not in valid_types:
                continue
            # ignore timers without DPCs
            if not timer.Dpc.is_valid() or not timer.Dpc.DeferredRoutine.is_valid():
                continue
            # lookup the module containing the DPC
            mod = find_module(mods, mod_addrs, timer.Dpc.DeferredRoutine)
            yield timer, mod.BaseDllName if mod else "UNKNOWN"

    def render_text(self, outfd, data):
        """Write a header line followed by one line per (timer, owner) pair."""
        outfd.write("{0:<12} {1:<20} {2:<10} {3:<10} {4:<12} {5}\n".format("Offset",
            "DueTime", "Period(ms)", "Signaled", "Routine", "Module"))
        for timer, owner in data:
            outfd.write("{0:<#12x} {1:#010x}:{2:#010x} {3:<10} {4:<10} {5:<#12x} {6}\n".format(
                timer.obj_offset,
                timer.DueTime.HighPart,
                timer.DueTime.LowPart,
                timer.Period,
                "Yes" if timer.Header.SignalState.v() else "-",
                timer.Dpc.DeferredRoutine,
                owner))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement