Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- from collections import Counter
- import math
- import string
- import tkinter as tk
- from tkinter import ttk, filedialog, messagebox
- from tkinter.scrolledtext import ScrolledText
- from optparse import OptionParser
- from typing import Dict, List, Optional
- from dataclasses import dataclass
- import re
- import traceback
- import yara
- from fpdf import FPDF
- import logging
- import uuid
- from datetime import datetime
- import zipfile
- import tarfile
- import logging
- from pathlib import Path
# Initialize the unpacker
class unpack_lib:
    """Detect archive formats via libmagic and extract them to a directory.

    Supported formats: zip, rar, 7z, tar, gz. Running counters are kept in
    self.extraction_stats and exposed via get_extraction_stats().
    """

    def __init__(self):
        # Third-party 'python-magic'; imported lazily so this module can be
        # imported even when the package is absent.
        import magic
        self.supported_formats = ['zip', 'rar', '7z', 'tar', 'gz']
        self.mime = magic.Magic(mime=True)
        self.extraction_stats = {
            'files_processed': 0,
            'successful_extractions': 0,
            'failed_extractions': 0
        }

    def unpack_file(self, file_path, target_dir):
        """Unpack *file_path* into *target_dir*.

        Returns:
            bool: True on success; False for unsupported formats or failures.
        """
        try:
            file_format = self._detect_format(file_path)
            if file_format in self.supported_formats:
                self.extraction_stats['files_processed'] += 1
                # Pass the detected format along so libmagic is not probed twice.
                if self._extract_files(file_path, target_dir, file_format):
                    self.extraction_stats['successful_extractions'] += 1
                    return True
            self.extraction_stats['failed_extractions'] += 1
            return False
        except Exception as e:
            logging.error(f"Unpacking error: {str(e)}")
            return False

    def _detect_format(self, file_path):
        """Map the file's MIME type to a short format name, or None."""
        mime_type = self.mime.from_file(file_path)
        format_mapping = {
            'application/zip': 'zip',
            'application/x-rar': 'rar',
            'application/x-7z-compressed': '7z',
            'application/x-tar': 'tar',
            'application/gzip': 'gz'
        }
        return format_mapping.get(mime_type)

    def _extract_files(self, file_path, target_dir, file_format=None):
        """Extract *file_path* into *target_dir*; True on success.

        *file_format* may be supplied by the caller; when None it is detected.
        BUG FIX: rarfile/py7zr were referenced without ever being imported,
        so rar/7z archives crashed with NameError instead of failing softly;
        they are now imported lazily and a missing package is logged.
        """
        Path(target_dir).mkdir(parents=True, exist_ok=True)
        try:
            if file_format is None:
                file_format = self._detect_format(file_path)
            if file_format == 'zip':
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(target_dir)
            elif file_format == 'rar':
                import rarfile  # third-party, optional
                with rarfile.RarFile(file_path) as rar_ref:
                    rar_ref.extractall(target_dir)
            elif file_format == '7z':
                import py7zr  # third-party, optional
                with py7zr.SevenZipFile(file_path, mode='r') as z7_ref:
                    z7_ref.extractall(target_dir)
            elif file_format in ('tar', 'gz'):
                # NOTE(review): extractall on untrusted tars allows path
                # traversal on Python < 3.12; consider the 'data' filter.
                with tarfile.open(file_path) as tar_ref:
                    tar_ref.extractall(target_dir)
            return True
        except Exception as e:
            logging.error(f"Extraction error for {file_path}: {str(e)}")
            return False

    def get_extraction_stats(self):
        """Return the live extraction statistics dict."""
        return self.extraction_stats
@dataclass
class BlockDevice:
    """A block device (or plain file treated as one) under analysis.

    Attributes:
        name: filesystem path of the backing file/device.
        size: total size in bytes.
        sector_size: sector size in bytes used for seek/read arithmetic.
    """
    name: str
    size: int
    sector_size: int

    def __hash__(self):
        # Hash/eq use the same field tuple so equal devices hash equally.
        return hash((self.name, self.size, self.sector_size))

    def __eq__(self, other):
        # BUG FIX: the old implementation accessed other.name unconditionally
        # and raised AttributeError when compared to any non-BlockDevice.
        if not isinstance(other, BlockDevice):
            return NotImplemented
        return (self.name, self.size, self.sector_size) == \
               (other.name, other.size, other.sector_size)
class BinaryViewerGUI:
    """Main Tk window of the 'Spear of Telesto' malware-analysis tool.

    Owns one BlockDevice (wrapped in a single-element list so it can be
    swapped in place), a set of analysis engines, and a tabbed notebook with
    one tab per analysis view.
    """
    # NOTE(review): class-scope import makes `magic` a class attribute and
    # fails at class-definition time if python-magic is absent — confirm
    # this is intentional rather than a stray line.
    import magic

    def __init__(self, root, device: BlockDevice, sector_num, device_path):
        """Build the window, read the initial sector and create all tabs.

        Args:
            root: Tk root window.
            device: initial BlockDevice to display.
            sector_num: sector index to load on startup.
            device_path: path of the backing file (may be '' at startup).
        """
        self.unpacker = unpack_lib()  # archive extraction helper
        self.root = root
        self.root.title("Spear of Telesto - Advanced Malware Analysis")
        # Top banner showing device metadata and status messages.
        self.info_display = ScrolledText(root, wrap=tk.WORD, width=80, height=5)
        self.info_display.pack(expand=False, fill='x')
        self.device_manager = self.DeviceManager(root, device, sector_num, device_path)  # Pass required parameters
        # Single-element list so later code can rebind the device in place.
        self.device = [device]
        sector_data = self.device_manager.read_sector(device_path, sector_num, device)
        self.sector_analysis = self.device_manager.analyze_sector_content(sector_data)
        self.update_device_info(device)
        # Analysis engines — NOTE(review): ExploitAnalyzer, MemoryInspector,
        # NetworkPayloadAnalyzer and DisassemblyEngine are referenced as inner
        # classes but are not defined in this part of the file; confirm they
        # exist elsewhere.
        self.exploit_analyzer = self.ExploitAnalyzer()
        self.memory_inspector = self.MemoryInspector()
        self.network_analyzer = self.NetworkPayloadAnalyzer()
        self.disasm_engine = self.DisassemblyEngine()
        self.parser = self.create_parser()
        # Create main menu
        self.create_menu(self.device)
        # Create notebook for tabs
        self.notebook = ttk.Notebook(root)
        self.notebook.pack(expand=True, fill='both')
        # Create tabs (each returns its ttk.Frame and stores its text widget).
        self.hex_view = self.create_hex_view()
        self.analysis_view = self.create_analysis_view()
        self.string_view = self.create_string_view()
        self.stats_view = self.create_stats_view()
        self.exploit_view = self.create_exploit_view()
        self.memory_view = self.create_memory_view()
        self.network_view = self.create_network_view()
        self.disasm_view = self.create_disasm_view()
        self.parser_view = self.create_parser_view()
        self.shellcode_view = self.create_shellcode_view()
    def create_menu(self, device):
        """Build the File / Edit / Analysis menu bar.

        Args:
            device: the shared single-element device list held by the viewer.
        """
        menubar = tk.Menu(self.root)
        # NOTE(review): 'filename' is actually the device list, not a path,
        # and is then passed as device_path to open_file — open_file ignores
        # both arguments, but confirm the intent before relying on them.
        filename = device if device else None
        # File Menu
        file_menu = tk.Menu(menubar, tearoff=0)
        file_menu.add_command(label="Open", command=lambda: self.open_file(device, device_path=filename))
        file_menu.add_command(label="Save", command=self.save_file)
        file_menu.add_command(label="New", command=self.new_file)
        file_menu.add_separator()
        file_menu.add_command(label="Exit", command=self.root.quit)
        menubar.add_cascade(label="File", menu=file_menu)
        # Edit Menu
        edit_menu = tk.Menu(menubar, tearoff=0)
        edit_menu.add_command(label="Copy", command=self.copy_selection)
        edit_menu.add_command(label="Paste", command=self.paste_selection)
        menubar.add_cascade(label="Edit", menu=edit_menu)
        # Analysis Menu
        analysis_menu = tk.Menu(menubar, tearoff=0)
        analysis_menu.add_command(label="Export Report", command=self.export_analysis_report)
        analysis_menu.add_command(label="Run YARA Scan", command=self.run_yara_scan)
        analysis_menu.add_command(label="Deobfuscate", command=self.deobfuscate_binary)
        menubar.add_cascade(label="Analysis", menu=analysis_menu)
        self.root.config(menu=menubar)
- def export_analysis_report(self):
- from datetime import datetime
- from fpdf import XPos, YPos
- # Create report with helvetica font
- report = FPDF()
- report.add_page()
- report.set_font('helvetica', size=12)
- # Add title and timestamp with updated positioning
- report.cell(0, 10, 'Analysis Report', align='C', new_x=XPos.LMARGIN, new_y=YPos.NEXT)
- report.cell(0, 10, f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
- new_x=XPos.LMARGIN, new_y=YPos.NEXT)
- # Add sections with current syntax
- sections = {
- "Hex Analysis": self.hex_display.get("1.0", tk.END),
- "Memory Analysis": self.memory_text.get("1.0", tk.END),
- "Network Analysis": self.network_display.get("1.0", tk.END),
- "Shellcode Analysis": self.shellcode_display.get("1.0", tk.END)
- }
- for title, content in sections.items():
- report.add_page()
- report.cell(0, 10, title, new_x=XPos.LMARGIN, new_y=YPos.NEXT)
- safe_content = ''.join(char if ord(char) < 128 else '?' for char in str(content))
- report.multi_cell(0, 10, safe_content)
- filename = f"analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
- report.output(filename)
    def run_yara_scan(self):
        """Execute YARA scanning with integrated rules.

        Compiles a small built-in rule set, then any *.yar files found under
        rules/, runs both through integrate_yara_scanning() and renders the
        combined matches.
        """
        # Directory of optional user-supplied .yar rule files.
        rules_dir = "rules/"
        all_matches = []
        # Compile built-in rules: classic shellcode and encoder byte patterns.
        builtin_rules = """
        rule detect_shellcode {
            meta:
                description = "Detect potential shellcode"
            strings:
                $s1 = { 31 c0 50 68 } // XOR EAX, EAX; PUSH EAX; PUSH
                $s2 = { e8 00 00 00 00 } // CALL next instruction
                $s3 = { 90 90 90 90 } // NOP sled
            condition:
                any of them
        }
        rule detect_encoder {
            meta:
                description = "Detect encoding routines"
            strings:
                $xor = { 30 ?? 40 } // XOR-based encoder
                $add = { 80 ?? ?? } // ADD-based encoder
            condition:
                any of them
        }
        """
        try:
            # Compile and run built-in rules.
            # NOTE(review): integrate_yara_scanning() currently ignores the
            # rules object it is handed — confirm the built-in rules actually
            # get matched.
            builtin_yara = yara.compile(source=builtin_rules)
            matches = self.integrate_yara_scanning(builtin_yara)
            all_matches.extend(matches)
            # Load and run external rules from rules_dir, one file at a time.
            if os.path.exists(rules_dir):
                rule_files = [f for f in os.listdir(rules_dir) if f.endswith('.yar')]
                for rule_file in rule_files:
                    rule_path = os.path.join(rules_dir, rule_file)
                    external_rules = yara.compile(filepath=rule_path)
                    matches = self.integrate_yara_scanning(external_rules)
                    all_matches.extend(matches)
            self.yara_matches = all_matches
            self.display_yara_results(all_matches)
        except yara.Error as ye:
            # Compilation problems (bad rule syntax, missing files).
            logging.error(f"YARA compilation error: {ye}")
        except Exception as e:
            logging.error(f"Error during YARA scanning: {e}")
- def display_yara_results(self, matches):
- """Display YARA scan results in analysis view"""
- self.analysis_display.delete('1.0', tk.END)
- self.analysis_display.insert(tk.END, "YARA Scan Results:\n\n")
- match_count = 0
- for match in matches:
- match_count += 1
- self.analysis_display.insert(tk.END, f"Rule: {match['rule']}\n")
- self.analysis_display.insert(tk.END, f"Category: {match['category']}\n")
- self.analysis_display.insert(tk.END, f"Matched Strings:\n")
- for offset, identifier, string in match['strings']:
- self.analysis_display.insert(tk.END, f" {identifier} at offset {offset}: {string}\n")
- self.analysis_display.insert(tk.END, "\n")
- # Add completion notification
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- messagebox.showinfo("Scan Complete",
- f"YARA scan completed at {timestamp}\n"
- f"Found {match_count} matches")
- # Update status in info display
- self.info_display.insert(tk.END,
- f"\nYARA Scan completed at {timestamp} - {match_count} matches found\n")
- def integrate_yara_scanning(self, rules=None):
- """YARA rule integration and scanning"""
- try:
- # Initialize YARA rules from multiple sources
- rules_sources = {
- 'malware': 'rules/malware_rules.yar',
- 'exploits': 'rules/exploit_rules.yar',
- 'packers': 'rules/packer_rules.yar',
- 'crypto': 'rules/crypto_rules.yar'
- }
- all_matches = []
- for category, rulefile in rules_sources.items():
- if os.path.exists(rulefile):
- rules = yara.compile(filepath=rulefile)
- matches = rules.match(data=self.sector_data)
- if matches:
- all_matches.extend([{
- 'category': category,
- 'rule': match.rule,
- 'strings': match.strings,
- 'tags': match.tags
- } for match in matches])
- return all_matches
- except Exception as e:
- self.log_scanning_error("YARA", e)
- return []
- def deobfuscate_binary(self):
- sector_data = self.device_manager.read_sector(
- self.device[0].name,
- sector_num=0,
- device=self.device[0]
- )
- packer_type = BinaryViewerGUI.detect_packer(sector_data)
- if packer_type:
- unpacked_data = self.unpack_binary(sector_data, packer_type)
- self.analysis_display(unpacked_data)
- return
- def remove_obfuscation(self, data: bytes) -> bytes:
- """Multi-layer deobfuscation engine"""
- deobfuscated = data
- # Apply multiple deobfuscation techniques
- deobfuscated = self._remove_xor_encoding(deobfuscated)
- deobfuscated = self._remove_rot_encoding(deobfuscated)
- deobfuscated = self._remove_base64_encoding(deobfuscated)
- deobfuscated = self._decode_custom_alphabet(deobfuscated)
- return deobfuscated
- def _remove_xor_encoding(self, data: bytes) -> bytes:
- """Remove XOR-based obfuscation"""
- potential_keys = [0xFF, 0x90, 0x50] # Common XOR keys
- best_result = data
- best_entropy = self._check_entropy(data)
- for key in potential_keys:
- decoded = bytes(b ^ key for b in data)
- entropy = self._check_entropy(decoded)
- if entropy < best_entropy:
- best_result = decoded
- best_entropy = entropy
- return best_result
- def _remove_rot_encoding(self, data: bytes) -> bytes:
- """Remove rotation-based encoding"""
- rotations = [13, 47] # Common ROT values
- best_result = data
- best_printable = sum(chr(b).isprintable() for b in data)
- for rot in rotations:
- decoded = bytes((b + rot) % 256 for b in data)
- printable_chars = sum(chr(b).isprintable() for b in decoded)
- if printable_chars > best_printable:
- best_result = decoded
- best_printable = printable_chars
- return best_result
- def _remove_base64_encoding(self, data: bytes) -> bytes:
- """Remove Base64 encoding if detected"""
- try:
- import base64
- if all(chr(b) in string.printable for b in data):
- return base64.b64decode(data)
- except:
- pass
- return data
- def _decode_custom_alphabet(self, data: bytes) -> bytes:
- """Handle custom alphabet encodings"""
- custom_alphabets = {
- 'hex': bytes.fromhex,
- 'oct': lambda x: bytes(int(x[i:i+3], 8) for i in range(0, len(x), 3)),
- }
- for decoder in custom_alphabets.values():
- try:
- decoded = decoder(data)
- if self._check_entropy(decoded) < self._check_entropy(data):
- return decoded
- except:
- continue
- return data
- def detect_custom_obfuscation(self, data: bytes) -> bool:
- """Detect various obfuscation techniques"""
- indicators = {
- 'high_entropy': self._check_entropy(data) > 7.0,
- 'suspicious_ops': self._check_suspicious_operations(data),
- 'encrypted_signs': self._check_encryption_indicators(data),
- 'encoded_content': self._check_encoding_patterns(data)
- }
- return any(indicators.values())
- def _check_entropy(self, data: bytes) -> float:
- """Calculate Shannon entropy"""
- freq = Counter(data)
- return -sum(count/len(data) * math.log2(count/len(data)) for count in freq.values())
- def _check_suspicious_operations(self, data: bytes) -> bool:
- suspicious_patterns = [
- b'\x48\x31\xc0', # XOR RAX, RAX
- b'\x48\x31\xff', # XOR RDI, RDI
- b'\x48\x31\xd2', # XOR RDX, RDX
- ]
- return any(pattern in data for pattern in suspicious_patterns)
- def _check_encryption_indicators(self, data: bytes) -> bool:
- crypto_constants = [
- bytes.fromhex('67452301'), # MD5
- bytes.fromhex('0123456789ABCDEF'), # Common encryption key pattern
- ]
- return any(const in data for const in crypto_constants)
- def _check_encoding_patterns(self, data: bytes) -> bool:
- encoding_patterns = {
- 'base64': rb'[A-Za-z0-9+/=]{16,}',
- 'hex': rb'[A-Fa-f0-9]{16,}',
- }
- return any(re.search(pattern, data) for pattern in encoding_patterns.values())
- def _unpack_upx(self, data: bytes) -> bytes:
- """UPX unpacking implementation"""
- try:
- import lzma
- # Find UPX compressed section
- upx_start = data.find(b'UPX1')
- if upx_start != -1:
- compressed_data = data[upx_start:]
- return lzma.decompress(compressed_data)
- return data
- except Exception as e:
- self.log_unpacking_error("UPX", e)
- return data
- def _unpack_aspack(self, data: bytes) -> bytes:
- """ASPack unpacking implementation"""
- try:
- # ASPack specific unpacking logic
- aspack_start = data.find(b'ASPack')
- if aspack_start != -1:
- # Extract compressed section
- compressed = data[aspack_start + 512:] # Skip header
- # Custom ASPack decompression
- return self._aspack_decompress(compressed)
- return data
- except Exception as e:
- self.log_unpacking_error("ASPack", e)
- return data
- def log_unpacking_error(self, error_type, file_path, error_msg):
- """Log detailed unpacking errors with context information Args:error_type (str): Type of unpacking error file_path (str): Path to file that failed unpacking error_msg (str): Detailed error message"""
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- error_id = uuid.uuid4().hex[:8]
- error_details = {
- "timestamp": timestamp,
- "error_id": error_id,
- "error_type": error_type,
- "file_path": file_path,
- "error_message": str(error_msg),
- "stack_trace": traceback.format_exc()
- }
- # Log to file
- logging.error(f"Unpacking Error [{error_id}]: {error_type} - {file_path}")
- logging.error(f"Details: {error_msg}")
- # Store in database/error log
- self.error_log.append(error_details)
- # Notify if critical
- if error_type in self.CRITICAL_ERRORS:
- self.notify_admin(error_details)
- def error_log(self, error_type: str, data_sample: str, error_msg: str):
- """Log detailed unpacking errors with context"""
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- error_id = uuid.uuid4().hex[:8]
- error_details = {
- "id": error_id,
- "timestamp": timestamp,
- "type": error_type,
- "sample": data_sample,
- "message": error_msg,
- "stack": traceback.format_exc()
- }
- logging.error(f"[{error_id}] {error_type} Error: {error_msg}")
- logging.debug(f"Sample data: {data_sample}")
- return error_details
- def _unpack_pecompact(self, data: bytes) -> bytes:
- """PECompact unpacking implementation"""
- try:
- # PECompact specific unpacking
- if b'PEC2' in data:
- # Extract and decompress PECompact section
- return self._pecompact_decompress(data)
- return data
- except Exception as e:
- self.log_unpacking_error("PECompact", e)
- return data
    def _pecompact_decompress(self, data: bytes) -> bytes:
        """Decompress a PECompact-style LZ stream found after a 'PEC2' marker.

        Stream format assumed here: a flag byte per block; if its high bit is
        set, the next two little-endian bytes give a back-reference offset and
        the low 7 bits (+2) give the copy length; otherwise flag+1 raw bytes
        follow. Returns *data* unchanged on any failure.
        """
        try:
            # Find PECompact signature
            sig_offset = data.find(b'PEC2')
            if sig_offset == -1:
                return data
            # Extract compressed section (512-byte header skip is assumed —
            # NOTE(review): confirm the real header size for this packer).
            compressed = data[sig_offset + 512:]
            result = bytearray()
            i = 0
            while i < len(compressed):
                flag = compressed[i]
                i += 1
                if flag & 0x80:  # Compressed block: back-reference copy
                    length = ((flag & 0x7f) + 2)
                    offset = int.from_bytes(compressed[i:i+2], 'little')
                    i += 2
                    # NOTE(review): offset == 0 makes result[-0] read
                    # result[0] (or raise IndexError on an empty buffer);
                    # the except below converts that into a pass-through.
                    for j in range(length):
                        result.append(result[-offset])
                else:  # Raw data: copy length literal bytes
                    length = flag + 1
                    result.extend(compressed[i:i+length])
                    i += length
            return bytes(result)
        except Exception as e:
            self.log_unpacking_error("PECompact", data[:32].hex(), str(e))
            return data
    def _aspack_decompress(self, data: bytes) -> bytes:
        """Decompress an ASPack-style bit-flagged LZ stream.

        Stream format assumed here: one control byte per group of eight
        items; each set bit means a two-byte little-endian match token
        (high nibble +3 = length, low 12 bits = back-reference offset),
        each clear bit a literal byte. Returns *data* unchanged on failure.
        """
        try:
            # Find ASPack signature
            sig_offset = data.find(b'ASPack')
            if sig_offset == -1:
                return data
            # Extract compressed data (256-byte header skip is assumed —
            # NOTE(review): differs from the 512-byte skip in _unpack_aspack;
            # confirm which is correct).
            compressed = data[sig_offset + 256:]
            result = bytearray()
            i = 0
            while i < len(compressed):
                control = compressed[i]
                i += 1
                for bit in range(8):
                    if control & (1 << bit):
                        # Match copy: 16-bit token = (length-3) << 12 | offset
                        info = int.from_bytes(compressed[i:i+2], 'little')
                        i += 2
                        length = ((info >> 12) & 0xf) + 3
                        offset = info & 0xfff
                        # NOTE(review): offset == 0 indexes result[-0] ==
                        # result[0]; the except below turns any resulting
                        # IndexError into a pass-through of the input.
                        for j in range(length):
                            result.append(result[-offset])
                    else:
                        # Literal byte
                        result.append(compressed[i])
                        i += 1
                    if i >= len(compressed):
                        break
            return bytes(result)
        except Exception as e:
            self.log_unpacking_error("ASPack", data[:32].hex(), str(e))
            return data
- @staticmethod
- def detect_packer(cls, data: bytes) -> str:
- """Detect common packer signatures in binary data"""
- packer_signatures = {
- 'UPX': b'UPX!',
- 'ASPack': b'ASPack',
- 'PECompact': b'PEC2',
- 'FSG': b'FSG!',
- 'MPRESS': b'MPRESS',
- 'MEW': b'MEW',
- 'Themida': b'Themida',
- 'VMProtect': b'VMProtect'
- }
- for packer_name, signature in packer_signatures.items():
- if signature in data:
- return packer_name
- return None
- def unpack_binary(self, data: bytes, packer_type: str) -> bytes:
- unpack_methods = {
- 'UPX': self._unpack_upx,
- 'ASPack': self._unpack_aspack,
- 'PECompact': self._unpack_pecompact,
- }
- if packer_type in unpack_methods:
- return unpack_methods[packer_type](data)
- return data
    def create_hex_view(self):
        """Build the 'Hex View' tab: a large hex dump area plus a search bar."""
        frame = ttk.Frame(self.notebook)
        self.notebook.add(frame, text="Hex View")
        # Increase display size relative to the other tabs.
        self.hex_display = ScrolledText(frame, wrap=tk.WORD, width=120, height=40)
        self.hex_display.pack(expand=True, fill='both')
        # Add search functionality
        # NOTE(review): the search entry is not wired to any handler yet.
        search_frame = ttk.Frame(frame)
        search_frame.pack(fill='x')
        ttk.Label(search_frame, text="Search Hex:").pack(side='left')
        search_entry = ttk.Entry(search_frame)
        search_entry.pack(side='left', fill='x', expand=True)
        # Add offset column and ASCII representation
        # NOTE(review): helper below is defined but never used; the static
        # format_hex_view() already provides identical formatting.
        def format_hex_line(offset, data):
            hex_part = ' '.join(f'{b:02x}' for b in data)
            ascii_part = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in data)
            return f'{offset:08x} {hex_part:<48} |{ascii_part}|'
        return frame
- def create_parser_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Parser View")
- self.parser_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.parser_display.pack(expand=True, fill='both')
- return frame
- def create_shellcode_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Shellcode")
- # Create ScrolledText widget with specific name
- self.shellcode_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.shellcode_display.pack(expand=True, fill='both')
- return frame
- def create_exploit_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Exploit Analysis")
- self.exploit_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.exploit_display.pack(expand=True, fill='both')
- return frame
- # Similar methods for memory_view, network_view, and disasm_view
- def create_memory_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Memory Analysis")
- # Store the ScrolledText widget directly
- self.memory_text = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.memory_text.pack(expand=True, fill='both')
- return frame
- def create_network_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Network Analysis")
- self.network_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.network_display.pack(expand=True, fill='both')
- return frame
- def create_disasm_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Disassembly")
- # Store the ScrolledText widget directly
- self.disasm_text = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.disasm_text.pack(expand=True, fill='both')
- return frame
- def create_analysis_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Analysis")
- # Analysis results area
- self.analysis_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.analysis_display.pack(expand=True, fill='both')
- return frame
- def create_string_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Strings")
- # Strings display area
- self.string_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.string_display.pack(expand=True, fill='both')
- return frame
- def create_stats_view(self):
- frame = ttk.Frame(self.notebook)
- self.notebook.add(frame, text="Statistics")
- # Statistics display area
- self.stats_display = ScrolledText(frame, wrap=tk.WORD, width=80, height=30)
- self.stats_display.pack(expand=True, fill='both')
- return frame
    def open_file(self, device: BlockDevice, device_path):
        """Prompt for a file and load it as the active device.

        NOTE(review): both parameters are effectively ignored — the path
        chosen in the dialog wins, and *device* is only rebound locally, so
        self.device is NOT refreshed here; confirm whether it should be.
        """
        filename = filedialog.askopenfilename(
            title="Select File for Analysis",
            filetypes=[
                ("All Files", "*.*"),
                ("Binary Files", "*.bin"),
                ("Executable Files", "*.exe")
            ]
        )
        if filename:
            try:
                device = self.device_manager.init_device(filename)  # Pass the actual file path
                self.update_displays(device)
            except Exception as e:
                # Keep the full traceback on stdout for debugging, then show
                # a friendly dialog to the user.
                print(f"Error stack: {traceback.format_exc()}")
                messagebox.showerror("Error", f"Failed to open file: {str(e)}")
    def save_file(self):
        """Save the current sector to a user-chosen path and rebind the device.

        Writes sector 0 of the active device (or a zeroed 512-byte sector as
        fallback), then replaces self.device[0] with a BlockDevice pointing
        at the new file and refreshes all views.
        """
        filename = filedialog.asksaveasfilename(
            defaultextension=".bin",
            filetypes=[
                ("Binary files", "*.bin"),
                ("All files", "*.*")
            ]
        )
        if filename:
            try:
                # Get current data with validation
                if not self.device or not self.device[0].name:
                    sector_data = b'\x00' * 512  # Default sector if no data
                else:
                    sector_data = self.device_manager.read_sector(
                        self.device[0].name,
                        0,
                        self.device[0]
                    ) or b'\x00' * 512  # Fallback if read fails
                # Write data with size check
                with open(filename, 'wb') as f:
                    f.write(sector_data)
                # Update device with validated values
                # NOTE(review): if self.device is an empty list, the
                # sector_size access below raises IndexError, which is then
                # reported via the generic error dialog — confirm intent.
                self.device[0] = BlockDevice(
                    name=filename,
                    size=max(len(sector_data), 1),  # Prevent zero size
                    sector_size=max(self.device[0].sector_size, 512)  # Ensure valid sector size
                )
                self.update_displays(self.device[0])
                messagebox.showinfo("Success", "File saved successfully!")
            except Exception as e:
                messagebox.showerror("Error", f"Failed to save file: {str(e)}")
- def new_file(self):
- filename = filedialog.asksaveasfilename()
- if filename:
- try:
- with open(filename, 'wb') as f:
- f.write(b'\x00' * 512) # Create empty sector
- device = self.device_manager.init_device(filename)
- self.update_displays(filename, device)
- except Exception as e:
- messagebox.showerror("Error", f"Failed to create file: {str(e)}")
    def get_current_selection(self):
        """Return the selected text in the active tab's ScrolledText, or ''.

        NOTE(review): notebook.select() returns a full Tk widget path;
        indexing self.notebook.children by the last path component assumes
        the tab frame is a direct child of the notebook — verify for nested
        layouts. Also, .get("sel.first", "sel.last") raises TclError when
        nothing is selected; callers currently rely on a selection existing.
        """
        current_tab = self.notebook.select()
        widget = self.notebook.children[current_tab.split('.')[-1]]
        for child in widget.children.values():
            if isinstance(child, ScrolledText):
                return child.get("sel.first", "sel.last")
        return ""
- def insert_at_cursor(self, text):
- """Insert text at cursor position in current active tab"""
- current_tab = self.notebook.select()
- widget = self.notebook.children[current_tab.split('.')[-1]]
- for child in widget.children.values():
- if isinstance(child, ScrolledText):
- child.insert("insert", text)
- def copy_selection(self):
- """Copy selected text to clipboard"""
- selected_text = self.get_current_selection()
- self.root.clipboard_clear()
- self.root.clipboard_append(selected_text)
- def paste_selection(self):
- """Paste clipboard content at cursor position"""
- text = self.root.clipboard_get()
- self.insert_at_cursor(text)
    def update_displays(self, device: BlockDevice):
        """Refresh every analysis tab from sector 0 of *device*.

        Reads the sector, runs all analyzers and rewrites each view's text
        widget. NOTE(review): several issues worth confirming are flagged
        inline below.
        """
        sector_data = self.device_manager.read_sector(device.name, 0, device)
        analysis_results = self.device_manager.analyze_sector_content(sector_data)
        # Build rich parser data
        parser_info = {
            'filename': device.name,
            'filesize': device.size,
            'sector_size': device.sector_size,
            'sector_count': device.size // device.sector_size if device.sector_size else 0,
            'analysis_mode': 'binary',
            'verbose': True
        }
        self.rebuilt_shellcode = self.disasm_engine.rebuild_shellcode(
            sector_data=sector_data,
            exploit_analyzer=self.exploit_analyzer,
            memory_inspector=self.memory_inspector
        )
        if hasattr(self, 'parser_display'):
            self.parser_display.delete(1.0, tk.END)
            self.parser_display.insert(tk.END, "Parser Analysis Results:\n")
            for key, value in parser_info.items():
                self.parser_display.insert(tk.END, f"{key}: {value}\n")
        # NOTE(review): the attribute created in create_shellcode_view() is
        # 'shellcode_display', not 'shellcode_text' — both guards below look
        # permanently False, so the decode branches never run; confirm.
        if hasattr(self, 'shellcode_text'):
            decoded = self.decode_shellcode(self.rebuilt_shellcode)
            self.shellcode_display.insert(tk.END, "\nDecoded Formats:\n")
            for format_name, decoded_text in decoded.items():
                self.shellcode_display.insert(tk.END, f"\n{format_name.upper()}:\n{decoded_text}")
        # NOTE(review): format_shellcode_display / execute_shellcode_generation
        # / convert_to_commands are not defined in this part of the file.
        formatted_shellcode = self.format_shellcode_display(sector_data)
        self.execute_shellcode_generation()
        # Update shellcode view with proper widget reference
        if hasattr(self, 'shellcode_text'):
            self.shellcode_display.delete('1.0', tk.END)
            self.shellcode_display.insert(tk.END, formatted_shellcode)
        # Update memory view with the correct widget
        self.memory_text.delete(1.0, tk.END)
        self.memory_text.insert(tk.END, str(self.memory_inspector.analyze_memory_layout(sector_data)))
        # Get analysis results from the raw data
        # NOTE(review): re-instantiating all four engines here discards any
        # state built up since __init__ — confirm this is intended.
        self.exploit_analyzer = self.ExploitAnalyzer()
        self.memory_inspector = self.MemoryInspector()
        self.network_analyzer = self.NetworkPayloadAnalyzer()
        self.disasm_engine = self.DisassemblyEngine()
        self.strings = self.device_manager._extract_strings(sector_data)
        self.stats = self.device_manager._get_byte_distribution(sector_data)
        # Update all displays with fresh data
        self.hex_display.delete(1.0, tk.END)
        self.hex_display.insert(tk.END, self.format_hex_view(sector_data))
        self.analysis_display.delete(1.0, tk.END)
        self.analysis_display.insert(tk.END, str(analysis_results))
        # Update specialized analysis views
        self.exploit_display.delete(1.0, tk.END)
        self.exploit_display.insert(tk.END, str(self.exploit_analyzer.analyze_payload(sector_data)))
        self.network_display.delete(1.0, tk.END)
        self.network_display.insert(tk.END, str(self.network_analyzer.analyze_network_data(sector_data)))
        # Update disassembly view with the correct widget
        self.disasm_text.delete(1.0, tk.END)
        self.disasm_text.insert(tk.END, str(self.disasm_engine.disassemble_section(sector_data)))
        self.string_display.delete(1.0, tk.END)
        self.string_display.insert(tk.END, '\n'.join(self.strings))
        self.stats_display.delete(1.0, tk.END)
        self.stats_display.insert(tk.END, str(self.stats))
        self.shellcode_display.delete(1.0, tk.END)
        self.shellcode_display.insert(tk.END, "Command Line Instructions:\n")
        self.shellcode_display.insert(tk.END, "\n".join(self.convert_to_commands(self.rebuilt_shellcode)))
        self.shellcode_display.insert(tk.END, f"Rebuilt Shellcode:\n{self.rebuilt_shellcode.hex()}")
        # NOTE(review): the two lines below immediately overwrite everything
        # just written to the shellcode view — confirm which output wins.
        self.shellcode_display.delete('1.0', tk.END)
        self.shellcode_display.insert('1.0', str(self.format_shellcode_display(sector_data)))
- def update_device_info(self, device: BlockDevice):
- device_info = f"Device: {device.name}\nSize: {device.size}\nSector Size: {device.sector_size}"
- self.info_display.insert(tk.END, device_info)
- def analyze_sector(self, device: BlockDevice, sector_num: int):
- sector_data = self.device_manager.read_sector(
- device_path=device.name,
- sector_num=sector_num,
- device=device
- )
- self.update_displays(device)
- return sector_data
    def inspect_memory(self, device: BlockDevice):
        """Run the memory-layout analysis for *device* and push it to the memory tab.

        NOTE(review): analyze_memory_layout() is elsewhere called with raw
        bytes but here receives the BlockDevice itself, and self.memory_view
        is a ttk.Frame whose .update() is Tk's event-pump method, not a data
        setter — both look unintended; confirm before relying on this.
        """
        memory_layout = self.memory_inspector.analyze_memory_layout(device)
        self.memory_view.update(memory_layout)
    def update_shellcode_view(self):
        """Regenerate the shellcode tab from the current sector.

        NOTE(review): this method looks broken as written:
        - read_sector() needs (device_path, sector_num, device) but receives
          one value, and BinaryViewerGUI.analyze_sector.sector_num reads a
          function attribute that is never set (AttributeError);
        - self.rebuilt_shellcode is a bytes attribute elsewhere, yet here it
          is called like a function.
        Confirm intended behavior before wiring this into the UI.
        """
        sector_data = self.device_manager.read_sector(BinaryViewerGUI.analyze_sector.sector_num)
        commands = self.convert_to_commands(self.rebuilt_shellcode(sector_data, self.exploit_analyzer, self.memory_inspector))
        self.shellcode_display.delete(1.0, tk.END)
        self.shellcode_display.insert(tk.END, "Command Line Instructions:\n")
        self.shellcode_display.insert(tk.END, "\n".join(commands))
- def get_device_path() -> str:
- """Get device path using file dialog"""
- device_path = filedialog.askopenfilename(
- title="Select File for Analysis",
- filetypes=[
- ("All Files", "*.*"),
- ("Binary Files", "*.bin"),
- ("Executable Files", "*.exe"),
- ("System Files", "*.sys")
- ]
- )
- return device_path if device_path else ""
- def get_sector_size(path):
- try:
- # For Windows
- if os.name == 'nt':
- import win32file
- return win32file.GetDiskFreeSpace(path)[1]
- # For Linux
- else:
- import fcntl
- import struct
- BLKSSZGET = 0x1268
- with open(path, 'rb') as fd:
- return struct.unpack('I', fcntl.ioctl(fd, BLKSSZGET, b'\x00\x00\x00\x00'))[0]
- except:
- return 512 # Default fallback
- def create_parser(self):
- parser = OptionParser()
- parser.add_option("-f", "--file", dest="filename",
- help="read data from FILENAME",
- default=self.device[0].name if self.device else None)
- parser.add_option("-v", "--verbose",
- action="store_true", dest="verbose",
- default=True)
- parser.add_option("-s", "--sector",
- type="int", dest="sector_num",
- help="sector number to analyze",
- default=0)
- parser.add_option("-a", "--analysis",
- choices=["hex", "strings", "disasm"],
- dest="analysis_type",
- help="type of analysis to perform",
- default="hex")
- return parser
- @staticmethod
- def format_hex_view(data):
- if isinstance(data, str):
- data = data.encode('utf-8')
- elif not isinstance(data, bytes):
- data = b'' if not data else bytes(str(data), 'utf-8')
- hex_dump = []
- for i in range(0, len(data), 16):
- chunk = data[i:i+16]
- hex_line = ' '.join(f'{b:02x}' for b in chunk)
- ascii_line = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
- hex_dump.append(f'{i:08x} {hex_line:<48} |{ascii_line}|')
- return '\n'.join(hex_dump)
    class DeviceManager:
        """Nested helper owning device bookkeeping and sector-level I/O."""

        def __init__(self, root, device: BlockDevice, sector_num, device_path):
            """Store the Tk root and the initial device.

            NOTE(review): sector_num and device_path are accepted but unused.
            """
            self.root = root
            # NOTE(review): re-titling the shared root duplicates what
            # BinaryViewerGUI.__init__ already did — confirm it is wanted.
            self.root.title("Spear of Telesto - Advanced Malware Analysis")
            self.devices = {}  # path -> BlockDevice cache (filled by init_device)
            # NOTE(review): write_sector treats this attribute as a dict
            # ('self.device.get(...)') although a single BlockDevice is
            # stored here — confirm which one is intended.
            self.device = device
            self.sector_analysis = b''  # last raw analysis payload
- def init_device(self, device_path: str) -> BlockDevice:
- if not device_path:
- return BlockDevice(
- name="",
- size=0,
- sector_size=512
- )
- actual_size = os.path.getsize(device_path)
- sector_size = BinaryViewerGUI.get_sector_size(device_path)
- device = BlockDevice(
- name=device_path,
- size=actual_size,
- sector_size=sector_size
- )
- self.devices[device_path] = device
- return device
- def read_sector(self, device_path: str, sector_num: int, device: BlockDevice) -> bytes:
- # Return empty bytes for initial state when no file is selected
- if not device_path:
- return b''
- # Normal reading logic for when a file exists
- with open(device_path, 'rb') as f:
- position = sector_num * device.sector_size
- f.seek(position)
- data = f.read(device.sector_size)
- return data
- def write_sector(self, device_path: str, sector_num: int, data: bytes) -> bool:
- device = self.device.get(device_path)
- print(f"Writing sector from: {device_path}")
- if not device:
- raise ValueError("Device not initialized")
- if len(data) != device.sector_size:
- raise ValueError(f"Data must be exactly {device.sector_size} bytes")
- with open(device_path, 'rb') as f:
- f.seek(sector_num * device.sector_size)
- f.write(data)
- return True
- def analyze_sector_content(self, data: bytes) -> Dict:
- """Analyze binary content of a sector"""
- # Handle empty data case with valid defaults
- if not data:
- return {
- 'entropy': 0.0,
- 'byte_distribution': {
- 'histogram': {},
- 'null_byte_ratio': 0.0,
- 'printable_ratio': 0.0
- },
- 'string_patterns': [],
- 'executable_signs': {
- 'has_x86_opcodes': {},
- 'has_function_prologue': False,
- 'has_syscall_patterns': {},
- 'has_jump_tables': {}
- },
- 'file_signatures': []
- }
- # Proceed with analysis for valid data
- return {
- 'entropy': self._calculate_entropy(data),
- 'byte_distribution': self._get_byte_distribution(data),
- 'string_patterns': self._extract_strings(data),
- 'executable_signs': self._detect_executable_content(data),
- 'file_signatures': self._identify_file_signatures(data)
- }
- def _calculate_entropy(self, data: bytes) -> float:
- if not data:
- return 0.0
- byte_counts = Counter(data)
- entropy = 0.0
- data_len = len(data)
- for count in byte_counts.values():
- probability = count / data_len
- entropy -= probability * math.log2(probability)
- return entropy
- def _get_byte_distribution(self, data: bytes) -> Dict:
- """Analyze byte frequency distribution"""
- if not data:
- return {
- 'histogram': {},
- 'null_byte_ratio': 0.0,
- 'printable_ratio': 0.0
- }
- distribution = Counter(data)
- return {
- 'histogram': dict(distribution),
- 'null_byte_ratio': distribution[0] / len(data),
- 'printable_ratio': sum(c in string.printable.encode() for c in data) / len(data)
- }
- def _extract_strings(self, data: bytes, min_length: int = 4) -> List[str]:
- """Extract readable strings from binary data"""
- strings = []
- current = []
- for byte in data:
- if 32 <= byte <= 126:
- current.append(chr(byte))
- elif current:
- if len(current) >= min_length:
- strings.append(''.join(current))
- current = []
- return strings
- def _detect_executable_content(self, data: bytes) -> Dict:
- """Identify potential executable code patterns"""
- return {
- 'has_x86_opcodes': self._check_x86_signatures(data),
- 'has_function_prologue': b'\x55\x89\xe5' in data, # Common x86 function prologue
- 'has_syscall_patterns': self._identify_syscalls(data),
- 'has_jump_tables': self._find_jump_patterns(data)
- }
- def _check_x86_signatures(self, data: bytes) -> Dict[str, List[int]]:
- """Detect common x86 instruction patterns"""
- signatures = {
- 'mov_reg': b'\x89', # MOV register operations
- 'push_reg': b'\x50', # PUSH register
- 'pop_reg': b'\x58', # POP register
- 'call_near': b'\xe8', # CALL near
- 'jmp_short': b'\xeb', # JMP short
- 'ret': b'\xc3', # RET
- 'int3': b'\xcc' # INT3 breakpoint
- }
- found = {}
- for name, pattern in signatures.items():
- offsets = [i for i in range(len(data)) if data.startswith(pattern, i)]
- if offsets:
- found[name] = offsets
- return found
- def _identify_syscalls(self, data: bytes) -> Dict[str, List[int]]:
- """Identify system call patterns"""
- syscall_patterns = {
- 'syscall': b'\x0f\x05', # syscall instruction
- 'int_80h': b'\xcd\x80', # int 0x80
- 'sysenter': b'\x0f\x34', # sysenter
- 'wow64': b'\xff\x15', # call dword ptr [xxx] (WoW64)
- 'arm_svc': b'\x01\xdf' # SVC #1 (ARM)
- }
- found_syscalls = {}
- for name, pattern in syscall_patterns.items():
- offsets = [i for i in range(len(data)) if data.startswith(pattern, i)]
- if offsets:
- found_syscalls[name] = offsets
- return found_syscalls
- def _find_jump_patterns(self, data: bytes) -> Dict[str, List[Dict]]:
- """Analyze jump table patterns and control flow structures"""
- jump_types = {
- 'jmp_direct': b'\xe9', # JMP direct
- 'jmp_short': b'\xeb', # JMP short
- 'jcc_near': b'\x0f\x80', # Jcc near (conditional jumps)
- 'call_direct': b'\xe8', # CALL direct
- 'switch_jump': b'\xff\x24' # JMP [reg*4+table]
- }
- jump_tables = {}
- for jtype, pattern in jump_types.items():
- jumps = []
- for i in range(len(data)-1):
- if data.startswith(pattern, i):
- target = None
- if len(data) >= i + 5: # Direct jumps are 5 bytes
- target = int.from_bytes(data[i+1:i+5], byteorder='little', signed=True)
- jumps.append({
- 'offset': i,
- 'target': target,
- 'bytes': data[i:i+5]
- })
- if jumps:
- jump_tables[jtype] = jumps
- return jump_tables
- def _identify_file_signatures(self, data: bytes) -> List[str]:
- """Detect common file signatures/magic numbers"""
- signatures = {
- b'MZ': 'DOS/PE Executable',
- b'ELF': 'Linux Executable',
- b'\x89PNG': 'PNG Image',
- b'PK': 'ZIP Archive',
- b'%PDF': 'PDF Document'
- }
- found_signatures = []
- for sig, file_type in signatures.items():
- if data.startswith(sig):
- found_signatures.append(file_type)
- return found_signatures
class ExploitAnalyzer:
    """Pattern-based detection of exploit artifacts (shellcode, ROP, NOP sleds)."""

    def __init__(self):
        # Signature table is static; loaded once per instance.
        self.known_signatures = self.load_exploit_signatures()

    def analyze_payload(self, data: bytes) -> Dict:
        """Run every exploit heuristic over *data* and bundle the results."""
        return {
            'shellcode_patterns': self._detect_shellcode(data),
            'rop_gadgets': self._find_rop_chains(data),
            'exploit_signatures': self._match_known_exploits(data),
            'nop_sleds': self._detect_nop_sleds(data)
        }

    def load_exploit_signatures(self) -> Dict[str, bytes]:
        """Known exploit byte patterns.

        NOTE(review): 'ret2libc' (four NUL bytes) matches almost any binary
        data and is extremely noisy — consider gating it on context.
        """
        return {
            'buffer_overflow': b'\x41' * 20,       # Repeated 'A' pattern
            'format_string': b'%x' * 4,            # Format string pattern
            'heap_spray': b'\x90' * 100,           # NOP sled pattern
            'ret2libc': b'\x00\x00\x00\x00',       # Null address pattern
            'stack_pivot': b'\x94\xc3',            # XCHG EAX,ESP + RET
            'egg_hunter': b'\xaf\xae\xaf\xae'      # Egg hunter signature
        }

    def _match_known_exploits(self, data: bytes) -> List[Dict]:
        """Match *data* against every known signature; report offsets + score."""
        matches = []
        for name, signature in self.known_signatures.items():
            offsets = [i for i in range(len(data)) if data.startswith(signature, i)]
            if offsets:
                matches.append({
                    'type': name,
                    'offsets': offsets,
                    'signature': signature.hex(),
                    'confidence': self._calculate_match_confidence(data, signature)
                })
        return matches

    def _calculate_match_confidence(self, data: bytes, signature: bytes) -> float:
        """Score a signature match in [0, 1] from exact/context/entropy factors.

        Bug fix: the context scan bound is now len(data) - len(signature) + 1,
        so a match that ends exactly at the end of *data* is inspected too.
        """
        factors = {
            'exact_match': 1.0,
            'partial_match': 0.7,
            'context_match': 0.5,
            'entropy_match': 0.3
        }
        confidence = 0.0
        # Exact signature occurrence anywhere in the buffer.
        if signature in data:
            confidence += factors['exact_match']
        # Inspect the bytes surrounding each occurrence.
        context_size = 16
        for i in range(len(data) - len(signature) + 1):
            if data[i:i+len(signature)] == signature:
                pre_context = data[max(0, i-context_size):i]
                post_context = data[i+len(signature):i+len(signature)+context_size]
                if self._validate_context(pre_context, post_context):
                    confidence += factors['context_match']
        # Entropy similarity between the buffer and the signature.
        data_entropy = self._calculate_entropy(data)
        sig_entropy = self._calculate_entropy(signature)
        if abs(data_entropy - sig_entropy) < 0.1:
            confidence += factors['entropy_match']
        # Clamp to [0, 1].
        return min(confidence, 1.0)

    def _calculate_entropy(self, data: bytes) -> float:
        """Shannon entropy in bits per byte; 0.0 for empty input."""
        if not data:
            return 0.0
        byte_counts = Counter(data)
        entropy = 0.0
        data_len = len(data)
        for count in byte_counts.values():
            probability = count / data_len
            entropy -= probability * math.log2(probability)
        return entropy

    def _validate_context(self, pre_context: bytes, post_context: bytes) -> bool:
        """True when either context contains a typical exploit filler byte."""
        valid_patterns = [
            b'\x00',  # Null bytes
            b'\x90',  # NOPs
            b'\xcc',  # INT3
            b'\x41'   # Pattern bytes
        ]
        return any(pattern in pre_context or pattern in post_context
                   for pattern in valid_patterns)

    def _detect_nop_sleds(self, data: bytes) -> List[Dict]:
        """Find contiguous runs of NOP-like padding at least 16 bytes long.

        Bug fix: the cursor now only skips an extra byte when no run was
        consumed; the old unconditional offset += 1 could skip the first
        byte of a run starting immediately after a detected sled.
        """
        nop_patterns = {
            'x86_nop': b'\x90',              # Traditional NOP
            'x86_64_nop': b'\x66\x90',       # 2-byte NOP
            'arm_nop': b'\x00\xf0\x20\xe3',  # ARM NOP
            'multi_byte_slide': b'\x41\x41'  # Multi-byte slide pattern
        }
        sleds = []
        min_sled_length = 16
        for name, pattern in nop_patterns.items():
            offset = 0
            while offset < len(data):
                count = 0
                start = offset
                while offset < len(data) and data.startswith(pattern, offset):
                    count += len(pattern)
                    offset += len(pattern)
                if count >= min_sled_length:
                    sleds.append({
                        'type': name,
                        'offset': start,
                        'length': count,
                        'pattern': pattern.hex()
                    })
                if count == 0:
                    offset += 1
        return sleds

    def _detect_shellcode(self, data: bytes) -> List[Dict]:
        """Report offsets of opcode fragments commonly seen in shellcode."""
        shellcode_patterns = []
        patterns = {
            'syscall_exec': b'\x0f\x05',  # syscall instruction
            'stack_pivot': b'\x94',       # XCHG EAX, ESP
            'get_eip': b'\xe8',           # CALL instruction
        }
        for name, pattern in patterns.items():
            offsets = [i for i in range(len(data)) if data.startswith(pattern, i)]
            if offsets:
                shellcode_patterns.append({
                    'type': name,
                    'offsets': offsets,
                    'size': len(pattern)
                })
        return shellcode_patterns

    def _find_rop_chains(self, data: bytes) -> List[Dict]:
        """Collect byte windows ending in RET-family opcodes as candidate gadgets.

        Bug fixes: the window start is clamped to 0 (the old i-12 went
        negative near the start of the buffer, which Python slicing wrapped
        to the END of the data and reported a negative offset), and the scan
        now covers the final byte (range(len(data)) instead of len - 1).
        """
        rop_gadgets = []
        endings = [b'\xc3', b'\xcb', b'\xc2']  # RET, RETF, RET imm16
        for i in range(len(data)):
            for ending in endings:
                if data[i:i+len(ending)] == ending:
                    start = max(0, i - 12)  # up to 12 bytes of preceding context
                    gadget = data[start:i+len(ending)]
                    rop_gadgets.append({
                        'offset': start,
                        'bytes': gadget,
                        'size': len(gadget)
                    })
        return rop_gadgets
class MemoryInspector:
    """Heuristic recovery of memory-layout structures from a raw byte buffer."""

    def analyze_memory_layout(self, data: bytes) -> Dict:
        """Run every layout heuristic and collect the results in one dict."""
        return {
            'stack_frames': self._identify_stack_frames(data),
            'heap_chunks': self._analyze_heap_structures(data),
            'vtables': self._find_vtables(data),
            'function_pointers': self._detect_function_ptrs(data)
        }

    def _analyze_heap_structures(self, data: bytes) -> List[Dict]:
        """Scan 8-byte aligned headers for known heap-chunk metadata markers."""
        markers = {
            'glibc': b'\x00\x00\x00\x00',     # size field
            'windows': b'\x00\x00\x00\x08',   # header size
            'freelist': b'\x00\x00\x00\x01'   # free chunk
        }
        found = []
        for base in range(0, len(data) - 8, 8):
            header = data[base:base + 8]
            for kind, marker in markers.items():
                if marker in header:
                    found.append({
                        'offset': base,
                        'type': kind,
                        'size': int.from_bytes(header[:4], byteorder='little'),
                        'metadata': header.hex()
                    })
        return found

    def _find_vtables(self, data: bytes) -> List[Dict]:
        """Look for 8-byte-aligned runs of at least 3 plausible 64-bit pointers."""
        width = 8  # 64-bit pointer size
        tables = []
        for base in range(0, len(data) - width * 4, width):
            slots = [
                int.from_bytes(data[base + k * width:base + (k + 1) * width],
                               byteorder='little')
                for k in range(4)  # inspect a window of 4 consecutive slots
            ]
            # Crude userspace-address plausibility filter.
            plausible = [value for value in slots if value > 0x400000]
            if len(plausible) >= 3:  # minimum vtable size
                tables.append({
                    'offset': base,
                    'pointers': plausible,
                    'count': len(plausible)
                })
        return tables

    def _detect_function_ptrs(self, data: bytes) -> List[Dict]:
        """Flag 8-byte values in a userspace range whose site matches a prologue.

        NOTE(review): the prologue comparison inspects the bytes AT the
        pointer's own location, not at the address it points to — confirm
        whether that is the intended heuristic.
        """
        prologues = {
            'standard': b'\x55\x48\x89\xe5',   # push rbp; mov rbp, rsp
            'optimized': b'\x53\x48\x83\xec',  # push rbx; sub rsp, X
            'syscall': b'\x0f\x05\xc3'         # syscall; ret
        }
        hits = []
        for base in range(0, len(data) - 8, 8):
            value = int.from_bytes(data[base:base + 8], byteorder='little')
            if not (0x400000 <= value <= 0x7fffffffffff):
                continue  # outside the plausible address range
            for kind, sig in prologues.items():
                if base + len(sig) < len(data) and data[base:base + len(sig)] == sig:
                    hits.append({
                        'offset': base,
                        'address': hex(value),
                        'type': kind,
                        'context': data[base:base + 16].hex()
                    })
        return hits

    def _identify_stack_frames(self, data: bytes) -> List[Dict]:
        """Record every x86/x86-64 frame-setup prologue with its estimated size."""
        prologues = (
            b'\x55\x48\x89\xe5',  # push rbp; mov rbp, rsp
            b'\x55\x89\xe5',      # push ebp; mov ebp, esp
        )
        frames = []
        for sig in prologues:
            hits = (i for i in range(len(data)) if data.startswith(sig, i))
            for pos in hits:
                frames.append({
                    'offset': pos,
                    'type': 'frame_setup',
                    'size': self._calculate_frame_size(data[pos:])
                })
        return frames

    def _calculate_frame_size(self, data: bytes) -> int:
        """Estimate the frame size from the first 'sub rsp, imm' encountered."""
        sub_rsp_imm8 = b'\x48\x83\xec'   # sub rsp, imm8
        sub_rsp_imm32 = b'\x48\x81\xec'  # sub rsp, imm32
        if sub_rsp_imm8 in data:
            return data[data.index(sub_rsp_imm8) + 3]  # single size byte
        if sub_rsp_imm32 in data:
            at = data.index(sub_rsp_imm32)
            return int.from_bytes(data[at + 3:at + 7], byteorder='little')
        return 0
class NetworkPayloadAnalyzer:
    """Heuristic analysis of captured network payloads (protocol, structure, encoding)."""

    def analyze_network_data(self, data: bytes) -> Dict:
        """Run all payload analyses and collect the results in one dict."""
        return {
            'protocol_signatures': self._identify_protocols(data),
            'payload_structure': self._analyze_payload_structure(data),
            'embedded_commands': self._find_command_patterns(data),
            'encoding_type': self._detect_encoding(data)
        }

    def _analyze_payload_structure(self, data: bytes) -> Dict:
        """Split *data* into segments at common protocol delimiters.

        NOTE(review): 'header_size' and 'payload_type' are placeholders that
        are never updated, and current_pos carries across delimiter types, so
        segment boundaries depend on the delimiter scan order.
        """
        structure = {
            'header_size': 0,
            'payload_type': 'unknown',
            'segments': [],
            'boundaries': []
        }
        # Common protocol boundaries: CRLF, blank line, double NUL, MIME '--'.
        delimiters = [b'\r\n', b'\n\n', b'\x00\x00', b'--']
        current_pos = 0
        for delimiter in delimiters:
            pos = data.find(delimiter)
            while pos != -1:
                structure['segments'].append({
                    'start': current_pos,
                    'end': pos,
                    'size': pos - current_pos,
                    'type': self._identify_segment_type(data[current_pos:pos])
                })
                current_pos = pos + len(delimiter)
                structure['boundaries'].append(pos)
                pos = data.find(delimiter, current_pos)
        return structure

    def _identify_segment_type(self, data: bytes) -> str:
        """Classify a data segment by content patterns (headers, formats, binary)."""
        # Protocol headers
        if data.startswith(b'HTTP/'):
            return 'http_header'
        if b'Content-Type:' in data:
            return 'mime_header'
        if data.startswith(b'GET') or data.startswith(b'POST'):
            return 'http_request'
        # Data formats
        if data.startswith(b'{') and data.endswith(b'}'):
            return 'json_data'
        if data.startswith(b'<?xml'):
            return 'xml_data'
        if b'%PDF' in data:
            return 'pdf_content'
        # Binary: any control byte other than tab/LF/CR; very high entropy
        # suggests encryption or compression.
        if any(x < 32 and x != 9 and x != 10 and x != 13 for x in data):
            if self._calculate_entropy(data) > 7.0:
                return 'encrypted_data'
            return 'binary_data'
        # Pure printable text (plus tab/LF/CR)
        if all(32 <= x <= 126 or x in (9, 10, 13) for x in data):
            return 'text_data'
        return 'unknown'

    def _calculate_entropy(self, data: bytes) -> float:
        """Shannon entropy in bits per byte; 0.0 for empty input."""
        if not data:
            return 0.0
        total = len(data)
        entropy = 0.0
        for count in Counter(data).values():
            probability = count / total
            entropy -= probability * math.log2(probability)
        return entropy

    def _find_command_patterns(self, data: bytes) -> List[Dict]:
        """Locate shell/SQL/HTTP/file-operation command patterns in *data*."""
        commands = []
        patterns = {
            'shell_cmd': br'(sh|bash|cmd|powershell).*?[\r\n]',
            'sql_query': br'(SELECT|INSERT|UPDATE|DELETE).*?;',
            'http_method': br'(GET|POST|PUT|DELETE) /.*?HTTP',
            'file_ops': br'(open|read|write|close|exec).*?\(',
        }
        for cmd_type, pattern in patterns.items():
            for match in re.finditer(pattern, data):
                commands.append({
                    'type': cmd_type,
                    'offset': match.start(),
                    'command': match.group(),
                    # Keep 10 bytes of surrounding context for display.
                    'context': data[max(0, match.start()-10):match.end()+10]
                })
        return commands

    def _detect_encoding(self, data: bytes) -> Dict[str, float]:
        """Score *data* against likely encodings (ratio of matching bytes, 0..1).

        Bug fix: empty input now returns all-zero scores instead of raising
        ZeroDivisionError from the ratio computations.
        """
        encodings = {
            'base64': 0.0,
            'hex': 0.0,
            'url': 0.0,
            'ascii': 0.0,
            'utf8': 0.0
        }
        if not data:
            return encodings
        total = len(data)
        # Base64 alphabet coverage
        b64_chars = set(b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')
        encodings['base64'] = sum(x in b64_chars for x in data) / total
        # Hex digit coverage
        hex_chars = set(b'0123456789abcdefABCDEF')
        encodings['hex'] = sum(x in hex_chars for x in data) / total
        # URL-encoding character coverage
        url_chars = set(b'%0123456789abcdefABCDEF')
        encodings['url'] = sum(x in url_chars for x in data) / total
        # Printable ASCII coverage
        encodings['ascii'] = sum(32 <= x <= 126 for x in data) / total
        # Valid UTF-8 is an all-or-nothing signal.
        try:
            data.decode('utf-8')
            encodings['utf8'] = 1.0
        except UnicodeDecodeError:
            pass
        return encodings

    def _identify_protocols(self, data: bytes) -> List[str]:
        """Return protocol names whose banner/signature appears in *data*."""
        protocols = []
        signatures = {
            'HTTP': b'HTTP/',
            'FTP': b'220 ',
            'SSH': b'SSH-',
            'SMB': b'\xffSMB',
        }
        for proto, sig in signatures.items():
            if sig in data:
                protocols.append(proto)
        return protocols
class DisassemblyEngine:
    """Lightweight pattern-based 'disassembler' plus shellcode rebuild/export helpers.

    NOTE(review): disassembly matches single opcode bytes only (no
    ModRM/immediate decoding), so its output is heuristic at best.
    """

    def disassemble_section(self, data: bytes, arch='x86') -> List[Dict]:
        """Scan *data* for known single-byte opcode patterns.

        Returns one record per match: offset, mnemonic tag, raw bytes, size.
        Bug fix: the old loop advanced the cursor an extra byte after every
        match (offset += len(opcode) followed by an unconditional
        offset += 1), silently skipping the byte right after each opcode.
        """
        instructions = []
        patterns = {
            'mov': b'\x89',
            'push': b'\x50',
            'pop': b'\x58',
            'call': b'\xe8',
            'jmp': b'\xeb',
        }
        offset = 0
        while offset < len(data):
            matched = False
            for name, opcode in patterns.items():
                if data.startswith(opcode, offset):
                    instructions.append({
                        'offset': offset,
                        'type': name,
                        'bytes': data[offset:offset + len(opcode)],
                        'size': len(opcode)
                    })
                    offset += len(opcode)
                    matched = True
                    break
            if not matched:
                offset += 1
        return instructions

    def rebuild_shellcode(self, sector_data, exploit_analyzer, memory_inspector) -> bytes:
        """Reassemble candidate shellcode from detected patterns and structures.

        Bug fix: function-pointer records store 'address' as a hex STRING
        (see MemoryInspector._detect_function_ptrs), so it must be parsed
        with int(addr, 16) — int.to_bytes() on a str raised TypeError.
        """
        shellcode_patterns = exploit_analyzer._detect_shellcode(sector_data)
        memory_layout = memory_inspector.analyze_memory_layout(sector_data)
        rop_chains = exploit_analyzer._find_rop_chains(sector_data)
        function_ptrs = memory_layout['function_pointers']
        vtables = memory_layout['vtables']
        rebuilt_code = bytearray()
        # Function pointers for dynamic calls ('address' is a hex string).
        for ptr in function_ptrs:
            rebuilt_code.extend(int(ptr['address'], 16).to_bytes(8, 'little'))
        # Vtable entries for object-oriented payloads.
        for vtable in vtables:
            for ptr in vtable['pointers']:
                rebuilt_code.extend(int.to_bytes(ptr, 8, 'little'))
        # Raw bytes of the first occurrence of each shellcode pattern.
        for pattern in shellcode_patterns:
            if pattern['type'] in ['syscall_exec', 'stack_pivot', 'get_eip']:
                offset = pattern['offsets'][0]
                size = pattern['size']
                rebuilt_code.extend(sector_data[offset:offset + size])
        # ROP gadget bytes.
        for gadget in rop_chains:
            rebuilt_code.extend(gadget['bytes'])
        return bytes(rebuilt_code)

    def convert_to_commands(self, shellcode: bytes) -> List[str]:
        """Emit echo/PowerShell commands that reconstruct *shellcode* on disk.

        NOTE(review): overlaps with process_shellcode_chunks (which also
        writes the initial file-create command); kept for compatibility.
        """
        # One echo line per 32-byte chunk, hex-encoded.
        hex_commands = [
            f'echo {shellcode[i:i+32].hex()} >> shellcode.hex'
            for i in range(0, len(shellcode), 32)
        ]
        commands = [
            'powershell -Command "$hex = Get-Content shellcode.hex"',
            'powershell -Command "$bytes = [byte[]]::new($hex.Length/2)"',
            'powershell -Command "for($i=0; $i -lt $hex.Length; $i+=2){$bytes[$i/2] = [convert]::ToByte($hex.Substring($i,2),16)}"',
            'powershell -Command "$bytes | Set-Content shellcode.bin -Encoding Byte"'
        ]
        return hex_commands + commands

    def process_shellcode_chunks(self, shellcode: str, chunk_size: int = 32):
        """Split a hex string into echo commands plus PowerShell conversion steps."""
        commands = []
        # Truncate/create the target file first.
        commands.append('echo off > shellcode.hex')
        # One echo per chunk of the hex string.
        for i in range(0, len(shellcode), chunk_size):
            chunk = shellcode[i:i+chunk_size]
            commands.append(f'echo {chunk} >> shellcode.hex')
        # PowerShell hex-to-binary conversion.
        commands.extend([
            'powershell -Command "$hex = Get-Content shellcode.hex"',
            'powershell -Command "$bytes = [byte[]]::new($hex.Length/2)"',
            'powershell -Command "for($i=0; $i -lt $hex.Length; $i+=2){$bytes[$i/2] = [convert]::ToByte($hex.Substring($i,2),16)}"',
            'powershell -Command "$bytes | Set-Content shellcode.bin -Encoding Byte"'
        ])
        return commands

    def execute_shellcode_generation(self):
        """Render command-line reconstruction steps into the shellcode view.

        NOTE(review): relies on self.rebuilt_shellcode and
        self.shellcode_display being assigned elsewhere (GUI wiring) —
        confirm both exist before this is invoked.
        """
        shellcode = self.rebuilt_shellcode.hex()
        commands = self.process_shellcode_chunks(shellcode)
        self.shellcode_display.delete(1.0, tk.END)
        self.shellcode_display.insert(tk.END, "Command Line Instructions:\n")
        self.shellcode_display.insert(tk.END, "\n".join(commands))

    def decode_shellcode(self, shellcode: bytes) -> Dict[str, str]:
        """Decode *shellcode* into several textual representations.

        'utf16le' is only populated when a UTF-16LE BOM is present; other
        keys are always filled (errors are collected under 'error').
        """
        decoded_formats = {
            'utf16le': '',
            'utf8': '',
            'ascii': '',
            'hex': '',
            'binary': ''
        }
        try:
            # UTF-16LE only when the BOM marks it explicitly.
            if shellcode.startswith(b'\xff\xfe'):
                decoded_formats['utf16le'] = shellcode.decode('utf-16le')
            decoded_formats['utf8'] = shellcode.decode('utf-8', errors='replace')
            decoded_formats['ascii'] = ''.join(
                chr(b) if 32 <= b <= 126 else '.' for b in shellcode)
            decoded_formats['hex'] = shellcode.hex()
            decoded_formats['binary'] = ' '.join(f'{b:08b}' for b in shellcode)
        except Exception as e:
            decoded_formats['error'] = f"Decoding error: {str(e)}"
        return decoded_formats

    def format_shellcode_display(self, sector_data: bytes) -> str:
        """Build the full text shown in the shellcode pane: commands + decodes."""
        sections = []
        sections.append("Command Line Instructions:")
        # 64 hex characters (32 bytes) per echo line.
        hex_data = sector_data.hex()
        for i in range(0, len(hex_data), 64):
            chunk = hex_data[i:i+64]
            sections.append(f"echo {chunk} >> shellcode.hex")
        sections.extend([
            'powershell -Command "$hex = Get-Content shellcode.hex"',
            'powershell -Command "$bytes = [byte[]]::new($hex.Length/2)"',
            'powershell -Command "for($i=0; $i -lt $hex.Length; $i+=2){$bytes[$i/2] = [convert]::ToByte($hex.Substring($i,2),16)}"',
            'powershell -Command "$bytes | Set-Content shellcode.bin -Encoding Byte"'
        ])
        decoded = self.decode_shellcode(sector_data)
        sections.append("\nDecoded Content:")
        for format_name, content in decoded.items():
            sections.append(f"\n{format_name.upper()}:")
            sections.append(content)
        return "\n".join(sections)
class Report(FPDF):
    """PDF analysis report built on FPDF: a title header plus titled sections."""

    def __init__(self):
        # Start with one page and render the centered 16pt bold report title.
        super().__init__()
        self.add_page()
        self.set_font('Arial', 'B', 16)
        self.cell(0, 10, 'Spear of Telesto Analysis Report', 0, 1, 'C')

    def add_section(self, title, content):
        """Append a section: bold 14pt heading followed by 12pt wrapped body."""
        self.set_font('Arial', 'B', 14)
        self.cell(0, 10, title, 0, 1)
        self.set_font('Arial', '', 12)
        self.multi_cell(0, 10, content)
        self.ln()
def detect_packer(data: bytes):
    """Return the name of the first known packer whose signature occurs in *data*.

    Returns None when no signature matches.
    """
    signatures = {
        'UPX': b'UPX!',
        'ASPack': b'ASPack',
        'PECompact': b'PEC2',
    }
    return next(
        (name for name, marker in signatures.items() if marker in data),
        None,
    )
def unpack_binary(self, data: bytes, packer_type: str):
    """Dispatch to the unpacker matching *packer_type*.

    Unknown packer types pass the data through unchanged.
    """
    if packer_type == 'UPX':
        return self.upx_unpack(data)
    if packer_type == 'ASPack':
        return self.aspack_unpack(data)
    return data
def aspack_unpack(self, data: bytes) -> bytes:
    """Best-effort decompression of an ASPack-style LZ77 stream.

    Scans for the b'ASPack' marker, skips a fixed 512-byte header, then
    decodes a control-bit stream: each control byte's bits select either a
    literal byte or a 16-bit (offset, length) back-reference into the
    output produced so far. On any error the input is returned unchanged.

    NOTE(review): the fixed 512-byte header and the 12/4 bit split of the
    match word are assumptions about the packer layout — verify against real
    ASPack samples. A zero back-reference offset indexes result[-0], i.e.
    result[0], which is unlikely to be intended.
    """
    try:
        # Locate ASPack signature and header
        aspack_sig = b'ASPack'
        sig_offset = data.find(aspack_sig)
        if sig_offset != -1:
            # Skip the assumed fixed-size header to reach the packed stream.
            header_size = 512
            packed_data = data[sig_offset + header_size:]
            # Perform ASPack decompression
            result = bytearray()
            i = 0
            while i < len(packed_data):
                # One control byte supplies 8 literal/match flags.
                control = packed_data[i]
                i += 1
                for bit in range(8):
                    if control & (1 << bit):
                        # Match/Copy operation: 12-bit offset, 4-bit length (+3)
                        info = int.from_bytes(packed_data[i:i+2], 'little')
                        i += 2
                        length = ((info >> 12) & 0xf) + 3
                        offset = info & 0xfff
                        # Copy byte-by-byte so overlapping references repeat.
                        for j in range(length):
                            result.append(result[-offset])
                    else:
                        # Literal byte copy
                        result.append(packed_data[i])
                        i += 1
                    if i >= len(packed_data):
                        break
            return bytes(result)
        return data
    except Exception as e:
        logging.error(f"ASPack unpacking error: {str(e)}")
        return data
def upx_unpack(data: bytes) -> bytes:
    """Best-effort extraction of LZMA-compressed sections following 'UPX!' markers.

    Walks the buffer for each 'UPX!' marker, reads a 4-byte little-endian
    length, and tries to LZMA-decompress the payload; payloads that are not
    valid LZMA are kept raw. On failure the input is returned unchanged.

    NOTE(review): this simplified marker/length/payload layout is NOT the
    real UPX container format — verify against actual UPX-packed samples.

    Bug fix: the inner bare `except:` (which also swallowed SystemExit and
    KeyboardInterrupt) is narrowed to lzma.LZMAError.
    """
    try:
        upx_sig = b'UPX!'
        if upx_sig not in data:
            return data
        import lzma  # stdlib; imported lazily like the original
        sections = []
        offset = 0
        while offset < len(data):
            sig_at = data.find(upx_sig, offset)
            if sig_at == -1:
                break
            # Marker is followed by a 4-byte little-endian section length.
            body_start = sig_at + len(upx_sig)
            section_size = int.from_bytes(data[body_start:body_start+4], 'little')
            compressed = data[body_start+4:body_start+4+section_size]
            try:
                sections.append(lzma.decompress(compressed))
            except lzma.LZMAError:
                # Not valid LZMA — keep the raw bytes rather than drop them.
                sections.append(compressed)
            offset = body_start + 4 + section_size
        return b''.join(sections) if sections else data
    except Exception as e:
        logging.error(f"UPX unpacking error: {str(e)}")
        return data
def detect_custom_obfuscation(self, data: bytes):
    """Flag *data* as likely obfuscated when its Shannon entropy exceeds 7.0 bits/byte."""
    return self.calculate_entropy(data) > 7.0
def remove_obfuscation(self, data: bytes):
    """Strip detected obfuscation; currently only single-byte XOR is handled."""
    # Implement custom deobfuscation logic
    return self.deobfuscate_xor(data)
def deobfuscate_xor(self, data: bytes, key=None):
    """XOR every byte of *data* with *key*; auto-detect the key when omitted.

    Bug fix: the guard is `key is None` rather than `not key`, so an
    explicit key of 0 is honored instead of triggering auto-detection.
    """
    if key is None:
        key = self.detect_xor_key(data)
    return bytes(b ^ key for b in data)
def detect_xor_key(self, data: bytes):
    """Guess a single-byte XOR key as the most frequent byte value in *data*.

    Heuristic: assumes the plaintext is dominated by one byte value (e.g.
    zero padding), so the modal ciphertext byte equals key ^ 0. Raises
    ValueError on empty input (max() of an empty set).
    """
    # Simple XOR key detection
    return max(set(data), key=data.count)
def calculate_entropy(data: bytes):
    """Shannon entropy of *data* in bits per byte (0 for empty input)."""
    total = len(data)
    return -sum(
        (count / total) * math.log2(count / total)
        for count in Counter(data).values()
    )
def main():
    """Create the Tk root window and launch the analyzer GUI."""
    root = tk.Tk()
    root.title("Spear of Telesto - Advanced Malware Analysis")
    root.geometry("1200x800")
    # Verbose logging with file/line context for debugging analysis runs.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
    )
    # Placeholder device until the user selects a file from the GUI.
    device = BlockDevice(name="", size=0, sector_size=0)
    sector_num = 0
    device_path = ""
    BinaryViewerGUI(root, device, sector_num, device_path)
    root.mainloop()
# Entry-point guard: start the GUI only when run as a script, not on import.
if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment